import fnmatch import logging import os import re import shutil from contextlib import contextmanager from io import StringIO from typing import Callable from packaging import version import pytest from _pytest.logging import LogCaptureFixture, caplog from dda_python_terraform import IsFlagged, IsNotFlagged, Terraform, TerraformCommandError logging.basicConfig(level=logging.DEBUG) root_logger = logging.getLogger() current_path = os.path.dirname(os.path.realpath(__file__)) semantic_version = os.environ.get("TFVER") FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS = "test 'test.out!" STRING_CASES = [ [ lambda x: x.generate_cmd_string( {}, "apply", "the_folder", no_color=IsFlagged), "terraform apply -no-color the_folder", ], [ lambda x: x.generate_cmd_string({}, "push", "path", vcs=True, token="token", atlas_address="url" ), "terraform push -vcs=true -token=token -atlas-address=url path", ], [ lambda x: x.generate_cmd_string({}, "refresh", "path", token="token" ), "terraform refresh -token=token path", ], ] CMD_CASES_0_x = [ [ "method", "expected_output", "expected_ret_code", "expected_exception", "expected_logs", "folder", ], [ [ lambda x: x.cmd( {}, "plan", "var_to_output", no_color=IsFlagged, var={"test_var": "test"}, raise_on_error=False, ), # Expected output varies by terraform semantic_version "Plan: 0 to add, 0 to change, 0 to destroy.", 0, False, "", "var_to_output", ], # try import aws instance [ lambda x: x.cmd( {}, "import", "aws_instance.foo", "i-abcd1234", no_color=IsFlagged, raise_on_error=False, ), "", 1, False, "Error: No Terraform configuration files", "", ], # test with space and special character in file path [ lambda x: x.cmd( {}, "plan", "var_to_output", out=FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS, raise_on_error=False, ), "", 0, False, "", "var_to_output", ], # test workspace command (commands with subcommand) [ lambda x: x.cmd( {}, "workspace", "show", no_color=IsFlagged, raise_on_error=False ), "", 0, False, "Command: terraform workspace show -no-color", "", ], ], ] CMD_CASES_1_x = [ [ "method", "expected_output", "expected_ret_code", "expected_exception", "expected_logs", "folder", ], [ [ lambda x: x.cmd( {"chdir": "var_to_output"}, "plan", no_color=IsFlagged, var={"test_var": "test"}, raise_on_error=False, ), # Expected output varies by terraform semantic_version "Changes to Outputs:", 0, False, "", "var_to_output", ], # try import aws instance [ lambda x: x.cmd( {}, "import", "aws_instance.foo", "i-abcd1234", no_color=IsFlagged, raise_on_error=False, ), "", 1, False, "Error: No Terraform configuration files", "", ], # test with space and special character in file path [ lambda x: x.cmd( {"chdir": "var_to_output"}, "plan", out=FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS, raise_on_error=False, ), "", 0, False, "", "var_to_output", ], # test workspace command (commands with subcommand) [ lambda x: x.cmd( {}, "workspace", "show", no_color=IsFlagged, raise_on_error=False ), "", 0, False, "Command: terraform workspace show -no-color", "", ], ], ] APPLY_CASES_0_x = [ ["folder", "variables", "var_files", "expected_output", "options"], [("var_to_output", {"test_var": "test"}, None, 'test_output=test', {}), ("var_to_output", {"test_list_var": ["c", "d"]}, None, 'test_list_output=["c","d",]', {},), ("var_to_output", {"test_map_var": {"c": "c", "d": "d"}}, None, 'test_map_output={"c"="c""d"="d"}', {},), ("var_to_output", {"test_map_var": {"c": "c", "d": "d"}}, "var_to_output/test_map_var.json", 'test_map_output={"e"="e""f"="f"}', {},), ("var_to_output", {}, None, "\x1b[0m\x1b[1m\x1b[32mApplycomplete!", {"no_color": IsNotFlagged},), ]] APPLY_CASES_1_x = [ ["folder", "variables", "var_files", "expected_output", "options"], [("var_to_output", {"test_var": "test"}, None, 'test_output="test"', {}), ("var_to_output", {"test_list_var": ["c", "d"]}, None, 'test_list_output=tolist(["c","d",])', {},), ("var_to_output", {"test_map_var": {"c": "c", "d": "d"}}, None, 'test_map_output=tomap({"c"="c""d"="d"})', {},), ("var_to_output", {"test_map_var": {"c": "c", "d": "d"}}, "test_map_var.json", 'test_map_output=tomap({"e"="e""f"="f"})', {},), ("var_to_output", {}, None, "\x1b[0m\x1b[1m\x1b[32mApplycomplete!", {"no_color": IsNotFlagged},), ]] @pytest.fixture(scope="function") def fmt_test_file(request): target = os.path.join(current_path, "bad_fmt", "test.backup") orgin = os.path.join(current_path, "bad_fmt", "test.tf") shutil.copy(orgin, target) def td(): shutil.move(target, orgin) request.addfinalizer(td) return # @pytest.fixture() # def string_logger(request) -> Callable[..., str]: # log_stream = StringIO() # handler = logging.StreamHandler(log_stream) # root_logger.addHandler(handler) # def td(): # root_logger.removeHandler(handler) # log_stream.close() # request.addfinalizer(td) # return lambda: str(log_stream.getvalue()) @pytest.fixture() def workspace_setup_teardown(): """Fixture used in workspace related tests. Create and tear down a workspace *Use as a contextmanager* """ @contextmanager def wrapper(workspace_name, create=True, delete=True, *args, **kwargs): tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version) tf.init() if create: tf.create_workspace(workspace_name, *args, **kwargs) yield tf if delete: tf.set_workspace("default") tf.delete_workspace(workspace_name) yield wrapper class TestTerraform: def teardown_method(self, _) -> None: """Teardown any state that was previously setup with a setup_method call.""" exclude = ["test_tfstate_file", "test_tfstate_file2", "test_tfstate_file3"] def purge(dir: str, pattern: str) -> None: for root, dirnames, filenames in os.walk(dir): dirnames[:] = [d for d in dirnames if d not in exclude] for filename in fnmatch.filter(filenames, pattern): f = os.path.join(root, filename) os.remove(f) for dirname in fnmatch.filter(dirnames, pattern): d = os.path.join(root, dirname) shutil.rmtree(d) purge(".", "*.tfstate") purge(".", "*.tfstate.backup") purge(".", "*.terraform") purge(".", FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS) @pytest.mark.parametrize(["method", "expected"], STRING_CASES) def test_generate_cmd_string(self, method: Callable[..., str], expected: str): tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version) result = method(tf) strs = expected.split() for s in strs: assert s in result @pytest.mark.parametrize(*(CMD_CASES_1_x if version.parse(semantic_version) >= version.parse("1.0.0") else CMD_CASES_0_x)) def test_cmd( self, method: Callable[..., str], expected_output: str, expected_ret_code: int, expected_exception: bool, expected_logs: str, caplog: LogCaptureFixture, folder: str, ): with caplog.at_level(logging.INFO): tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version) tf.init(folder) try: ret, out, _ = method(tf) assert not expected_exception except TerraformCommandError as e: assert expected_exception ret = e.returncode out = e.out assert expected_output in out assert expected_ret_code == ret assert expected_logs in caplog.text @pytest.mark.parametrize(*(APPLY_CASES_1_x if version.parse(semantic_version) >= version.parse("1.0.0") else APPLY_CASES_0_x)) def test_apply(self, folder, variables, var_files, expected_output, options): tf = Terraform( working_dir=current_path, variables=variables, var_file=var_files, terraform_semantic_version=semantic_version ) tf.init(folder) ret, out, err = tf.apply(folder, **options) assert ret == 0 assert expected_output in out.replace("\n", "").replace(" ", "") assert err == "" def test_apply_plan(self): # test is only applicable to version > 1.0.0 if version.parse(semantic_version) < version.parse("1.0.0"): return tf = Terraform( working_dir=current_path, terraform_semantic_version=semantic_version ) out_folder = 'var_to_output' out_file_name = 'test.out' out_file_path = f'{out_folder}/{out_file_name}' tf.init(out_folder) ret, _, err = tf.plan(out_folder, detailed_exitcode=IsNotFlagged, out=out_file_name) assert ret == 0 assert err == "" ret, _, err = tf.apply(out_file_path, skip_plan=True) assert ret == 0 assert err == "" def test_apply_with_var_file(self, caplog: LogCaptureFixture): with caplog.at_level(logging.INFO): tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version) folder = "var_to_output" tf.init(folder) tf.apply( folder, var_file=os.path.join( current_path, "tfvar_files", "test.tfvars"), ) for log in caplog.messages: if log.startswith("Command: terraform apply"): assert log.count("-var-file=") == 1 @pytest.mark.parametrize( ["cmd", "args", "options"], [ # bool value ("fmt", ["bad_fmt"], {"list": False, "diff": False}) ], ) def test_options(self, cmd, args, options, fmt_test_file): tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version) ret, out, err = getattr(tf, cmd)(*args, **options) assert ret == 0 assert out == "" def test_state_data(self): cwd = os.path.join(current_path, "test_tfstate_file") tf = Terraform(working_dir=cwd, state="tfstate.test", terraform_semantic_version=semantic_version) tf.read_state_file() assert tf.tfstate.modules[0]["path"] == ["root"] def test_state_default(self): cwd = os.path.join(current_path, "test_tfstate_file2") tf = Terraform(working_dir=cwd, terraform_semantic_version=semantic_version) tf.read_state_file() assert tf.tfstate.modules[0]["path"] == ["default"] def test_state_default_backend(self): cwd = os.path.join(current_path, "test_tfstate_file3") tf = Terraform(working_dir=cwd, terraform_semantic_version=semantic_version) tf.read_state_file() assert tf.tfstate.modules[0]["path"] == ["default_backend"] def test_pre_load_state_data(self): cwd = os.path.join(current_path, "test_tfstate_file") tf = Terraform(working_dir=cwd, state="tfstate.test", terraform_semantic_version=semantic_version) assert tf.tfstate.modules[0]["path"] == ["root"] @pytest.mark.parametrize( ("folder", "variables"), [("var_to_output", {"test_var": "test"})] ) def test_override_default(self, folder, variables): tf = Terraform(working_dir=current_path, variables=variables, terraform_semantic_version=semantic_version) tf.init(folder) ret, out, err = tf.apply( folder, var={"test_var": "test2"}, no_color=IsNotFlagged, ) out = out.replace("\n", "") assert "\x1b[0m\x1b[1m\x1b[32mApply" in out out = tf.output(folder, "test_output") assert "test2" in out @pytest.mark.parametrize("output_all", [True, False]) def test_output(self, caplog: LogCaptureFixture, output_all: bool): expected_value = "test" required_output = "test_output" with caplog.at_level(logging.INFO): tf = Terraform( working_dir=current_path, variables={"test_var": expected_value}, terraform_semantic_version=semantic_version ) tf.init("var_to_output") tf.apply("var_to_output") params = tuple() if output_all else (required_output,) result = tf.output("var_to_output", *params) if output_all: assert result[required_output]["value"] == expected_value else: assert result == expected_value assert expected_value in caplog.messages[-1] def test_destroy(self): tf = Terraform(working_dir=current_path, variables={ "test_var": "test"}, terraform_semantic_version=semantic_version) tf.init("var_to_output") ret, out, err = tf.destroy("var_to_output") assert ret == 0 assert "Destroy complete! Resources: 0 destroyed." in out @pytest.mark.parametrize( ("plan", "variables", "expected_ret"), [("vars_require_input", {}, 1)] ) def test_plan(self, plan, variables, expected_ret): tf = Terraform(working_dir=current_path, variables=variables, terraform_semantic_version=semantic_version) tf.init(plan) with pytest.raises(TerraformCommandError) as e: tf.plan(plan) assert ( "\nError:" in e.value.err ) def test_fmt(self, fmt_test_file): tf = Terraform(working_dir=current_path, variables={ "test_var": "test"}, terraform_semantic_version=semantic_version) ret, out, err = tf.fmt(diff=True) assert ret == 0 def test_create_workspace(self, workspace_setup_teardown): workspace_name = "test" with workspace_setup_teardown(workspace_name, create=False) as tf: ret, out, err = tf.create_workspace("test") assert ret == 0 assert err == "" # The directory flag is no longer supported in v1.0.8 # this should either be done with -chdir or probably just be removed """ def test_create_workspace_with_args(self, workspace_setup_teardown, caplog): workspace_name = "test" state_file_path = os.path.join( current_path, "test_tfstate_file2", "terraform.tfstate" ) with workspace_setup_teardown( workspace_name, create=False ) as tf, caplog.at_level(logging.INFO): ret, out, err = tf.create_workspace( "test", current_path, no_color=IsFlagged ) assert ret == 0 assert err == "" assert ( f"Command: terraform workspace new -no-color test {current_path}" in caplog.messages ) """ def test_set_workspace(self, workspace_setup_teardown): workspace_name = "test" with workspace_setup_teardown(workspace_name) as tf: ret, out, err = tf.set_workspace(workspace_name) assert ret == 0 assert err == "" # see comment on test_create_workspace_with_args """ def test_set_workspace_with_args(self, workspace_setup_teardown, caplog): workspace_name = "test" with workspace_setup_teardown(workspace_name) as tf, caplog.at_level( logging.INFO ): ret, out, err = tf.set_workspace( workspace_name, current_path, no_color=IsFlagged ) assert ret == 0 assert err == "" assert ( f"Command: terraform workspace select -no-color test {current_path}" in caplog.messages ) """ def test_show_workspace(self, workspace_setup_teardown): workspace_name = "test" with workspace_setup_teardown(workspace_name) as tf: ret, out, err = tf.show_workspace() assert ret == 0 assert err == "" def test_show_workspace_with_no_color(self, workspace_setup_teardown, caplog): workspace_name = "test" with workspace_setup_teardown(workspace_name) as tf, caplog.at_level( logging.INFO ): ret, out, err = tf.show_workspace(no_color=IsFlagged) assert ret == 0 assert err == "" assert "Command: terraform workspace show -no-color" in caplog.messages def test_delete_workspace(self, workspace_setup_teardown): workspace_name = "test" with workspace_setup_teardown(workspace_name, delete=False) as tf: tf.set_workspace("default") ret, out, err = tf.delete_workspace(workspace_name) assert ret == 0 assert err == "" # see above comments """ def test_delete_workspace_with_args(self, workspace_setup_teardown, caplog): workspace_name = "test" with workspace_setup_teardown( workspace_name, delete=False ) as tf, caplog.at_level(logging.INFO): tf.set_workspace("default") ret, out, err = tf.delete_workspace( workspace_name, current_path, force=IsFlagged, ) assert ret == 0 assert err == "" assert ( f"Command: terraform workspace delete -force test {current_path}" in caplog.messages ) """ def test_list_workspace(self): tf = Terraform(working_dir=current_path) workspaces = tf.list_workspace() assert len(workspaces) > 0 assert 'default' in workspaces