renamed python_terraform to dda_python_terraform
This commit is contained in:
parent
fd431b229d
commit
30daf13244
3 changed files with 589 additions and 0 deletions
8
dda_python_terraform/__init__.py
Normal file
8
dda_python_terraform/__init__.py
Normal file
|
@ -0,0 +1,8 @@
|
|||
from .terraform import (
|
||||
IsFlagged,
|
||||
IsNotFlagged,
|
||||
Terraform,
|
||||
TerraformCommandError,
|
||||
VariableFiles,
|
||||
)
|
||||
from .tfstate import Tfstate
|
548
dda_python_terraform/terraform.py
Normal file
548
dda_python_terraform/terraform.py
Normal file
|
@ -0,0 +1,548 @@
|
|||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, Union
|
||||
|
||||
from python_terraform.tfstate import Tfstate
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
COMMAND_WITH_SUBCOMMANDS = {"workspace"}
|
||||
|
||||
|
||||
class TerraformFlag:
|
||||
pass
|
||||
|
||||
|
||||
class IsFlagged(TerraformFlag):
|
||||
pass
|
||||
|
||||
|
||||
class IsNotFlagged(TerraformFlag):
|
||||
pass
|
||||
|
||||
|
||||
CommandOutput = Tuple[Optional[int], Optional[str], Optional[str]]
|
||||
|
||||
|
||||
class TerraformCommandError(subprocess.CalledProcessError):
|
||||
def __init__(self, ret_code: int, cmd: str, out: Optional[str], err: Optional[str]):
|
||||
super(TerraformCommandError, self).__init__(ret_code, cmd)
|
||||
self.out = out
|
||||
self.err = err
|
||||
logger.error("Error with command %s. Reason: %s", self.cmd, self.err)
|
||||
|
||||
|
||||
class Terraform:
|
||||
"""Wrapper of terraform command line tool.
|
||||
|
||||
https://www.terraform.io/
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
working_dir: Optional[str] = None,
|
||||
targets: Optional[Sequence[str]] = None,
|
||||
state: Optional[str] = None,
|
||||
variables: Optional[Dict[str, str]] = None,
|
||||
parallelism: Optional[str] = None,
|
||||
var_file: Optional[str] = None,
|
||||
terraform_bin_path: Optional[str] = None,
|
||||
is_env_vars_included: bool = True,
|
||||
terraform_version: Optional[float] = 0.13
|
||||
):
|
||||
"""
|
||||
:param working_dir: the folder of the working folder, if not given,
|
||||
will be current working folder
|
||||
:param targets: list of target
|
||||
as default value of apply/destroy/plan command
|
||||
:param state: path of state file relative to working folder,
|
||||
as a default value of apply/destroy/plan command
|
||||
:param variables: default variables for apply/destroy/plan command,
|
||||
will be override by variable passing by apply/destroy/plan method
|
||||
:param parallelism: default parallelism value for apply/destroy command
|
||||
:param var_file: passed as value of -var-file option,
|
||||
could be string or list, list stands for multiple -var-file option
|
||||
:param terraform_bin_path: binary path of terraform
|
||||
:type is_env_vars_included: bool
|
||||
:param is_env_vars_included: included env variables when calling terraform cmd
|
||||
"""
|
||||
self.is_env_vars_included = is_env_vars_included
|
||||
self.working_dir = working_dir
|
||||
self.state = state
|
||||
self.targets = [] if targets is None else targets
|
||||
self.variables = dict() if variables is None else variables
|
||||
self.parallelism = parallelism
|
||||
self.terraform_bin_path = (
|
||||
terraform_bin_path if terraform_bin_path else "terraform"
|
||||
)
|
||||
self.terraform_version = terraform_version
|
||||
self.var_file = var_file
|
||||
self.temp_var_files = VariableFiles()
|
||||
|
||||
# store the tfstate data
|
||||
self.tfstate = None
|
||||
self.read_state_file(self.state)
|
||||
|
||||
self.latest_cmd = ''
|
||||
|
||||
def apply(
|
||||
self,
|
||||
dir_or_plan: Optional[str] = None,
|
||||
input: bool = False,
|
||||
skip_plan: bool = True,
|
||||
no_color: Type[TerraformFlag] = IsFlagged,
|
||||
**kwargs,
|
||||
) -> CommandOutput:
|
||||
"""Refer to https://terraform.io/docs/commands/apply.html
|
||||
|
||||
no-color is flagged by default
|
||||
:param no_color: disable color of stdout
|
||||
:param input: disable prompt for a missing variable
|
||||
:param dir_or_plan: folder relative to working folder
|
||||
:param skip_plan: force apply without plan (default: false)
|
||||
:param kwargs: same as kwags in method 'cmd'
|
||||
:returns return_code, stdout, stderr
|
||||
"""
|
||||
if not skip_plan:
|
||||
return self.plan(dir_or_plan=dir_or_plan, **kwargs)
|
||||
global_opts = self._generate_default_general_options(dir_or_plan)
|
||||
default = kwargs.copy()
|
||||
default["input"] = input
|
||||
default["no_color"] = no_color
|
||||
default["auto-approve"] = True # a False value will require an input
|
||||
option_dict = self._generate_default_options(default)
|
||||
args = self._generate_default_args(dir_or_plan)
|
||||
return self.cmd(global_opts, "apply", *args, **option_dict)
|
||||
|
||||
def refresh(
|
||||
self,
|
||||
dir_or_plan: Optional[str] = None,
|
||||
input: bool = False,
|
||||
no_color: Type[TerraformFlag] = IsFlagged,
|
||||
**kwargs,
|
||||
) -> CommandOutput:
|
||||
"""Refer to https://terraform.io/docs/commands/refresh.html
|
||||
|
||||
no-color is flagged by default
|
||||
:param no_color: disable color of stdout
|
||||
:param input: disable prompt for a missing variable
|
||||
:param dir_or_plan: folder relative to working folder
|
||||
:param kwargs: same as kwags in method 'cmd'
|
||||
:returns return_code, stdout, stderr
|
||||
"""
|
||||
global_opts = self._generate_default_general_options(dir_or_plan)
|
||||
default = kwargs.copy()
|
||||
default["input"] = input
|
||||
default["no_color"] = no_color
|
||||
option_dict = self._generate_default_options(default)
|
||||
args = self._generate_default_args(dir_or_plan)
|
||||
return self.cmd(global_opts, "refresh", *args, **option_dict)
|
||||
|
||||
def destroy(
|
||||
self,
|
||||
dir_or_plan: Optional[str] = None,
|
||||
force: Type[TerraformFlag] = IsFlagged,
|
||||
**kwargs,
|
||||
) -> CommandOutput:
|
||||
"""Refer to https://www.terraform.io/docs/commands/destroy.html
|
||||
|
||||
force/no-color option is flagged by default
|
||||
:return: ret_code, stdout, stderr
|
||||
"""
|
||||
global_opts = self._generate_default_general_options(dir_or_plan)
|
||||
default = kwargs.copy()
|
||||
# force is no longer a flag in version >= 1.0
|
||||
if self.terraform_version < 1.0:
|
||||
default["force"] = force
|
||||
default["auto-approve"] = True
|
||||
options = self._generate_default_options(default)
|
||||
args = self._generate_default_args(dir_or_plan)
|
||||
return self.cmd(global_opts, "destroy", *args, **options)
|
||||
|
||||
def plan(
|
||||
self,
|
||||
dir_or_plan: Optional[str] = None,
|
||||
detailed_exitcode: Type[TerraformFlag] = IsFlagged,
|
||||
**kwargs,
|
||||
) -> CommandOutput:
|
||||
"""Refer to https://www.terraform.io/docs/commands/plan.html
|
||||
|
||||
:param detailed_exitcode: Return a detailed exit code when the command exits.
|
||||
:param dir_or_plan: relative path to plan/folder
|
||||
:param kwargs: options
|
||||
:return: ret_code, stdout, stderr
|
||||
"""
|
||||
global_opts = self._generate_default_general_options(dir_or_plan)
|
||||
options = kwargs.copy()
|
||||
options["detailed_exitcode"] = detailed_exitcode
|
||||
options = self._generate_default_options(options)
|
||||
args = self._generate_default_args(dir_or_plan)
|
||||
return self.cmd(global_opts, "plan", *args, **options)
|
||||
|
||||
def init(
|
||||
self,
|
||||
dir_or_plan: Optional[str] = None,
|
||||
backend_config: Optional[Dict[str, str]] = None,
|
||||
reconfigure: Type[TerraformFlag] = IsFlagged,
|
||||
backend: bool = True,
|
||||
**kwargs,
|
||||
) -> CommandOutput:
|
||||
"""Refer to https://www.terraform.io/docs/commands/init.html
|
||||
|
||||
By default, this assumes you want to use backend config, and tries to
|
||||
init fresh. The flags -reconfigure and -backend=true are default.
|
||||
|
||||
:param dir_or_plan: relative path to the folder want to init
|
||||
:param backend_config: a dictionary of backend config options. eg.
|
||||
t = Terraform()
|
||||
t.init(backend_config={'access_key': 'myaccesskey',
|
||||
'secret_key': 'mysecretkey', 'bucket': 'mybucketname'})
|
||||
:param reconfigure: whether or not to force reconfiguration of backend
|
||||
:param backend: whether or not to use backend settings for init
|
||||
:param kwargs: options
|
||||
:return: ret_code, stdout, stderr
|
||||
"""
|
||||
options = kwargs.copy()
|
||||
options.update(
|
||||
{
|
||||
"backend_config": backend_config,
|
||||
"reconfigure": reconfigure,
|
||||
"backend": backend,
|
||||
}
|
||||
)
|
||||
options = self._generate_default_options(options)
|
||||
global_opts = self._generate_default_general_options(dir_or_plan)
|
||||
args = self._generate_default_args(dir_or_plan)
|
||||
return self.cmd(global_opts, "init", *args, **options)
|
||||
|
||||
def generate_cmd_string(self, global_options: Dict[str, Any], cmd: str, *args, **kwargs) -> List[str]:
|
||||
"""For any generate_cmd_string doesn't written as public method of Terraform
|
||||
|
||||
examples:
|
||||
1. call import command,
|
||||
ref to https://www.terraform.io/docs/commands/import.html
|
||||
--> generate_cmd_string call:
|
||||
terraform import -input=true aws_instance.foo i-abcd1234
|
||||
--> python call:
|
||||
tf.generate_cmd_string('import', 'aws_instance.foo', 'i-abcd1234', input=True)
|
||||
|
||||
2. call apply command,
|
||||
--> generate_cmd_string call:
|
||||
terraform apply -var='a=b' -var='c=d' -no-color the_folder
|
||||
--> python call:
|
||||
tf.generate_cmd_string('apply', the_folder, no_color=IsFlagged, var={'a':'b', 'c':'d'})
|
||||
|
||||
:param cmd: command and sub-command of terraform, seperated with space
|
||||
refer to https://www.terraform.io/docs/commands/index.html
|
||||
:param args: arguments of a command
|
||||
:param kwargs: same as kwags in method 'cmd'
|
||||
:return: string of valid terraform command
|
||||
"""
|
||||
cmds = [self.terraform_bin_path]
|
||||
cmds += self._generate_cmd_options(**global_options)
|
||||
cmds += cmd.split()
|
||||
if cmd in COMMAND_WITH_SUBCOMMANDS:
|
||||
args = list(args)
|
||||
subcommand = args.pop(0)
|
||||
cmds.append(subcommand)
|
||||
|
||||
cmds += self._generate_cmd_options(**kwargs)
|
||||
|
||||
cmds += args
|
||||
self.latest_cmd = ' '.join(cmds)
|
||||
return cmds
|
||||
|
||||
def cmd(
|
||||
self,
|
||||
global_opts: Dict[str, Any],
|
||||
cmd: str,
|
||||
*args,
|
||||
capture_output: Union[bool, str] = True,
|
||||
raise_on_error: bool = True,
|
||||
synchronous: bool = True,
|
||||
**kwargs,
|
||||
) -> CommandOutput:
|
||||
"""Run a terraform command, if success, will try to read state file
|
||||
|
||||
:param cmd: command and sub-command of terraform, seperated with space
|
||||
refer to https://www.terraform.io/docs/commands/index.html
|
||||
:param args: arguments of a command
|
||||
:param kwargs: any option flag with key value without prefixed dash character
|
||||
if there's a dash in the option name, use under line instead of dash,
|
||||
ex. -no-color --> no_color
|
||||
if it's a simple flag with no value, value should be IsFlagged
|
||||
ex. cmd('taint', allow_missing=IsFlagged)
|
||||
if it's a boolean value flag, assign True or false
|
||||
if it's a flag could be used multiple times, assign list to it's value
|
||||
if it's a "var" variable flag, assign dictionary to it
|
||||
if a value is None, will skip this option
|
||||
if the option 'capture_output' is passed (with any value other than
|
||||
True), terraform output will be printed to stdout/stderr and
|
||||
"None" will be returned as out and err.
|
||||
if the option 'raise_on_error' is passed (with any value that evaluates to True),
|
||||
and the terraform command returns a nonzerop return code, then
|
||||
a TerraformCommandError exception will be raised. The exception object will
|
||||
have the following properties:
|
||||
returncode: The command's return code
|
||||
out: The captured stdout, or None if not captured
|
||||
err: The captured stderr, or None if not captured
|
||||
:return: ret_code, out, err
|
||||
"""
|
||||
if capture_output is True:
|
||||
stderr = subprocess.PIPE
|
||||
stdout = subprocess.PIPE
|
||||
elif capture_output == "framework":
|
||||
stderr = None
|
||||
stdout = None
|
||||
else:
|
||||
stderr = sys.stderr
|
||||
stdout = sys.stdout
|
||||
|
||||
cmds = self.generate_cmd_string(global_opts, cmd, *args, **kwargs)
|
||||
logger.info("Command: %s", " ".join(cmds))
|
||||
|
||||
working_folder = self.working_dir if self.working_dir else None
|
||||
|
||||
environ_vars = {}
|
||||
if self.is_env_vars_included:
|
||||
environ_vars = os.environ.copy()
|
||||
|
||||
p = subprocess.Popen(
|
||||
cmds, stdout=stdout, stderr=stderr, cwd=working_folder, env=environ_vars
|
||||
)
|
||||
|
||||
if not synchronous:
|
||||
return None, None, None
|
||||
|
||||
out, err = p.communicate()
|
||||
ret_code = p.returncode
|
||||
logger.info("output: %s", out)
|
||||
|
||||
if ret_code == 0:
|
||||
self.read_state_file()
|
||||
else:
|
||||
logger.warning("error: %s", err)
|
||||
|
||||
self.temp_var_files.clean_up()
|
||||
if capture_output is True:
|
||||
out = out.decode()
|
||||
err = err.decode()
|
||||
else:
|
||||
out = None
|
||||
err = None
|
||||
|
||||
if ret_code and raise_on_error:
|
||||
raise TerraformCommandError(ret_code, " ".join(cmds), out=out, err=err)
|
||||
|
||||
return ret_code, out, err
|
||||
|
||||
def output(
|
||||
self, dir_or_plan: Optional[str] = None, *args, capture_output: bool = True, **kwargs
|
||||
) -> Union[None, str, Dict[str, str], Dict[str, Dict[str, str]]]:
|
||||
"""Refer https://www.terraform.io/docs/commands/output.html
|
||||
|
||||
Note that this method does not conform to the (ret_code, out, err) return
|
||||
convention. To use the "output" command with the standard convention,
|
||||
call "output_cmd" instead of "output".
|
||||
|
||||
:param args: Positional arguments. There is one optional positional
|
||||
argument NAME; if supplied, the returned output text
|
||||
will be the json for a single named output value.
|
||||
:param kwargs: Named options, passed to the command. In addition,
|
||||
'full_value': If True, and NAME is provided, then
|
||||
the return value will be a dict with
|
||||
"value', 'type', and 'sensitive'
|
||||
properties.
|
||||
:return: None, if an error occured
|
||||
Output value as a string, if NAME is provided and full_value
|
||||
is False or not provided
|
||||
Output value as a dict with 'value', 'sensitive', and 'type' if
|
||||
NAME is provided and full_value is True.
|
||||
dict of named dicts each with 'value', 'sensitive', and 'type',
|
||||
if NAME is not provided
|
||||
"""
|
||||
kwargs["json"] = IsFlagged
|
||||
if capture_output is False:
|
||||
raise ValueError("capture_output is required for this method")
|
||||
|
||||
global_opts = self._generate_default_general_options(dir_or_plan)
|
||||
ret, out, _ = self.cmd(global_opts, "output", *args, **kwargs)
|
||||
|
||||
# ret, out, _ = self.output_cmd(global_opts, *args, **kwargs)
|
||||
|
||||
if ret:
|
||||
return None
|
||||
|
||||
return json.loads(out.lstrip())
|
||||
|
||||
def read_state_file(self, file_path=None) -> None:
|
||||
"""Read .tfstate file
|
||||
|
||||
:param file_path: relative path to working dir
|
||||
:return: states file in dict type
|
||||
"""
|
||||
|
||||
working_dir = self.working_dir or ""
|
||||
|
||||
file_path = file_path or self.state or ""
|
||||
|
||||
if not file_path:
|
||||
backend_path = os.path.join(file_path, ".terraform", "terraform.tfstate")
|
||||
|
||||
if os.path.exists(os.path.join(working_dir, backend_path)):
|
||||
file_path = backend_path
|
||||
else:
|
||||
file_path = os.path.join(file_path, "terraform.tfstate")
|
||||
|
||||
file_path = os.path.join(working_dir, file_path)
|
||||
|
||||
self.tfstate = Tfstate.load_file(file_path)
|
||||
|
||||
def set_workspace(self, workspace, *args, **kwargs) -> CommandOutput:
|
||||
"""Set workspace
|
||||
|
||||
:param workspace: the desired workspace.
|
||||
:return: status
|
||||
"""
|
||||
global_opts = self._generate_default_general_options(False)
|
||||
return self.cmd(global_opts, "workspace", "select", workspace, *args, **kwargs)
|
||||
|
||||
def create_workspace(self, workspace, *args, **kwargs) -> CommandOutput:
|
||||
"""Create workspace
|
||||
|
||||
:param workspace: the desired workspace.
|
||||
:return: status
|
||||
"""
|
||||
global_opts = self._generate_default_general_options(False)
|
||||
return self.cmd(global_opts, "workspace", "new", workspace, *args, **kwargs)
|
||||
|
||||
def delete_workspace(self, workspace, *args, **kwargs) -> CommandOutput:
|
||||
"""Delete workspace
|
||||
|
||||
:param workspace: the desired workspace.
|
||||
:return: status
|
||||
"""
|
||||
global_opts = self._generate_default_general_options(False)
|
||||
return self.cmd(global_opts, "workspace", "delete", workspace, *args, **kwargs)
|
||||
|
||||
def show_workspace(self, **kwargs) -> CommandOutput:
|
||||
"""Show workspace, this command does not need the [DIR] part
|
||||
|
||||
:return: workspace
|
||||
"""
|
||||
global_opts = self._generate_default_general_options(False)
|
||||
return self.cmd(global_opts, "workspace", "show", **kwargs)
|
||||
|
||||
def _generate_default_args(self, dir_or_plan: Optional[str]) -> Sequence[str]:
|
||||
if (self.terraform_version < 1.0 and dir_or_plan):
|
||||
return [dir_or_plan]
|
||||
else:
|
||||
return []
|
||||
|
||||
def _generate_default_general_options(self, dir_or_plan: Optional[str]) -> Dict[str, Any]:
|
||||
if (self.terraform_version >= 1.0 and dir_or_plan):
|
||||
return {"chdir": dir_or_plan}
|
||||
else:
|
||||
return {}
|
||||
|
||||
def _generate_default_options(
|
||||
self, input_options: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
return {
|
||||
"state": self.state,
|
||||
"target": self.targets,
|
||||
"var": self.variables,
|
||||
"var_file": self.var_file,
|
||||
"parallelism": self.parallelism,
|
||||
"no_color": IsFlagged,
|
||||
"input": False,
|
||||
**input_options,
|
||||
}
|
||||
|
||||
def _generate_cmd_options(self, **kwargs) -> List[str]:
|
||||
"""
|
||||
"""
|
||||
result = []
|
||||
|
||||
for option, value in kwargs.items():
|
||||
if "_" in option:
|
||||
option = option.replace("_", "-")
|
||||
|
||||
if isinstance(value, list):
|
||||
for sub_v in value:
|
||||
result += [f"-{option}={sub_v}"]
|
||||
continue
|
||||
|
||||
if isinstance(value, dict):
|
||||
if "backend-config" in option:
|
||||
for bk, bv in value.items():
|
||||
result += [f"-backend-config={bk}={bv}"]
|
||||
continue
|
||||
|
||||
# since map type sent in string won't work, create temp var file for
|
||||
# variables, and clean it up later
|
||||
elif option == "var":
|
||||
# We do not create empty var-files if there is no var passed.
|
||||
# An empty var-file would result in an error: An argument or block definition is required here
|
||||
if value:
|
||||
filename = self.temp_var_files.create(value)
|
||||
result += [f"-var-file={filename}"]
|
||||
|
||||
continue
|
||||
|
||||
# simple flag,
|
||||
if value is IsFlagged:
|
||||
result += [f"-{option}"]
|
||||
continue
|
||||
|
||||
if value is None or value is IsNotFlagged:
|
||||
continue
|
||||
|
||||
if isinstance(value, bool):
|
||||
value = "true" if value else "false"
|
||||
|
||||
result += [f"-{option}={value}"]
|
||||
return result
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback) -> None:
|
||||
self.temp_var_files.clean_up()
|
||||
|
||||
def __getattr__(self, item: str) -> Callable:
|
||||
def wrapper(*args, **kwargs):
|
||||
cmd_name = str(item)
|
||||
if cmd_name.endswith("_cmd"):
|
||||
cmd_name = cmd_name[:-4]
|
||||
logger.debug("called with %r and %r", args, kwargs)
|
||||
global_opts = self._generate_default_general_options(False)
|
||||
return self.cmd(global_opts, cmd_name, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
|
||||
|
||||
class VariableFiles:
|
||||
def __init__(self):
|
||||
self.files = []
|
||||
|
||||
def create(self, variables: Dict[str, str]) -> str:
|
||||
with tempfile.NamedTemporaryFile(
|
||||
"w+t", suffix=".tfvars.json", delete=False
|
||||
) as temp:
|
||||
logger.debug("%s is created", temp.name)
|
||||
self.files.append(temp)
|
||||
logger.debug("variables wrote to tempfile: %s", variables)
|
||||
temp.write(json.dumps(variables))
|
||||
file_name = temp.name
|
||||
|
||||
return file_name
|
||||
|
||||
def clean_up(self):
|
||||
for f in self.files:
|
||||
os.unlink(f.name)
|
||||
|
||||
self.files = []
|
33
dda_python_terraform/tfstate.py
Normal file
33
dda_python_terraform/tfstate.py
Normal file
|
@ -0,0 +1,33 @@
|
|||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Dict, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Tfstate:
|
||||
def __init__(self, data: Optional[Dict[str, str]] = None):
|
||||
self.tfstate_file: Optional[str] = None
|
||||
self.native_data = data
|
||||
if data:
|
||||
self.__dict__ = data
|
||||
|
||||
@staticmethod
|
||||
def load_file(file_path: str) -> "Tfstate":
|
||||
"""Read the tfstate file and load its contents.
|
||||
|
||||
Parses then as JSON and put the result into the object.
|
||||
"""
|
||||
logger.debug("read data from %s", file_path)
|
||||
if os.path.exists(file_path):
|
||||
with open(file_path) as f:
|
||||
json_data = json.load(f)
|
||||
|
||||
tf_state = Tfstate(json_data)
|
||||
tf_state.tfstate_file = file_path
|
||||
return tf_state
|
||||
|
||||
logger.debug("%s does not exist", file_path)
|
||||
|
||||
return Tfstate()
|
Loading…
Reference in a new issue