dda-python-terraform/test/test_terraform.py
bom 3fd4a72d3d Adjust logic for plan files (version > 1.0.0)
We will no longer try to chdir into files and instead
add them as subcommands.
Includes regression test

Resolves #3 (https://gitlab.com/domaindrivenarchitecture/dda-python-terraform/-/issues/3)
2023-11-24 22:08:38 +01:00

557 lines
19 KiB
Python

import fnmatch
import logging
import os
import re
import shutil
from contextlib import contextmanager
from io import StringIO
from typing import Callable
from packaging import version
import pytest
from _pytest.logging import LogCaptureFixture, caplog
from dda_python_terraform import IsFlagged, IsNotFlagged, Terraform, TerraformCommandError
# Configure the root logger at DEBUG level for the whole test module.
logging.basicConfig(level=logging.DEBUG)
root_logger = logging.getLogger()
# Absolute directory of this test file; the tests pass it as ``working_dir``.
current_path = os.path.dirname(os.path.realpath(__file__))
# Terraform version under test, taken from the TFVER environment variable.
# NOTE(review): os.environ.get() returns None when TFVER is unset, and this
# value is later handed to version.parse() in the parametrize decorators —
# confirm the test runner always exports TFVER.
semantic_version = os.environ.get("TFVER")
# File name containing a space, a single quote and "!"; used as the ``out``
# option of ``plan`` and purged again in teardown.
FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS = "test 'test.out!"
# Parametrization table for ``test_generate_cmd_string``.
# Every entry is ``[builder, expected]``: ``builder`` receives the object under
# test and calls ``generate_cmd_string`` on it; ``expected`` is a string whose
# whitespace-separated tokens must all occur in the builder's result.
STRING_CASES = [
    [
        lambda tf: tf.generate_cmd_string(
            {},
            "apply",
            "the_folder",
            no_color=IsFlagged,
        ),
        "terraform apply -no-color the_folder",
    ],
    [
        lambda tf: tf.generate_cmd_string(
            {},
            "push",
            "path",
            vcs=True,
            token="token",
            atlas_address="url",
        ),
        "terraform push -vcs=true -token=token -atlas-address=url path",
    ],
    [
        lambda tf: tf.generate_cmd_string(
            {},
            "refresh",
            "path",
            token="token",
        ),
        "terraform refresh -token=token path",
    ],
]
# Parametrization data for ``test_cmd``, selected when the terraform version
# is below 1.0.0.  First element: the argument names; second element: one row
# per case in the same order as the names.
CMD_CASES_0_x = [
    [
        "method",
        "expected_output",
        "expected_ret_code",
        "expected_exception",
        "expected_logs",
        "folder",
    ],
    [
        # plan with the folder given as a positional argument
        [
            lambda tf: tf.cmd(
                {},
                "plan",
                "var_to_output",
                no_color=IsFlagged,
                var={"test_var": "test"},
                raise_on_error=False,
            ),
            # Expected output varies by terraform semantic_version
            "Plan: 0 to add, 0 to change, 0 to destroy.",
            0,
            False,
            "",
            "var_to_output",
        ],
        # try import aws instance
        [
            lambda tf: tf.cmd(
                {},
                "import",
                "aws_instance.foo",
                "i-abcd1234",
                no_color=IsFlagged,
                raise_on_error=False,
            ),
            "",
            1,
            False,
            "Error: No Terraform configuration files",
            "",
        ],
        # test with space and special character in file path
        [
            lambda tf: tf.cmd(
                {},
                "plan",
                "var_to_output",
                out=FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS,
                raise_on_error=False,
            ),
            "",
            0,
            False,
            "",
            "var_to_output",
        ],
        # test workspace command (commands with subcommand)
        [
            lambda tf: tf.cmd(
                {},
                "workspace",
                "show",
                no_color=IsFlagged,
                raise_on_error=False,
            ),
            "",
            0,
            False,
            "Command: terraform workspace show -no-color",
            "",
        ],
    ],
]
# Parametrization data for ``test_cmd``, selected when the terraform version
# is 1.0.0 or newer.  Unlike the 0.x table, the plan cases pass the folder via
# the ``chdir`` entry of the first argument instead of as a positional argument.
CMD_CASES_1_x = [
    [
        "method",
        "expected_output",
        "expected_ret_code",
        "expected_exception",
        "expected_logs",
        "folder",
    ],
    [
        # plan inside the folder selected through "chdir"
        [
            lambda tf: tf.cmd(
                {"chdir": "var_to_output"},
                "plan",
                no_color=IsFlagged,
                var={"test_var": "test"},
                raise_on_error=False,
            ),
            # Expected output varies by terraform semantic_version
            "Changes to Outputs:",
            0,
            False,
            "",
            "var_to_output",
        ],
        # try import aws instance
        [
            lambda tf: tf.cmd(
                {},
                "import",
                "aws_instance.foo",
                "i-abcd1234",
                no_color=IsFlagged,
                raise_on_error=False,
            ),
            "",
            1,
            False,
            "Error: No Terraform configuration files",
            "",
        ],
        # test with space and special character in file path
        [
            lambda tf: tf.cmd(
                {"chdir": "var_to_output"},
                "plan",
                out=FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS,
                raise_on_error=False,
            ),
            "",
            0,
            False,
            "",
            "var_to_output",
        ],
        # test workspace command (commands with subcommand)
        [
            lambda tf: tf.cmd(
                {},
                "workspace",
                "show",
                no_color=IsFlagged,
                raise_on_error=False,
            ),
            "",
            0,
            False,
            "Command: terraform workspace show -no-color",
            "",
        ],
    ],
]
# Parametrization data for ``test_apply``, selected when the terraform version
# is below 1.0.0.  ``expected_output`` is compared against the apply output
# after newlines and spaces have been removed from it.
APPLY_CASES_0_x = [
    ["folder", "variables", "var_files", "expected_output", "options"],
    [
        # plain string variable
        ("var_to_output", {"test_var": "test"}, None, "test_output=test", {}),
        # list variable
        (
            "var_to_output",
            {"test_list_var": ["c", "d"]},
            None,
            'test_list_output=["c","d",]',
            {},
        ),
        # map variable
        (
            "var_to_output",
            {"test_map_var": {"c": "c", "d": "d"}},
            None,
            'test_map_output={"c"="c""d"="d"}',
            {},
        ),
        # map variable together with a var file; the expected values are e/f
        (
            "var_to_output",
            {"test_map_var": {"c": "c", "d": "d"}},
            "var_to_output/test_map_var.json",
            'test_map_output={"e"="e""f"="f"}',
            {},
        ),
        # no_color explicitly not flagged: output carries ANSI escape sequences
        (
            "var_to_output",
            {},
            None,
            "\x1b[0m\x1b[1m\x1b[32mApplycomplete!",
            {"no_color": IsNotFlagged},
        ),
    ],
]
# Parametrization data for ``test_apply``, selected when the terraform version
# is 1.0.0 or newer.  ``expected_output`` is compared against the apply output
# after newlines and spaces have been removed from it.
APPLY_CASES_1_x = [
    ["folder", "variables", "var_files", "expected_output", "options"],
    [
        # plain string variable
        ("var_to_output", {"test_var": "test"}, None, 'test_output="test"', {}),
        # list variable
        (
            "var_to_output",
            {"test_list_var": ["c", "d"]},
            None,
            'test_list_output=tolist(["c","d",])',
            {},
        ),
        # map variable
        (
            "var_to_output",
            {"test_map_var": {"c": "c", "d": "d"}},
            None,
            'test_map_output=tomap({"c"="c""d"="d"})',
            {},
        ),
        # map variable together with a var file; the expected values are e/f
        (
            "var_to_output",
            {"test_map_var": {"c": "c", "d": "d"}},
            "test_map_var.json",
            'test_map_output=tomap({"e"="e""f"="f"})',
            {},
        ),
        # no_color explicitly not flagged: output carries ANSI escape sequences
        (
            "var_to_output",
            {},
            None,
            "\x1b[0m\x1b[1m\x1b[32mApplycomplete!",
            {"no_color": IsNotFlagged},
        ),
    ],
]
@pytest.fixture(scope="function")
def fmt_test_file(request):
    """Back up ``bad_fmt/test.tf`` before a test and restore it afterwards.

    The file is copied to ``bad_fmt/test.backup`` up front; a finalizer moves
    the backup over ``test.tf`` again once the test has finished.
    """
    fmt_dir = os.path.join(current_path, "bad_fmt")
    tf_file = os.path.join(fmt_dir, "test.tf")
    backup_file = os.path.join(fmt_dir, "test.backup")

    shutil.copy(tf_file, backup_file)

    def restore():
        # Moving (not copying) also removes the temporary backup file.
        shutil.move(backup_file, tf_file)

    request.addfinalizer(restore)
