From 30daf1324409b0f30ebec67eea0c67fbbf65c5cf Mon Sep 17 00:00:00 2001 From: bom Date: Thu, 27 Jan 2022 17:03:30 +0100 Subject: [PATCH] renamed python_terraform to dda_python_terraform --- dda_python_terraform/__init__.py | 8 + dda_python_terraform/terraform.py | 548 ++++++++++++++++++++++++++++++ dda_python_terraform/tfstate.py | 33 ++ 3 files changed, 589 insertions(+) create mode 100644 dda_python_terraform/__init__.py create mode 100644 dda_python_terraform/terraform.py create mode 100644 dda_python_terraform/tfstate.py diff --git a/dda_python_terraform/__init__.py b/dda_python_terraform/__init__.py new file mode 100644 index 0000000..c2e2c10 --- /dev/null +++ b/dda_python_terraform/__init__.py @@ -0,0 +1,8 @@ +from .terraform import ( + IsFlagged, + IsNotFlagged, + Terraform, + TerraformCommandError, + VariableFiles, +) +from .tfstate import Tfstate diff --git a/dda_python_terraform/terraform.py b/dda_python_terraform/terraform.py new file mode 100644 index 0000000..753f410 --- /dev/null +++ b/dda_python_terraform/terraform.py @@ -0,0 +1,548 @@ +import json +import logging +import os +import subprocess +import sys +import tempfile +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, Union + +from python_terraform.tfstate import Tfstate + +logger = logging.getLogger(__name__) + +COMMAND_WITH_SUBCOMMANDS = {"workspace"} + + +class TerraformFlag: + pass + + +class IsFlagged(TerraformFlag): + pass + + +class IsNotFlagged(TerraformFlag): + pass + + +CommandOutput = Tuple[Optional[int], Optional[str], Optional[str]] + + +class TerraformCommandError(subprocess.CalledProcessError): + def __init__(self, ret_code: int, cmd: str, out: Optional[str], err: Optional[str]): + super(TerraformCommandError, self).__init__(ret_code, cmd) + self.out = out + self.err = err + logger.error("Error with command %s. Reason: %s", self.cmd, self.err) + + +class Terraform: + """Wrapper of terraform command line tool. + + https://www.terraform.io/ + """ + + def __init__( + self, + working_dir: Optional[str] = None, + targets: Optional[Sequence[str]] = None, + state: Optional[str] = None, + variables: Optional[Dict[str, str]] = None, + parallelism: Optional[str] = None, + var_file: Optional[str] = None, + terraform_bin_path: Optional[str] = None, + is_env_vars_included: bool = True, + terraform_version: Optional[float] = 0.13 + ): + """ + :param working_dir: the folder of the working folder, if not given, + will be current working folder + :param targets: list of target + as default value of apply/destroy/plan command + :param state: path of state file relative to working folder, + as a default value of apply/destroy/plan command + :param variables: default variables for apply/destroy/plan command, + will be override by variable passing by apply/destroy/plan method + :param parallelism: default parallelism value for apply/destroy command + :param var_file: passed as value of -var-file option, + could be string or list, list stands for multiple -var-file option + :param terraform_bin_path: binary path of terraform + :type is_env_vars_included: bool + :param is_env_vars_included: included env variables when calling terraform cmd + """ + self.is_env_vars_included = is_env_vars_included + self.working_dir = working_dir + self.state = state + self.targets = [] if targets is None else targets + self.variables = dict() if variables is None else variables + self.parallelism = parallelism + self.terraform_bin_path = ( + terraform_bin_path if terraform_bin_path else "terraform" + ) + self.terraform_version = terraform_version + self.var_file = var_file + self.temp_var_files = VariableFiles() + + # store the tfstate data + self.tfstate = None + self.read_state_file(self.state) + + self.latest_cmd = '' + + def apply( + self, + dir_or_plan: Optional[str] = None, + input: bool = False, + skip_plan: bool = True, + no_color: Type[TerraformFlag] = IsFlagged, + **kwargs, + ) -> CommandOutput: + """Refer to https://terraform.io/docs/commands/apply.html + + no-color is flagged by default + :param no_color: disable color of stdout + :param input: disable prompt for a missing variable + :param dir_or_plan: folder relative to working folder + :param skip_plan: force apply without plan (default: false) + :param kwargs: same as kwags in method 'cmd' + :returns return_code, stdout, stderr + """ + if not skip_plan: + return self.plan(dir_or_plan=dir_or_plan, **kwargs) + global_opts = self._generate_default_general_options(dir_or_plan) + default = kwargs.copy() + default["input"] = input + default["no_color"] = no_color + default["auto-approve"] = True # a False value will require an input + option_dict = self._generate_default_options(default) + args = self._generate_default_args(dir_or_plan) + return self.cmd(global_opts, "apply", *args, **option_dict) + + def refresh( + self, + dir_or_plan: Optional[str] = None, + input: bool = False, + no_color: Type[TerraformFlag] = IsFlagged, + **kwargs, + ) -> CommandOutput: + """Refer to https://terraform.io/docs/commands/refresh.html + + no-color is flagged by default + :param no_color: disable color of stdout + :param input: disable prompt for a missing variable + :param dir_or_plan: folder relative to working folder + :param kwargs: same as kwags in method 'cmd' + :returns return_code, stdout, stderr + """ + global_opts = self._generate_default_general_options(dir_or_plan) + default = kwargs.copy() + default["input"] = input + default["no_color"] = no_color + option_dict = self._generate_default_options(default) + args = self._generate_default_args(dir_or_plan) + return self.cmd(global_opts, "refresh", *args, **option_dict) + + def destroy( + self, + dir_or_plan: Optional[str] = None, + force: Type[TerraformFlag] = IsFlagged, + **kwargs, + ) -> CommandOutput: + """Refer to https://www.terraform.io/docs/commands/destroy.html + + force/no-color option is flagged by default + :return: ret_code, stdout, stderr + """ + global_opts = self._generate_default_general_options(dir_or_plan) + default = kwargs.copy() + # force is no longer a flag in version >= 1.0 + if self.terraform_version < 1.0: + default["force"] = force + default["auto-approve"] = True + options = self._generate_default_options(default) + args = self._generate_default_args(dir_or_plan) + return self.cmd(global_opts, "destroy", *args, **options) + + def plan( + self, + dir_or_plan: Optional[str] = None, + detailed_exitcode: Type[TerraformFlag] = IsFlagged, + **kwargs, + ) -> CommandOutput: + """Refer to https://www.terraform.io/docs/commands/plan.html + + :param detailed_exitcode: Return a detailed exit code when the command exits. + :param dir_or_plan: relative path to plan/folder + :param kwargs: options + :return: ret_code, stdout, stderr + """ + global_opts = self._generate_default_general_options(dir_or_plan) + options = kwargs.copy() + options["detailed_exitcode"] = detailed_exitcode + options = self._generate_default_options(options) + args = self._generate_default_args(dir_or_plan) + return self.cmd(global_opts, "plan", *args, **options) + + def init( + self, + dir_or_plan: Optional[str] = None, + backend_config: Optional[Dict[str, str]] = None, + reconfigure: Type[TerraformFlag] = IsFlagged, + backend: bool = True, + **kwargs, + ) -> CommandOutput: + """Refer to https://www.terraform.io/docs/commands/init.html + + By default, this assumes you want to use backend config, and tries to + init fresh. The flags -reconfigure and -backend=true are default. + + :param dir_or_plan: relative path to the folder want to init + :param backend_config: a dictionary of backend config options. eg. + t = Terraform() + t.init(backend_config={'access_key': 'myaccesskey', + 'secret_key': 'mysecretkey', 'bucket': 'mybucketname'}) + :param reconfigure: whether or not to force reconfiguration of backend + :param backend: whether or not to use backend settings for init + :param kwargs: options + :return: ret_code, stdout, stderr + """ + options = kwargs.copy() + options.update( + { + "backend_config": backend_config, + "reconfigure": reconfigure, + "backend": backend, + } + ) + options = self._generate_default_options(options) + global_opts = self._generate_default_general_options(dir_or_plan) + args = self._generate_default_args(dir_or_plan) + return self.cmd(global_opts, "init", *args, **options) + + def generate_cmd_string(self, global_options: Dict[str, Any], cmd: str, *args, **kwargs) -> List[str]: + """For any generate_cmd_string doesn't written as public method of Terraform + + examples: + 1. call import command, + ref to https://www.terraform.io/docs/commands/import.html + --> generate_cmd_string call: + terraform import -input=true aws_instance.foo i-abcd1234 + --> python call: + tf.generate_cmd_string('import', 'aws_instance.foo', 'i-abcd1234', input=True) + + 2. call apply command, + --> generate_cmd_string call: + terraform apply -var='a=b' -var='c=d' -no-color the_folder + --> python call: + tf.generate_cmd_string('apply', the_folder, no_color=IsFlagged, var={'a':'b', 'c':'d'}) + + :param cmd: command and sub-command of terraform, seperated with space + refer to https://www.terraform.io/docs/commands/index.html + :param args: arguments of a command + :param kwargs: same as kwags in method 'cmd' + :return: string of valid terraform command + """ + cmds = [self.terraform_bin_path] + cmds += self._generate_cmd_options(**global_options) + cmds += cmd.split() + if cmd in COMMAND_WITH_SUBCOMMANDS: + args = list(args) + subcommand = args.pop(0) + cmds.append(subcommand) + + cmds += self._generate_cmd_options(**kwargs) + + cmds += args + self.latest_cmd = ' '.join(cmds) + return cmds + + def cmd( + self, + global_opts: Dict[str, Any], + cmd: str, + *args, + capture_output: Union[bool, str] = True, + raise_on_error: bool = True, + synchronous: bool = True, + **kwargs, + ) -> CommandOutput: + """Run a terraform command, if success, will try to read state file + + :param cmd: command and sub-command of terraform, seperated with space + refer to https://www.terraform.io/docs/commands/index.html + :param args: arguments of a command + :param kwargs: any option flag with key value without prefixed dash character + if there's a dash in the option name, use under line instead of dash, + ex. -no-color --> no_color + if it's a simple flag with no value, value should be IsFlagged + ex. cmd('taint', allow_missing=IsFlagged) + if it's a boolean value flag, assign True or false + if it's a flag could be used multiple times, assign list to it's value + if it's a "var" variable flag, assign dictionary to it + if a value is None, will skip this option + if the option 'capture_output' is passed (with any value other than + True), terraform output will be printed to stdout/stderr and + "None" will be returned as out and err. + if the option 'raise_on_error' is passed (with any value that evaluates to True), + and the terraform command returns a nonzerop return code, then + a TerraformCommandError exception will be raised. The exception object will + have the following properties: + returncode: The command's return code + out: The captured stdout, or None if not captured + err: The captured stderr, or None if not captured + :return: ret_code, out, err + """ + if capture_output is True: + stderr = subprocess.PIPE + stdout = subprocess.PIPE + elif capture_output == "framework": + stderr = None + stdout = None + else: + stderr = sys.stderr + stdout = sys.stdout + + cmds = self.generate_cmd_string(global_opts, cmd, *args, **kwargs) + logger.info("Command: %s", " ".join(cmds)) + + working_folder = self.working_dir if self.working_dir else None + + environ_vars = {} + if self.is_env_vars_included: + environ_vars = os.environ.copy() + + p = subprocess.Popen( + cmds, stdout=stdout, stderr=stderr, cwd=working_folder, env=environ_vars + ) + + if not synchronous: + return None, None, None + + out, err = p.communicate() + ret_code = p.returncode + logger.info("output: %s", out) + + if ret_code == 0: + self.read_state_file() + else: + logger.warning("error: %s", err) + + self.temp_var_files.clean_up() + if capture_output is True: + out = out.decode() + err = err.decode() + else: + out = None + err = None + + if ret_code and raise_on_error: + raise TerraformCommandError(ret_code, " ".join(cmds), out=out, err=err) + + return ret_code, out, err + + def output( + self, dir_or_plan: Optional[str] = None, *args, capture_output: bool = True, **kwargs + ) -> Union[None, str, Dict[str, str], Dict[str, Dict[str, str]]]: + """Refer https://www.terraform.io/docs/commands/output.html + + Note that this method does not conform to the (ret_code, out, err) return + convention. To use the "output" command with the standard convention, + call "output_cmd" instead of "output". + + :param args: Positional arguments. There is one optional positional + argument NAME; if supplied, the returned output text + will be the json for a single named output value. + :param kwargs: Named options, passed to the command. In addition, + 'full_value': If True, and NAME is provided, then + the return value will be a dict with + "value', 'type', and 'sensitive' + properties. + :return: None, if an error occured + Output value as a string, if NAME is provided and full_value + is False or not provided + Output value as a dict with 'value', 'sensitive', and 'type' if + NAME is provided and full_value is True. + dict of named dicts each with 'value', 'sensitive', and 'type', + if NAME is not provided + """ + kwargs["json"] = IsFlagged + if capture_output is False: + raise ValueError("capture_output is required for this method") + + global_opts = self._generate_default_general_options(dir_or_plan) + ret, out, _ = self.cmd(global_opts, "output", *args, **kwargs) + + # ret, out, _ = self.output_cmd(global_opts, *args, **kwargs) + + if ret: + return None + + return json.loads(out.lstrip()) + + def read_state_file(self, file_path=None) -> None: + """Read .tfstate file + + :param file_path: relative path to working dir + :return: states file in dict type + """ + + working_dir = self.working_dir or "" + + file_path = file_path or self.state or "" + + if not file_path: + backend_path = os.path.join(file_path, ".terraform", "terraform.tfstate") + + if os.path.exists(os.path.join(working_dir, backend_path)): + file_path = backend_path + else: + file_path = os.path.join(file_path, "terraform.tfstate") + + file_path = os.path.join(working_dir, file_path) + + self.tfstate = Tfstate.load_file(file_path) + + def set_workspace(self, workspace, *args, **kwargs) -> CommandOutput: + """Set workspace + + :param workspace: the desired workspace. + :return: status + """ + global_opts = self._generate_default_general_options(False) + return self.cmd(global_opts, "workspace", "select", workspace, *args, **kwargs) + + def create_workspace(self, workspace, *args, **kwargs) -> CommandOutput: + """Create workspace + + :param workspace: the desired workspace. + :return: status + """ + global_opts = self._generate_default_general_options(False) + return self.cmd(global_opts, "workspace", "new", workspace, *args, **kwargs) + + def delete_workspace(self, workspace, *args, **kwargs) -> CommandOutput: + """Delete workspace + + :param workspace: the desired workspace. + :return: status + """ + global_opts = self._generate_default_general_options(False) + return self.cmd(global_opts, "workspace", "delete", workspace, *args, **kwargs) + + def show_workspace(self, **kwargs) -> CommandOutput: + """Show workspace, this command does not need the [DIR] part + + :return: workspace + """ + global_opts = self._generate_default_general_options(False) + return self.cmd(global_opts, "workspace", "show", **kwargs) + + def _generate_default_args(self, dir_or_plan: Optional[str]) -> Sequence[str]: + if (self.terraform_version < 1.0 and dir_or_plan): + return [dir_or_plan] + else: + return [] + + def _generate_default_general_options(self, dir_or_plan: Optional[str]) -> Dict[str, Any]: + if (self.terraform_version >= 1.0 and dir_or_plan): + return {"chdir": dir_or_plan} + else: + return {} + + def _generate_default_options( + self, input_options: Dict[str, Any] + ) -> Dict[str, Any]: + return { + "state": self.state, + "target": self.targets, + "var": self.variables, + "var_file": self.var_file, + "parallelism": self.parallelism, + "no_color": IsFlagged, + "input": False, + **input_options, + } + + def _generate_cmd_options(self, **kwargs) -> List[str]: + """ + """ + result = [] + + for option, value in kwargs.items(): + if "_" in option: + option = option.replace("_", "-") + + if isinstance(value, list): + for sub_v in value: + result += [f"-{option}={sub_v}"] + continue + + if isinstance(value, dict): + if "backend-config" in option: + for bk, bv in value.items(): + result += [f"-backend-config={bk}={bv}"] + continue + + # since map type sent in string won't work, create temp var file for + # variables, and clean it up later + elif option == "var": + # We do not create empty var-files if there is no var passed. + # An empty var-file would result in an error: An argument or block definition is required here + if value: + filename = self.temp_var_files.create(value) + result += [f"-var-file={filename}"] + + continue + + # simple flag, + if value is IsFlagged: + result += [f"-{option}"] + continue + + if value is None or value is IsNotFlagged: + continue + + if isinstance(value, bool): + value = "true" if value else "false" + + result += [f"-{option}={value}"] + return result + + def __exit__(self, exc_type, exc_value, traceback) -> None: + self.temp_var_files.clean_up() + + def __getattr__(self, item: str) -> Callable: + def wrapper(*args, **kwargs): + cmd_name = str(item) + if cmd_name.endswith("_cmd"): + cmd_name = cmd_name[:-4] + logger.debug("called with %r and %r", args, kwargs) + global_opts = self._generate_default_general_options(False) + return self.cmd(global_opts, cmd_name, *args, **kwargs) + + return wrapper + + + + +class VariableFiles: + def __init__(self): + self.files = [] + + def create(self, variables: Dict[str, str]) -> str: + with tempfile.NamedTemporaryFile( + "w+t", suffix=".tfvars.json", delete=False + ) as temp: + logger.debug("%s is created", temp.name) + self.files.append(temp) + logger.debug("variables wrote to tempfile: %s", variables) + temp.write(json.dumps(variables)) + file_name = temp.name + + return file_name + + def clean_up(self): + for f in self.files: + os.unlink(f.name) + + self.files = [] diff --git a/dda_python_terraform/tfstate.py b/dda_python_terraform/tfstate.py new file mode 100644 index 0000000..f39af0f --- /dev/null +++ b/dda_python_terraform/tfstate.py @@ -0,0 +1,33 @@ +import json +import logging +import os +from typing import Dict, Optional + +logger = logging.getLogger(__name__) + + +class Tfstate: + def __init__(self, data: Optional[Dict[str, str]] = None): + self.tfstate_file: Optional[str] = None + self.native_data = data + if data: + self.__dict__ = data + + @staticmethod + def load_file(file_path: str) -> "Tfstate": + """Read the tfstate file and load its contents. + + Parses then as JSON and put the result into the object. + """ + logger.debug("read data from %s", file_path) + if os.path.exists(file_path): + with open(file_path) as f: + json_data = json.load(f) + + tf_state = Tfstate(json_data) + tf_state.tfstate_file = file_path + return tf_state + + logger.debug("%s does not exist", file_path) + + return Tfstate()