Support Terraform 0.14
parent 785e9a3ce7, commit 7080f38a93
3 changed files with 135 additions and 150 deletions
@@ -4,7 +4,7 @@ import os
 import subprocess
 import sys
 import tempfile
-from typing import Callable, Dict, List, Optional, Sequence, Tuple, Type, TypeVar, Union
+from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, Union

 from python_terraform.tfstate import Tfstate

@@ -33,6 +33,7 @@ class TerraformCommandError(subprocess.CalledProcessError):
         super(TerraformCommandError, self).__init__(ret_code, cmd)
         self.out = out
         self.err = err
+        logger.error("Error with command %s. Reason: %s", self.cmd, self.err)


 class Terraform:
@@ -46,7 +47,7 @@ class Terraform:
         working_dir: Optional[str] = None,
         targets: Optional[Sequence[str]] = None,
         state: Optional[str] = None,
-        variables: Optional[Sequence[str]] = None,
+        variables: Optional[Dict[str, str]] = None,
         parallelism: Optional[str] = None,
         var_file: Optional[str] = None,
         terraform_bin_path: Optional[str] = None,
@@ -114,28 +115,30 @@ class Terraform:
         """
         if not skip_plan:
             return self.plan(dir_or_plan=dir_or_plan, **kwargs)
-        default = kwargs
+        default = kwargs.copy()
         default["input"] = input
         default["no_color"] = no_color
-        default["auto-approve"] = True
+        default["auto-approve"] = True  # a False value will require an input
         option_dict = self._generate_default_options(default)
         args = self._generate_default_args(dir_or_plan)
         return self.cmd("apply", *args, **option_dict)

-    def _generate_default_args(self, dir_or_plan) -> Sequence[str]:
+    def _generate_default_args(self, dir_or_plan: Optional[str]) -> Sequence[str]:
         return [dir_or_plan] if dir_or_plan else []

-    def _generate_default_options(self, input_options):
-        option_dict = dict()
-        option_dict["state"] = self.state
-        option_dict["target"] = self.targets
-        option_dict["var"] = self.variables
-        option_dict["var_file"] = self.var_file
-        option_dict["parallelism"] = self.parallelism
-        option_dict["no_color"] = IsFlagged
-        option_dict["input"] = False
-        option_dict.update(input_options)
-        return option_dict
+    def _generate_default_options(
+        self, input_options: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        return {
+            "state": self.state,
+            "target": self.targets,
+            "var": self.variables,
+            "var_file": self.var_file,
+            "parallelism": self.parallelism,
+            "no_color": IsFlagged,
+            "input": False,
+            **input_options,
+        }

     def destroy(
         self,
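For context, a minimal sketch of the dict-merge behaviour the rewritten _generate_default_options relies on (illustrative values, not part of this commit): caller-supplied options are unpacked last, so they override the per-instance defaults.

    # Illustrative only: later entries win, so **input_options overrides the
    # defaults built from the Terraform instance attributes.
    defaults = {"no_color": True, "input": False, "parallelism": None}
    caller_options = {"input": True, "var": {"region": "us-east-1"}}
    merged = {**defaults, **caller_options}
    assert merged["input"] is True        # caller override applied
    assert merged["no_color"] is True     # untouched default preserved
    assert merged["var"] == {"region": "us-east-1"}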
@@ -148,7 +151,7 @@ class Terraform:
         force/no-color option is flagged by default
         :return: ret_code, stdout, stderr
         """
-        default = kwargs
+        default = kwargs.copy()
         default["force"] = force
         options = self._generate_default_options(default)
         args = self._generate_default_args(dir_or_plan)
@@ -167,7 +170,7 @@ class Terraform:
         :param kwargs: options
         :return: ret_code, stdout, stderr
         """
-        options = kwargs
+        options = kwargs.copy()
         options["detailed_exitcode"] = detailed_exitcode
         options = self._generate_default_options(options)
         args = self._generate_default_args(dir_or_plan)
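A small sketch of why the kwargs.copy() changes matter (not from this commit): mutating kwargs in place would leak injected defaults back into a dict the caller may reuse across calls.

    # Illustrative only.
    def run_without_copy(options):
        options["force"] = True          # mutates the caller's dict
        return options

    def run_with_copy(options):
        options = options.copy()         # caller's dict stays untouched
        options["force"] = True
        return options

    shared = {"no_color": True}
    run_without_copy(shared)
    assert "force" in shared             # leaked into the caller's dict

    shared = {"no_color": True}
    run_with_copy(shared)
    assert "force" not in shared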
@@ -196,10 +199,14 @@ class Terraform:
         :param kwargs: options
         :return: ret_code, stdout, stderr
         """
-        options = kwargs
-        options["backend_config"] = backend_config
-        options["reconfigure"] = reconfigure
-        options["backend"] = backend
+        options = kwargs.copy()
+        options.update(
+            {
+                "backend_config": backend_config,
+                "reconfigure": reconfigure,
+                "backend": backend,
+            }
+        )
         options = self._generate_default_options(options)
         args = self._generate_default_args(dir_or_plan)
         return self.cmd("init", *args, **options)
@@ -281,7 +288,7 @@ class Terraform:
         cmd: str,
         *args,
         capture_output: Union[bool, str] = True,
-        raise_on_error: bool = False,
+        raise_on_error: bool = True,
         synchronous: bool = True,
         **kwargs,
     ) -> CommandOutput:
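A hedged sketch of what the flipped raise_on_error default means for callers (the folder name below is made up): non-zero exit codes from cmd() now raise TerraformCommandError unless the old behaviour is requested per call.

    from python_terraform import Terraform, TerraformCommandError

    tf = Terraform(working_dir=".")
    try:
        tf.cmd("plan", "nonexistent_folder")   # illustrative folder name
    except TerraformCommandError as e:
        print(e.returncode, e.err)             # attributes set by the error class above

    # Opt back into the previous behaviour for a single call:
    ret_code, out, err = tf.cmd("plan", "nonexistent_folder", raise_on_error=False)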
@@ -322,7 +329,7 @@ class Terraform:
             stdout = sys.stdout

         cmds = self.generate_cmd_string(cmd, *args, **kwargs)
-        logger.debug("Command: %s", " ".join(cmds))
+        logger.info("Command: %s", " ".join(cmds))

         working_folder = self.working_dir if self.working_dir else None

@@ -339,7 +346,7 @@ class Terraform:

         out, err = p.communicate()
         ret_code = p.returncode
-        logger.debug("output: %s", out)
+        logger.info("output: %s", out)

         if ret_code == 0:
             self.read_state_file()
@@ -28,6 +28,6 @@ class Tfstate:
             tf_state.tfstate_file = file_path
             return tf_state

-        logger.debug("%s is not exist", file_path)
+        logger.debug("%s does not exist", file_path)

         return Tfstate()
@@ -8,6 +8,7 @@ from io import StringIO
 from typing import Callable

 import pytest
+from _pytest.logging import LogCaptureFixture, caplog

 from python_terraform import IsFlagged, IsNotFlagged, Terraform, TerraformCommandError

@@ -42,7 +43,11 @@ CMD_CASES = [
     [
         [
             lambda x: x.cmd(
-                "plan", "var_to_output", no_color=IsFlagged, var={"test_var": "test"}
+                "plan",
+                "var_to_output",
+                no_color=IsFlagged,
+                var={"test_var": "test"},
+                raise_on_error=False,
             ),
             # Expected output varies by terraform version
             "Plan: 0 to add, 0 to change, 0 to destroy.",
@@ -52,35 +57,27 @@ CMD_CASES = [
             "var_to_output",
         ],
-        # try import aws instance
-        [
-            lambda x: x.cmd(
-                "import", "aws_instance.foo", "i-abcd1234", no_color=IsFlagged
-            ),
-            "",
-            1,
-            False,
-            "Command: terraform import -no-color aws_instance.foo i-abcd1234",
-            "",
-        ],
         # try import aws instance with raise_on_error
         [
             lambda x: x.cmd(
                 "import",
                 "aws_instance.foo",
                 "i-abcd1234",
                 no_color=IsFlagged,
-                raise_on_error=True,
+                raise_on_error=False,
             ),
             "",
             1,
-            True,
-            "Command: terraform import -no-color aws_instance.foo i-abcd1234",
+            False,
+            "Error: No Terraform configuration files",
             "",
         ],
         # test with space and special character in file path
         [
            lambda x: x.cmd(
-                "plan", "var_to_output", out=FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS
+                "plan",
+                "var_to_output",
+                out=FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS,
+                raise_on_error=False,
            ),
            "",
            0,
@@ -90,7 +87,9 @@ CMD_CASES = [
         ],
         # test workspace command (commands with subcommand)
         [
-            lambda x: x.cmd("workspace", "show", no_color=IsFlagged),
+            lambda x: x.cmd(
+                "workspace", "show", no_color=IsFlagged, raise_on_error=False
+            ),
             "",
             0,
             False,
@@ -114,24 +113,23 @@ def fmt_test_file(request):
     return


-@pytest.fixture()
-def string_logger(request) -> Callable[..., str]:
-    log_stream = StringIO()
-    handler = logging.StreamHandler(log_stream)
-    root_logger.addHandler(handler)
+# @pytest.fixture()
+# def string_logger(request) -> Callable[..., str]:
+#     log_stream = StringIO()
+#     handler = logging.StreamHandler(log_stream)
+#     root_logger.addHandler(handler)

-    def td():
-        root_logger.removeHandler(handler)
-        log_stream.close()
+#     def td():
+#         root_logger.removeHandler(handler)
+#         log_stream.close()

-    request.addfinalizer(td)
-    return lambda: str(log_stream.getvalue())
+#     request.addfinalizer(td)
+#     return lambda: str(log_stream.getvalue())


 @pytest.fixture()
 def workspace_setup_teardown():
-    """
-    Fixture used in workspace related tests
+    """Fixture used in workspace related tests.

     Create and tear down a workspace
     *Use as a contextmanager*
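The commented-out string_logger fixture is replaced throughout by pytest's built-in caplog fixture; a brief sketch of the pattern (test name and message are illustrative):

    import logging

    def test_logging_example(caplog):
        # at_level() temporarily lowers the capture threshold so the INFO
        # records now emitted by python_terraform are kept.
        with caplog.at_level(logging.INFO):
            logging.getLogger("python_terraform").info("Command: terraform plan")
        assert "Command: terraform plan" in caplog.text
        assert caplog.messages[-1].startswith("Command:")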
@@ -151,11 +149,9 @@ def workspace_setup_teardown():
     yield wrapper


-class TestTerraform(object):
+class TestTerraform:
     def teardown_method(self, _) -> None:
-        """ teardown any state that was previously setup with a setup_method
-        call.
-        """
+        """Teardown any state that was previously setup with a setup_method call."""
         exclude = ["test_tfstate_file", "test_tfstate_file2", "test_tfstate_file3"]

         def purge(dir: str, pattern: str) -> None:
@@ -190,9 +186,10 @@ class TestTerraform(object):
         expected_ret_code: int,
         expected_exception: bool,
         expected_logs: str,
-        string_logger: Callable[..., str],
-        folder,
+        caplog: LogCaptureFixture,
+        folder: str,
     ):
+        with caplog.at_level(logging.INFO):
         tf = Terraform(working_dir=current_path)
         tf.init(folder)
         try:
@@ -203,11 +200,9 @@ class TestTerraform(object):
                 ret = e.returncode
                 out = e.out

-        logs = string_logger()
-        logs = logs.replace("\n", "")
         assert expected_output in out
         assert expected_ret_code == ret
-        assert expected_logs in logs
+        assert expected_logs in caplog.text

     @pytest.mark.parametrize(
         ("folder", "variables", "var_files", "expected_output", "options"),
@@ -254,15 +249,18 @@ class TestTerraform(object):
         assert expected_output in out.replace("\n", "").replace(" ", "")
         assert err == ""

-    def test_apply_with_var_file(self, string_logger):
+    def test_apply_with_var_file(self, caplog: LogCaptureFixture):
+        with caplog.at_level(logging.INFO):
         tf = Terraform(working_dir=current_path)

-        tf.init()
-        tf.apply(var_file=os.path.join(current_path, "tfvar_file", "test.tfvars"))
-        logs = string_logger()
-        logs = logs.split("\n")
-        for log in logs:
-            if log.startswith("command: terraform apply"):
+            folder = "var_to_output"
+            tf.init(folder)
+            tf.apply(
+                folder,
+                var_file=os.path.join(current_path, "tfvar_files", "test.tfvars"),
+            )
+        for log in caplog.messages:
+            if log.startswith("Command: terraform apply"):
                 assert log.count("-var-file=") == 1

     @pytest.mark.parametrize(
@@ -315,33 +313,23 @@ class TestTerraform(object):
         out = tf.output("test_output")
         assert "test2" in out

-    @pytest.mark.parametrize(("param"), [({}), ({"module": "test2"}),])
-    def test_output(self, param, string_logger):
-        tf = Terraform(working_dir=current_path, variables={"test_var": "test"})
-        tf.init("var_to_output")
-        tf.apply("var_to_output")
-        result = tf.output("test_output", **param)
-        regex = re.compile(
-            "terraform output (-module=test2 -json|-json -module=test2) test_output"
+    @pytest.mark.parametrize("output_all", [True, False])
+    def test_output(self, caplog: LogCaptureFixture, output_all: bool):
+        expected_value = "test"
+        required_output = "test_output"
+        with caplog.at_level(logging.INFO):
+            tf = Terraform(
+                working_dir=current_path, variables={"test_var": expected_value}
             )
-        log_str = string_logger()
-        if param:
-            assert re.search(regex, log_str), log_str
-        else:
-            assert result == "test"
-
-    @pytest.mark.parametrize(("param"), [({}), ({"module": "test2"}),])
-    def test_output_all(self, param, string_logger):
-        tf = Terraform(working_dir=current_path, variables={"test_var": "test"})
            tf.init("var_to_output")
            tf.apply("var_to_output")
-        result = tf.output(**param)
-        regex = re.compile("terraform output (-module=test2 -json|-json -module=test2)")
-        log_str = string_logger()
-        if param:
-            assert re.search(regex, log_str), log_str
+            params = tuple() if output_all else (required_output,)
+            result = tf.output(*params)
+        if output_all:
+            assert result[required_output]["value"] == expected_value
         else:
-            assert result["test_output"]["value"] == "test"
+            assert result == expected_value
+        assert expected_value in caplog.messages[-1]

     def test_destroy(self):
         tf = Terraform(working_dir=current_path, variables={"test_var": "test"})
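For context (an assumption based on Terraform 0.12+ behaviour, not shown in this diff): terraform output -json without a name returns every output as an object keyed by name, while a single named output returns just its value, which is why the rewritten test indexes ["value"] only in the output_all case.

    import json

    # Assumed shapes of the two output calls the test exercises:
    all_outputs = json.loads(
        '{"test_output": {"sensitive": false, "type": "string", "value": "test"}}'
    )
    single_output = json.loads('"test"')

    assert all_outputs["test_output"]["value"] == "test"
    assert single_output == "test"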
@@ -355,22 +343,19 @@ class TestTerraform(object):
     )
     def test_plan(self, plan, variables, expected_ret):
         tf = Terraform(working_dir=current_path, variables=variables)
-        ret, out, err = tf.plan(plan)
-        assert ret == expected_ret
+        tf.init(plan)
+        with pytest.raises(TerraformCommandError) as e:
+            tf.plan(plan)
+        assert (
+            e.value.err
+            == """\nError: Missing required argument\n\nThe argument "region" is required, but was not set.\n\n"""
+        )

     def test_fmt(self, fmt_test_file):
         tf = Terraform(working_dir=current_path, variables={"test_var": "test"})
         ret, out, err = tf.fmt(diff=True)
         assert ret == 0

-    def test_import(self, string_logger):
-        tf = Terraform(working_dir=current_path)
-        tf.import_cmd("aws_instance.foo", "i-abc1234", no_color=IsFlagged)
-        assert (
-            "Command: terraform import -no-color aws_instance.foo i-abc1234"
-            in string_logger()
-        )
-
     def test_create_workspace(self, workspace_setup_teardown):
         workspace_name = "test"
         with workspace_setup_teardown(workspace_name, create=False) as tf:
@@ -378,25 +363,24 @@ class TestTerraform(object):
         assert ret == 0
         assert err == ""

-    def test_create_workspace_with_args(self, workspace_setup_teardown, string_logger):
+    def test_create_workspace_with_args(self, workspace_setup_teardown, caplog):
         workspace_name = "test"
         state_file_path = os.path.join(
             current_path, "test_tfstate_file2", "terraform.tfstate"
         )
-        with workspace_setup_teardown(workspace_name, create=False) as tf:
+        with workspace_setup_teardown(
+            workspace_name, create=False
+        ) as tf, caplog.at_level(logging.INFO):
             ret, out, err = tf.create_workspace(
                 "test", current_path, no_color=IsFlagged
             )

         assert ret == 0
         assert err == ""

-        logs = string_logger()
-        logs = logs.replace("\n", "")
-        expected_log = "Command: terraform workspace new -no-color test {}".format(
-            current_path
+        assert (
+            f"Command: terraform workspace new -no-color test {current_path}"
+            in caplog.messages
         )
-        assert expected_log in logs

     def test_set_workspace(self, workspace_setup_teardown):
         workspace_name = "test"
@@ -405,22 +389,21 @@ class TestTerraform(object):
         assert ret == 0
         assert err == ""

-    def test_set_workspace_with_args(self, workspace_setup_teardown, string_logger):
+    def test_set_workspace_with_args(self, workspace_setup_teardown, caplog):
         workspace_name = "test"
-        with workspace_setup_teardown(workspace_name) as tf:
+        with workspace_setup_teardown(workspace_name) as tf, caplog.at_level(
+            logging.INFO
+        ):
             ret, out, err = tf.set_workspace(
                 workspace_name, current_path, no_color=IsFlagged
             )

         assert ret == 0
         assert err == ""

-        logs = string_logger()
-        logs = logs.replace("\n", "")
-        expected_log = "Command: terraform workspace select -no-color test {}".format(
-            current_path
+        assert (
+            f"Command: terraform workspace select -no-color test {current_path}"
+            in caplog.messages
         )
-        assert expected_log in logs

     def test_show_workspace(self, workspace_setup_teardown):
         workspace_name = "test"
@@ -429,20 +412,16 @@ class TestTerraform(object):
         assert ret == 0
         assert err == ""

-    def test_show_workspace_with_no_color(
-        self, workspace_setup_teardown, string_logger
-    ):
+    def test_show_workspace_with_no_color(self, workspace_setup_teardown, caplog):
         workspace_name = "test"
-        with workspace_setup_teardown(workspace_name) as tf:
+        with workspace_setup_teardown(workspace_name) as tf, caplog.at_level(
+            logging.INFO
+        ):
             ret, out, err = tf.show_workspace(no_color=IsFlagged)

         assert ret == 0
         assert err == ""

-        logs = string_logger()
-        logs = logs.replace("\n", "")
-        expected_log = "Command: terraform workspace show -no-color"
-        assert expected_log in logs
+        assert "Command: terraform workspace show -no-color" in caplog.messages

     def test_delete_workspace(self, workspace_setup_teardown):
         workspace_name = "test"
@@ -452,9 +431,11 @@ class TestTerraform(object):
         assert ret == 0
         assert err == ""

-    def test_delete_workspace_with_args(self, workspace_setup_teardown, string_logger):
+    def test_delete_workspace_with_args(self, workspace_setup_teardown, caplog):
         workspace_name = "test"
-        with workspace_setup_teardown(workspace_name, delete=False) as tf:
+        with workspace_setup_teardown(
+            workspace_name, delete=False
+        ) as tf, caplog.at_level(logging.INFO):
             tf.set_workspace("default")
             ret, out, err = tf.delete_workspace(
                 workspace_name, current_path, force=IsFlagged,
@@ -462,10 +443,7 @@ class TestTerraform(object):

         assert ret == 0
         assert err == ""

-        logs = string_logger()
-        logs = logs.replace("\n", "")
-        expected_log = "Command: terraform workspace delete -force test {}".format(
-            current_path
+        assert (
+            f"Command: terraform workspace delete -force test {current_path}"
+            in caplog.messages
         )
-        assert expected_log in logs