# @pytest.fixture()
# def string_logger(request) -> Callable[..., str]:
# log_stream = StringIO()
# handler = logging.StreamHandler(log_stream)
# root_logger.addHandler(handler)
# def td():
# root_logger.removeHandler(handler)
# log_stream.close()
# request.addfinalizer(td)
# return lambda: str(log_stream.getvalue())
@pytest.fixture()
def workspace_setup_teardown():
    """Fixture used in workspace related tests.

    Create and tear down a workspace.

    *Use as a contextmanager*: the fixture value is a factory
    ``wrapper(workspace_name, create=True, delete=True, *args, **kwargs)``
    that yields an initialised ``Terraform`` object.  With ``create`` the
    workspace is created on entry (extra arguments are forwarded to
    ``create_workspace``); with ``delete`` the ``default`` workspace is
    selected and the named workspace deleted on exit.
    """

    @contextmanager
    def wrapper(workspace_name, create=True, delete=True, *args, **kwargs):
        tf = Terraform(
            working_dir=current_path,
            terraform_semantic_version=semantic_version,
        )
        tf.init()
        if create:
            tf.create_workspace(workspace_name, *args, **kwargs)
        try:
            yield tf
        finally:
            # Run the cleanup even when the ``with`` body raised; otherwise
            # the workspace would be left behind for the following tests.
            if delete:
                tf.set_workspace("default")
                tf.delete_workspace(workspace_name)

    yield wrapper
