dda-python-terraform/dda_python_terraform/terraform.py

579 lines
22 KiB
Python
Raw Permalink Normal View History

2023-01-27 10:19:03 +00:00
"""Module providing wrapper for terraform."""
import json
import logging
import os
import subprocess
import sys
import tempfile
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, Union
from packaging import version
2022-01-28 08:17:28 +00:00
from dda_python_terraform.tfstate import Tfstate
logger = logging.getLogger(__name__)
COMMAND_WITH_SUBCOMMANDS = {"workspace"}
class TerraformFlag:
pass
class IsFlagged(TerraformFlag):
pass
class IsNotFlagged(TerraformFlag):
pass
CommandOutput = Tuple[Optional[int], Optional[str], Optional[str]]
class TerraformCommandError(subprocess.CalledProcessError):
2023-01-27 10:19:03 +00:00
"""Class representing a terraform error"""
def __init__(self, ret_code: int, cmd: str, out: Optional[str], err: Optional[str]):
2023-01-27 10:19:03 +00:00
super().__init__(ret_code, cmd)
self.out = out
self.err = err
logger.error("Error with command %s. Reason: %s", self.cmd, self.err)
class Terraform:
"""Wrapper of terraform command line tool.
https://www.terraform.io/
"""
def __init__(
self,
working_dir: Optional[str] = None,
targets: Optional[Sequence[str]] = None,
state: Optional[str] = None,
variables: Optional[Dict[str, str]] = None,
parallelism: Optional[str] = None,
var_file: Optional[str] = None,
terraform_bin_path: Optional[str] = None,
is_env_vars_included: bool = True,
terraform_semantic_version: Optional[str] = "0.13.0"
):
"""
:param working_dir: the folder of the working folder, if not given,
will be current working folder
:param targets: list of target
as default value of apply/destroy/plan command
:param state: path of state file relative to working folder,
as a default value of apply/destroy/plan command
:param variables: default variables for apply/destroy/plan command,
will be override by variable passing by apply/destroy/plan method
:param parallelism: default parallelism value for apply/destroy command
:param var_file: passed as value of -var-file option,
could be string or list, list stands for multiple -var-file option
:param terraform_bin_path: binary path of terraform
:type is_env_vars_included: bool
:param is_env_vars_included: included env variables when calling terraform cmd
:param terrform_semantic_version encodes major.minor.patch version of terraform. Defaults to 0.13.0
"""
self.is_env_vars_included = is_env_vars_included
self.working_dir = working_dir
self.state = state
self.targets = [] if targets is None else targets
2023-01-27 10:19:03 +00:00
self.variables = {} if variables is None else variables
self.parallelism = parallelism
self.terraform_bin_path = (
terraform_bin_path if terraform_bin_path else "terraform"
)
self.terraform_semantic_version = terraform_semantic_version
self.var_file = var_file
self.temp_var_files = VariableFiles()
# store the tfstate data
self.tfstate = None
self.read_state_file(self.state)
self.latest_cmd = ''
def apply(
self,
dir_or_plan: Optional[str] = None,
input: bool = False,
skip_plan: bool = True,
no_color: Type[TerraformFlag] = IsFlagged,
**kwargs,
) -> CommandOutput:
"""Refer to https://terraform.io/docs/commands/apply.html
no-color is flagged by default
:param no_color: disable color of stdout
:param input: disable prompt for a missing variable
:param dir_or_plan: folder relative to working folder
:param skip_plan: force apply without plan (default: false)
:param kwargs: same as kwags in method 'cmd'
:returns return_code, stdout, stderr
"""
if not skip_plan:
return self.plan(dir_or_plan=dir_or_plan, **kwargs)
global_opts = self._generate_default_general_options(dir_or_plan)
default = kwargs.copy()
default["input"] = input
default["no_color"] = no_color
default["auto-approve"] = True # a False value will require an input
option_dict = self._generate_default_options(default)
args = self._generate_default_args(dir_or_plan)
return self.cmd(global_opts, "apply", *args, **option_dict)
def refresh(
self,
dir_or_plan: Optional[str] = None,
input: bool = False,
no_color: Type[TerraformFlag] = IsFlagged,
**kwargs,
) -> CommandOutput:
"""Refer to https://terraform.io/docs/commands/refresh.html
no-color is flagged by default
:param no_color: disable color of stdout
:param input: disable prompt for a missing variable
:param dir_or_plan: folder relative to working folder
:param kwargs: same as kwags in method 'cmd'
:returns return_code, stdout, stderr
"""
global_opts = self._generate_default_general_options(dir_or_plan)
default = kwargs.copy()
default["input"] = input
default["no_color"] = no_color
option_dict = self._generate_default_options(default)
args = self._generate_default_args(dir_or_plan)
return self.cmd(global_opts, "refresh", *args, **option_dict)
def destroy(
self,
dir_or_plan: Optional[str] = None,
force: Type[TerraformFlag] = IsFlagged,
**kwargs,
) -> CommandOutput:
"""Refer to https://www.terraform.io/docs/commands/destroy.html
force/no-color option is flagged by default
:return: ret_code, stdout, stderr
"""
global_opts = self._generate_default_general_options(dir_or_plan)
default = kwargs.copy()
# force is no longer a flag in version >= 1.0
if version.parse(self.terraform_semantic_version) < version.parse("1.0.0"):
default["force"] = force
default["auto-approve"] = True
options = self._generate_default_options(default)
args = self._generate_default_args(dir_or_plan)
return self.cmd(global_opts, "destroy", *args, **options)
def plan(
self,
dir_or_plan: Optional[str] = None,
detailed_exitcode: Type[TerraformFlag] = IsFlagged,
**kwargs,
) -> CommandOutput:
"""Refer to https://www.terraform.io/docs/commands/plan.html
:param detailed_exitcode: Return a detailed exit code when the command exits.
:param dir_or_plan: relative path to plan/folder
:param kwargs: options
:return: ret_code, stdout, stderr
"""
global_opts = self._generate_default_general_options(dir_or_plan)
options = kwargs.copy()
options["detailed_exitcode"] = detailed_exitcode
options = self._generate_default_options(options)
args = self._generate_default_args(dir_or_plan)
return self.cmd(global_opts, "plan", *args, **options)
def init(
self,
dir_or_plan: Optional[str] = None,
backend_config: Optional[Dict[str, str]] = None,
reconfigure: Type[TerraformFlag] = IsFlagged,
backend: bool = True,
**kwargs,
) -> CommandOutput:
"""Refer to https://www.terraform.io/docs/commands/init.html
By default, this assumes you want to use backend config, and tries to
init fresh. The flags -reconfigure and -backend=true are default.
:param dir_or_plan: relative path to the folder want to init
:param backend_config: a dictionary of backend config options. eg.
t = Terraform()
t.init(backend_config={'access_key': 'myaccesskey',
'secret_key': 'mysecretkey', 'bucket': 'mybucketname'})
:param reconfigure: whether or not to force reconfiguration of backend
:param backend: whether or not to use backend settings for init
:param kwargs: options
:return: ret_code, stdout, stderr
"""
options = kwargs.copy()
options.update(
{
"backend_config": backend_config,
"reconfigure": reconfigure,
"backend": backend,
}
)
options = self._generate_default_options(options)
global_opts = self._generate_default_general_options(dir_or_plan)
args = self._generate_default_args(dir_or_plan)
return self.cmd(global_opts, "init", *args, **options)
def generate_cmd_string(self, global_options: Dict[str, Any], cmd: str, *args, **kwargs) -> List[str]:
"""For any generate_cmd_string doesn't written as public method of Terraform
examples:
1. call import command,
ref to https://www.terraform.io/docs/commands/import.html
--> generate_cmd_string call:
terraform import -input=true aws_instance.foo i-abcd1234
--> python call:
tf.generate_cmd_string('import', 'aws_instance.foo', 'i-abcd1234', input=True)
2. call apply command,
--> generate_cmd_string call:
terraform apply -var='a=b' -var='c=d' -no-color the_folder
--> python call:
tf.generate_cmd_string('apply', the_folder, no_color=IsFlagged, var={'a':'b', 'c':'d'})
:param cmd: command and sub-command of terraform, seperated with space
refer to https://www.terraform.io/docs/commands/index.html
:param args: arguments of a command
:param kwargs: same as kwags in method 'cmd'
:return: string of valid terraform command
"""
cmds = [self.terraform_bin_path]
cmds += self._generate_cmd_options(**global_options)
cmds += cmd.split()
if cmd in COMMAND_WITH_SUBCOMMANDS:
args = list(args)
subcommand = args.pop(0)
cmds.append(subcommand)
cmds += self._generate_cmd_options(**kwargs)
cmds += args
self.latest_cmd = ' '.join(cmds)
return cmds
def cmd(
self,
global_opts: Dict[str, Any],
cmd: str,
*args,
capture_output: Union[bool, str] = True,
raise_on_error: bool = True,
synchronous: bool = True,
**kwargs,
) -> CommandOutput:
"""Run a terraform command, if success, will try to read state file
:param cmd: command and sub-command of terraform, seperated with space
refer to https://www.terraform.io/docs/commands/index.html
:param args: arguments of a command
:param kwargs: any option flag with key value without prefixed dash character
if there's a dash in the option name, use under line instead of dash,
ex. -no-color --> no_color
if it's a simple flag with no value, value should be IsFlagged
ex. cmd('taint', allow_missing=IsFlagged)
if it's a boolean value flag, assign True or false
if it's a flag could be used multiple times, assign list to it's value
if it's a "var" variable flag, assign dictionary to it
if a value is None, will skip this option
if the option 'capture_output' is passed (with any value other than
True), terraform output will be printed to stdout/stderr and
"None" will be returned as out and err.
if the option 'raise_on_error' is passed (with any value that evaluates to True),
and the terraform command returns a nonzerop return code, then
a TerraformCommandError exception will be raised. The exception object will
have the following properties:
returncode: The command's return code
out: The captured stdout, or None if not captured
err: The captured stderr, or None if not captured
:return: ret_code, out, err
"""
if capture_output is True:
stderr = subprocess.PIPE
stdout = subprocess.PIPE
elif capture_output == "framework":
stderr = None
stdout = None
else:
stderr = sys.stderr
stdout = sys.stdout
cmds = self.generate_cmd_string(global_opts, cmd, *args, **kwargs)
logger.info("Command: %s", " ".join(cmds))
working_folder = self.working_dir if self.working_dir else None
environ_vars = {}
if self.is_env_vars_included:
environ_vars = os.environ.copy()
2023-01-27 10:19:03 +00:00
proc = subprocess.Popen(
cmds, stdout=stdout, stderr=stderr, cwd=working_folder, env=environ_vars
)
if not synchronous:
return None, None, None
2023-01-27 10:19:03 +00:00
out, err = proc.communicate()
ret_code = proc.returncode
logger.info("output: %s", out)
if ret_code == 0:
self.read_state_file()
else:
logger.warning("error: %s", err)
self.temp_var_files.clean_up()
if capture_output is True:
out = out.decode()
err = err.decode()
else:
out = None
err = None
if ret_code and raise_on_error:
raise TerraformCommandError(ret_code, " ".join(cmds), out=out, err=err)
return ret_code, out, err
def output(
self, dir_or_plan: Optional[str] = None, *args, capture_output: bool = True, **kwargs
) -> Union[None, str, Dict[str, str], Dict[str, Dict[str, str]]]:
"""Refer https://www.terraform.io/docs/commands/output.html
Note that this method does not conform to the (ret_code, out, err) return
convention. To use the "output" command with the standard convention,
call "output_cmd" instead of "output".
:param args: Positional arguments. There is one optional positional
argument NAME; if supplied, the returned output text
will be the json for a single named output value.
:param kwargs: Named options, passed to the command. In addition,
'full_value': If True, and NAME is provided, then
the return value will be a dict with
"value', 'type', and 'sensitive'
properties.
:return: None, if an error occured
Output value as a string, if NAME is provided and full_value
is False or not provided
Output value as a dict with 'value', 'sensitive', and 'type' if
NAME is provided and full_value is True.
dict of named dicts each with 'value', 'sensitive', and 'type',
if NAME is not provided
"""
kwargs["json"] = IsFlagged
if capture_output is False:
raise ValueError("capture_output is required for this method")
global_opts = self._generate_default_general_options(dir_or_plan)
ret, out, _ = self.cmd(global_opts, "output", *args, **kwargs)
# ret, out, _ = self.output_cmd(global_opts, *args, **kwargs)
if ret:
return None
return json.loads(out.lstrip())
def read_state_file(self, file_path=None) -> None:
"""Read .tfstate file
:param file_path: relative path to working dir
:return: states file in dict type
"""
working_dir = self.working_dir or ""
file_path = file_path or self.state or ""
if not file_path:
backend_path = os.path.join(file_path, ".terraform", "terraform.tfstate")
if os.path.exists(os.path.join(working_dir, backend_path)):
file_path = backend_path
else:
file_path = os.path.join(file_path, "terraform.tfstate")
file_path = os.path.join(working_dir, file_path)
self.tfstate = Tfstate.load_file(file_path)
def set_workspace(self, workspace, *args, **kwargs) -> CommandOutput:
"""Set workspace
:param workspace: the desired workspace.
:return: status
"""
global_opts = self._generate_default_general_options(False)
return self.cmd(global_opts, "workspace", "select", workspace, *args, **kwargs)
def create_workspace(self, workspace, *args, **kwargs) -> CommandOutput:
"""Create workspace
:param workspace: the desired workspace.
:return: status
"""
global_opts = self._generate_default_general_options(False)
return self.cmd(global_opts, "workspace", "new", workspace, *args, **kwargs)
def delete_workspace(self, workspace, *args, **kwargs) -> CommandOutput:
"""Delete workspace
:param workspace: the desired workspace.
:return: status
"""
global_opts = self._generate_default_general_options(False)
return self.cmd(global_opts, "workspace", "delete", workspace, *args, **kwargs)
def show_workspace(self, **kwargs) -> CommandOutput:
"""Show workspace, this command does not need the [DIR] part
:return: workspace
"""
global_opts = self._generate_default_general_options(False)
return self.cmd(global_opts, "workspace", "show", **kwargs)
2022-08-05 16:09:17 +00:00
def list_workspace(self) -> List[str]:
"""List of workspaces
:return: workspaces
:example:
>>> tf = Terraform()
>>> tf.list_workspace()
['default', 'test']
"""
global_opts = self._generate_default_general_options(False)
return list(
filter(
lambda workspace: len(workspace) > 0,
map(
2023-01-27 09:47:01 +00:00
lambda workspace: workspace.strip('*').strip(),
2022-08-05 16:09:17 +00:00
(self.cmd(global_opts, "workspace", "list")[1] or '').split()
)
)
)
def _generate_default_args(self, dir_or_plan: Optional[str]) -> Sequence[str]:
if (version.parse(self.terraform_semantic_version) < version.parse("1.0.0") and dir_or_plan):
return [dir_or_plan]
elif (version.parse(self.terraform_semantic_version) >= version.parse("1.0.0") and dir_or_plan and os.path.isfile(f'{self.working_dir}/{dir_or_plan}')):
plan = dir_or_plan.split('/')[-1]
return [plan]
else:
return []
def _generate_default_general_options(self, dir_or_plan: Optional[str]) -> Dict[str, Any]:
if (version.parse(self.terraform_semantic_version) >= version.parse("1.0.0") and dir_or_plan):
if os.path.isdir(self.working_dir + '/' + dir_or_plan):
return {"chdir": dir_or_plan}
else:
plan_path = dir_or_plan.split('/')
dir_to_plan_path = "/".join(plan_path[:-1])
return {"chdir": dir_to_plan_path}
else:
return {}
def _generate_default_options(
self, input_options: Dict[str, Any]
) -> Dict[str, Any]:
return {
"state": self.state,
"target": self.targets,
"var": self.variables,
"var_file": self.var_file,
"parallelism": self.parallelism,
"no_color": IsFlagged,
"input": False,
**input_options,
}
def _generate_cmd_options(self, **kwargs) -> List[str]:
result = []
2022-08-05 16:09:17 +00:00
for option, value in kwargs.items():
if "_" in option:
option = option.replace("_", "-")
if isinstance(value, list):
for sub_v in value:
result += [f"-{option}={sub_v}"]
continue
if isinstance(value, dict):
if "backend-config" in option:
2023-01-27 10:19:03 +00:00
for backend_key, backend_value in value.items():
result += [f"-backend-config={backend_key}={backend_value}"]
continue
# since map type sent in string won't work, create temp var file for
# variables, and clean it up later
2023-01-27 10:19:03 +00:00
if option == "var":
# We do not create empty var-files if there is no var passed.
# An empty var-file would result in an error: An argument or block definition is required here
if value:
filename = self.temp_var_files.create(value)
result += [f"-var-file={filename}"]
continue
# simple flag,
if value is IsFlagged:
result += [f"-{option}"]
continue
if value is None or value is IsNotFlagged:
continue
if isinstance(value, bool):
value = "true" if value else "false"
result += [f"-{option}={value}"]
return result
2022-08-05 16:09:17 +00:00
def __exit__(self, exc_type, exc_value, traceback) -> None:
self.temp_var_files.clean_up()
2022-08-05 16:09:17 +00:00
def __getattr__(self, item: str) -> Callable:
def wrapper(*args, **kwargs):
cmd_name = str(item)
if cmd_name.endswith("_cmd"):
cmd_name = cmd_name[:-4]
logger.debug("called with %r and %r", args, kwargs)
global_opts = self._generate_default_general_options(False)
return self.cmd(global_opts, cmd_name, *args, **kwargs)
return wrapper
2022-08-05 16:09:17 +00:00
class VariableFiles:
2023-01-27 10:19:03 +00:00
"""Class representing a terraform var files"""
def __init__(self):
self.files = []
def create(self, variables: Dict[str, str]) -> str:
2023-01-27 10:19:03 +00:00
"""create var file in temp"""
with tempfile.NamedTemporaryFile(
"w+t", suffix=".tfvars.json", delete=False
) as temp:
logger.debug("%s is created", temp.name)
self.files.append(temp)
logger.debug("variables wrote to tempfile: %s", variables)
temp.write(json.dumps(variables))
file_name = temp.name
return file_name
def clean_up(self):
2023-01-27 10:19:03 +00:00
"""cleanup the var file"""
for fle in self.files:
os.unlink(fle.name)
self.files = []