class TestTerraform:
    """Tests for the ``Terraform`` wrapper, run against the fixture folders next to this file."""

    def teardown_method(self, _) -> None:
        """Teardown any state that was previously setup with a setup_method call.

        Walks the current directory and removes state files, state backups,
        ``*.terraform`` entries and the plan file with the special file name.
        The ``test_tfstate_file*`` fixture folders are never descended into.
        """
        exclude = ["test_tfstate_file", "test_tfstate_file2", "test_tfstate_file3"]

        def purge(base_dir: str, pattern: str) -> None:
            """Delete every file and directory below *base_dir* matching *pattern*."""
            for root, dirnames, filenames in os.walk(base_dir):
                # In-place assignment so that os.walk skips the excluded folders.
                dirnames[:] = [d for d in dirnames if d not in exclude]
                for filename in fnmatch.filter(filenames, pattern):
                    os.remove(os.path.join(root, filename))
                for dirname in fnmatch.filter(dirnames, pattern):
                    shutil.rmtree(os.path.join(root, dirname))

        for pattern in (
            "*.tfstate",
            "*.tfstate.backup",
            "*.terraform",
            FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS,
        ):
            purge(".", pattern)

    @pytest.mark.parametrize(["method", "expected"], STRING_CASES)
    def test_generate_cmd_string(self, method: Callable[..., str], expected: str):
        """Every whitespace-separated token of *expected* occurs in the generated command."""
        tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version)
        result = method(tf)
        for token in expected.split():
            assert token in result

    @pytest.mark.parametrize(
        *(
            CMD_CASES_1_x
            if version.parse(semantic_version) >= version.parse("1.0.0")
            else CMD_CASES_0_x
        )
    )
    def test_cmd(
        self,
        method: Callable[..., str],
        expected_output: str,
        expected_ret_code: int,
        expected_exception: bool,
        expected_logs: str,
        caplog: LogCaptureFixture,
        folder: str,
    ):
        """Run a raw ``cmd`` call and check its output, return code and log text."""
        with caplog.at_level(logging.INFO):
            tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version)
            tf.init(folder)
            try:
                ret, out, _ = method(tf)
                assert not expected_exception
            except TerraformCommandError as e:
                assert expected_exception
                ret = e.returncode
                out = e.out

        assert expected_output in out
        assert expected_ret_code == ret
        assert expected_logs in caplog.text

    @pytest.mark.parametrize(
        *(
            APPLY_CASES_1_x
            if version.parse(semantic_version) >= version.parse("1.0.0")
            else APPLY_CASES_0_x
        )
    )
    def test_apply(self, folder, variables, var_files, expected_output, options):
        """Apply *folder* and look for *expected_output* in the whitespace-stripped output."""
        tf = Terraform(
            working_dir=current_path,
            variables=variables,
            var_file=var_files,
            terraform_semantic_version=semantic_version,
        )
        tf.init(folder)
        ret, out, err = tf.apply(folder, **options)
        assert ret == 0
        assert expected_output in out.replace("\n", "").replace(" ", "")
        assert err == ""

    def test_apply_plan(self):
        """Write a plan file with ``plan`` and hand that file to ``apply``."""
        # Only applicable to terraform >= 1.0.0.  Skip explicitly so that the
        # test is not reported as passed when it did not run.
        if version.parse(semantic_version) < version.parse("1.0.0"):
            pytest.skip("applying a plan file is only tested for terraform >= 1.0.0")

        tf = Terraform(
            working_dir=current_path, terraform_semantic_version=semantic_version
        )
        out_folder = 'var_to_output'
        out_file_name = 'test.out'
        out_file_path = f'{out_folder}/{out_file_name}'

        tf.init(out_folder)
        ret, _, err = tf.plan(out_folder, detailed_exitcode=IsNotFlagged, out=out_file_name)
        assert ret == 0
        assert err == ""

        ret, _, err = tf.apply(out_file_path, skip_plan=True)
        assert ret == 0
        assert err == ""

    def test_apply_with_var_file(self, caplog: LogCaptureFixture):
        """A ``var_file`` passed to ``apply`` shows up exactly once in the logged command."""
        with caplog.at_level(logging.INFO):
            tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version)
            folder = "var_to_output"
            tf.init(folder)
            tf.apply(
                folder,
                var_file=os.path.join(current_path, "tfvar_files", "test.tfvars"),
            )

        for log in caplog.messages:
            if log.startswith("Command: terraform apply"):
                assert log.count("-var-file=") == 1

    @pytest.mark.parametrize(
        ["cmd", "args", "options"],
        [
            # bool value
            ("fmt", ["bad_fmt"], {"list": False, "diff": False})
        ],
    )
    def test_options(self, cmd, args, options, fmt_test_file):
        """Boolean option values are accepted and the command succeeds without output."""
        tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version)
        ret, out, err = getattr(tf, cmd)(*args, **options)
        assert ret == 0
        assert out == ""

    def test_state_data(self):
        """An explicitly named state file is loaded by ``read_state_file``."""
        cwd = os.path.join(current_path, "test_tfstate_file")
        tf = Terraform(
            working_dir=cwd,
            state="tfstate.test",
            terraform_semantic_version=semantic_version,
        )
        tf.read_state_file()
        assert tf.tfstate.modules[0]["path"] == ["root"]

    def test_state_default(self):
        """Without a ``state`` argument the state found in ``test_tfstate_file2`` is read."""
        cwd = os.path.join(current_path, "test_tfstate_file2")
        tf = Terraform(working_dir=cwd, terraform_semantic_version=semantic_version)
        tf.read_state_file()
        assert tf.tfstate.modules[0]["path"] == ["default"]

    def test_state_default_backend(self):
        """Without a ``state`` argument the state found in ``test_tfstate_file3`` is read."""
        cwd = os.path.join(current_path, "test_tfstate_file3")
        tf = Terraform(working_dir=cwd, terraform_semantic_version=semantic_version)
        tf.read_state_file()
        assert tf.tfstate.modules[0]["path"] == ["default_backend"]

    def test_pre_load_state_data(self):
        """The state is available right after construction, without ``read_state_file``."""
        cwd = os.path.join(current_path, "test_tfstate_file")
        tf = Terraform(
            working_dir=cwd,
            state="tfstate.test",
            terraform_semantic_version=semantic_version,
        )
        assert tf.tfstate.modules[0]["path"] == ["root"]

    @pytest.mark.parametrize(
        ("folder", "variables"), [("var_to_output", {"test_var": "test"})]
    )
    def test_override_default(self, folder, variables):
        """A ``var`` given to ``apply`` wins over the variables given to the constructor."""
        tf = Terraform(
            working_dir=current_path,
            variables=variables,
            terraform_semantic_version=semantic_version,
        )
        tf.init(folder)
        ret, out, err = tf.apply(
            folder, var={"test_var": "test2"}, no_color=IsNotFlagged,
        )
        out = out.replace("\n", "")
        assert "\x1b[0m\x1b[1m\x1b[32mApply" in out
        out = tf.output(folder, "test_output")
        assert "test2" in out

    @pytest.mark.parametrize("output_all", [True, False])
    def test_output(self, caplog: LogCaptureFixture, output_all: bool):
        """``output`` returns either all outputs or the value of one named output."""
        expected_value = "test"
        required_output = "test_output"
        with caplog.at_level(logging.INFO):
            tf = Terraform(
                working_dir=current_path,
                variables={"test_var": expected_value},
                terraform_semantic_version=semantic_version,
            )
            tf.init("var_to_output")
            tf.apply("var_to_output")
            params = tuple() if output_all else (required_output,)
            result = tf.output("var_to_output", *params)

        if output_all:
            assert result[required_output]["value"] == expected_value
        else:
            assert result == expected_value
        assert expected_value in caplog.messages[-1]

    def test_destroy(self):
        """``destroy`` on the output-only configuration succeeds and destroys nothing."""
        tf = Terraform(
            working_dir=current_path,
            variables={"test_var": "test"},
            terraform_semantic_version=semantic_version,
        )
        tf.init("var_to_output")
        ret, out, err = tf.destroy("var_to_output")
        assert ret == 0
        assert "Destroy complete! Resources: 0 destroyed." in out

    @pytest.mark.parametrize(
        ("plan", "variables", "expected_ret"), [("vars_require_input", {}, 1)]
    )
    def test_plan(self, plan, variables, expected_ret):
        """``plan`` raises ``TerraformCommandError`` whose stderr contains an error line."""
        tf = Terraform(
            working_dir=current_path,
            variables=variables,
            terraform_semantic_version=semantic_version,
        )
        tf.init(plan)
        with pytest.raises(TerraformCommandError) as e:
            tf.plan(plan)
        assert "\nError:" in e.value.err

    def test_fmt(self, fmt_test_file):
        """``fmt`` with ``diff=True`` returns exit code 0."""
        tf = Terraform(
            working_dir=current_path,
            variables={"test_var": "test"},
            terraform_semantic_version=semantic_version,
        )
        ret, out, err = tf.fmt(diff=True)
        assert ret == 0

    def test_create_workspace(self, workspace_setup_teardown):
        """Creating a workspace succeeds without stderr output."""
        workspace_name = "test"
        with workspace_setup_teardown(workspace_name, create=False) as tf:
            ret, out, err = tf.create_workspace("test")
        assert ret == 0
        assert err == ""

    # The directory flag is no longer supported in v1.0.8
    # this should either be done with -chdir or probably just be removed
    """
    def test_create_workspace_with_args(self, workspace_setup_teardown, caplog):
        workspace_name = "test"
        state_file_path = os.path.join(
            current_path, "test_tfstate_file2", "terraform.tfstate"
        )
        with workspace_setup_teardown(
            workspace_name, create=False
        ) as tf, caplog.at_level(logging.INFO):
            ret, out, err = tf.create_workspace(
                "test", current_path, no_color=IsFlagged
            )
        assert ret == 0
        assert err == ""
        assert (
            f"Command: terraform workspace new -no-color test {current_path}"
            in caplog.messages
        )
    """

    def test_set_workspace(self, workspace_setup_teardown):
        """Selecting an existing workspace succeeds without stderr output."""
        workspace_name = "test"
        with workspace_setup_teardown(workspace_name) as tf:
            ret, out, err = tf.set_workspace(workspace_name)
        assert ret == 0
        assert err == ""

    # see comment on test_create_workspace_with_args
    """
    def test_set_workspace_with_args(self, workspace_setup_teardown, caplog):
        workspace_name = "test"
        with workspace_setup_teardown(workspace_name) as tf, caplog.at_level(
            logging.INFO
        ):
            ret, out, err = tf.set_workspace(
                workspace_name, current_path, no_color=IsFlagged
            )
        assert ret == 0
        assert err == ""
        assert (
            f"Command: terraform workspace select -no-color test {current_path}"
            in caplog.messages
        )
    """

    def test_show_workspace(self, workspace_setup_teardown):
        """Showing the current workspace succeeds without stderr output."""
        workspace_name = "test"
        with workspace_setup_teardown(workspace_name) as tf:
            ret, out, err = tf.show_workspace()
        assert ret == 0
        assert err == ""

    def test_show_workspace_with_no_color(self, workspace_setup_teardown, caplog):
        """``no_color=IsFlagged`` is rendered as ``-no-color`` in the logged command."""
        workspace_name = "test"
        with workspace_setup_teardown(workspace_name) as tf, caplog.at_level(
            logging.INFO
        ):
            ret, out, err = tf.show_workspace(no_color=IsFlagged)
        assert ret == 0
        assert err == ""
        assert "Command: terraform workspace show -no-color" in caplog.messages

    def test_delete_workspace(self, workspace_setup_teardown):
        """Deleting a workspace after switching back to ``default`` succeeds."""
        workspace_name = "test"
        with workspace_setup_teardown(workspace_name, delete=False) as tf:
            tf.set_workspace("default")
            ret, out, err = tf.delete_workspace(workspace_name)
        assert ret == 0
        assert err == ""

    # see above comments
    """
    def test_delete_workspace_with_args(self, workspace_setup_teardown, caplog):
        workspace_name = "test"
        with workspace_setup_teardown(
            workspace_name, delete=False
        ) as tf, caplog.at_level(logging.INFO):
            tf.set_workspace("default")
            ret, out, err = tf.delete_workspace(
                workspace_name, current_path, force=IsFlagged,
            )
        assert ret == 0
        assert err == ""
        assert (
            f"Command: terraform workspace delete -force test {current_path}"
            in caplog.messages
        )
    """

    def test_list_workspace(self):
        """``list_workspace`` returns a non-empty collection containing ``default``."""
        # Pass the version under test like every other test in this class.
        tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version)
        workspaces = tf.list_workspace()
        assert len(workspaces) > 0
        assert 'default' in workspaces