[breaking change] use semantic version string

This commit is contained in:
Michael Jerger 2022-08-19 10:16:40 +00:00
parent 9c9d7dc14b
commit 1a7955b738
3 changed files with 29 additions and 27 deletions

View file

@ -5,6 +5,7 @@ import subprocess
import sys import sys
import tempfile import tempfile
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, Union from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, Union
from packaging import version
from dda_python_terraform.tfstate import Tfstate from dda_python_terraform.tfstate import Tfstate
@ -52,7 +53,7 @@ class Terraform:
var_file: Optional[str] = None, var_file: Optional[str] = None,
terraform_bin_path: Optional[str] = None, terraform_bin_path: Optional[str] = None,
is_env_vars_included: bool = True, is_env_vars_included: bool = True,
terraform_version: Optional[float] = 0.13 terraform_semantic_version: Optional[str] = "0.13.0"
): ):
""" """
:param working_dir: the folder of the working folder, if not given, :param working_dir: the folder of the working folder, if not given,
@ -69,6 +70,7 @@ class Terraform:
:param terraform_bin_path: binary path of terraform :param terraform_bin_path: binary path of terraform
:type is_env_vars_included: bool :type is_env_vars_included: bool
:param is_env_vars_included: included env variables when calling terraform cmd :param is_env_vars_included: included env variables when calling terraform cmd
:param terrform_semantic_version encodes major.minor.patch version of terraform. Defaults to 0.13.0
""" """
self.is_env_vars_included = is_env_vars_included self.is_env_vars_included = is_env_vars_included
self.working_dir = working_dir self.working_dir = working_dir
@ -79,7 +81,7 @@ class Terraform:
self.terraform_bin_path = ( self.terraform_bin_path = (
terraform_bin_path if terraform_bin_path else "terraform" terraform_bin_path if terraform_bin_path else "terraform"
) )
self.terraform_version = terraform_version self.terraform_semantic_version = terraform_semantic_version
self.var_file = var_file self.var_file = var_file
self.temp_var_files = VariableFiles() self.temp_var_files = VariableFiles()
@ -156,7 +158,7 @@ class Terraform:
global_opts = self._generate_default_general_options(dir_or_plan) global_opts = self._generate_default_general_options(dir_or_plan)
default = kwargs.copy() default = kwargs.copy()
# force is no longer a flag in version >= 1.0 # force is no longer a flag in version >= 1.0
if self.terraform_version < 1.0: if version.parse(self.terraform_semantic_version) < version.parse("1.0.0"):
default["force"] = force default["force"] = force
default["auto-approve"] = True default["auto-approve"] = True
options = self._generate_default_options(default) options = self._generate_default_options(default)
@ -458,13 +460,13 @@ class Terraform:
) )
def _generate_default_args(self, dir_or_plan: Optional[str]) -> Sequence[str]: def _generate_default_args(self, dir_or_plan: Optional[str]) -> Sequence[str]:
if (self.terraform_version < 1.0 and dir_or_plan): if (version.parse(self.terraform_semantic_version) < version.parse("1.0.0") and dir_or_plan):
return [dir_or_plan] return [dir_or_plan]
else: else:
return [] return []
def _generate_default_general_options(self, dir_or_plan: Optional[str]) -> Dict[str, Any]: def _generate_default_general_options(self, dir_or_plan: Optional[str]) -> Dict[str, Any]:
if (self.terraform_version >= 1.0 and dir_or_plan): if (version.parse(self.terraform_semantic_version) >= version.parse("1.0.0") and dir_or_plan):
return {"chdir": dir_or_plan} return {"chdir": dir_or_plan}
else: else:
return {} return {}
@ -544,7 +546,6 @@ class Terraform:
class VariableFiles: class VariableFiles:
def __init__(self): def __init__(self):
self.files = [] self.files = []

View file

@ -0,0 +1 @@
packaging

View file

@ -6,6 +6,7 @@ import shutil
from contextlib import contextmanager from contextlib import contextmanager
from io import StringIO from io import StringIO
from typing import Callable from typing import Callable
from packaging import version
import pytest import pytest
from _pytest.logging import LogCaptureFixture, caplog from _pytest.logging import LogCaptureFixture, caplog
@ -17,8 +18,7 @@ root_logger = logging.getLogger()
current_path = os.path.dirname(os.path.realpath(__file__)) current_path = os.path.dirname(os.path.realpath(__file__))
version = 1.0 if (os.environ.get("TFVER") and os.environ.get( semantic_version = os.environ.get("TFVER")
"TFVER").startswith("1")) else 0.13
FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS = "test 'test.out!" FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS = "test 'test.out!"
STRING_CASES = [ STRING_CASES = [
@ -60,7 +60,7 @@ CMD_CASES_0_x = [
var={"test_var": "test"}, var={"test_var": "test"},
raise_on_error=False, raise_on_error=False,
), ),
# Expected output varies by terraform version # Expected output varies by terraform semantic_version
"Plan: 0 to add, 0 to change, 0 to destroy.", "Plan: 0 to add, 0 to change, 0 to destroy.",
0, 0,
False, False,
@ -131,7 +131,7 @@ CMD_CASES_1_x = [
var={"test_var": "test"}, var={"test_var": "test"},
raise_on_error=False, raise_on_error=False,
), ),
# Expected output varies by terraform version # Expected output varies by terraform semantic_version
"Changes to Outputs:", "Changes to Outputs:",
0, 0,
False, False,
@ -236,7 +236,7 @@ def workspace_setup_teardown():
@contextmanager @contextmanager
def wrapper(workspace_name, create=True, delete=True, *args, **kwargs): def wrapper(workspace_name, create=True, delete=True, *args, **kwargs):
tf = Terraform(working_dir=current_path, terraform_version=version) tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version)
tf.init() tf.init()
if create: if create:
tf.create_workspace(workspace_name, *args, **kwargs) tf.create_workspace(workspace_name, *args, **kwargs)
@ -271,14 +271,14 @@ class TestTerraform:
@pytest.mark.parametrize(["method", "expected"], STRING_CASES) @pytest.mark.parametrize(["method", "expected"], STRING_CASES)
def test_generate_cmd_string(self, method: Callable[..., str], expected: str): def test_generate_cmd_string(self, method: Callable[..., str], expected: str):
tf = Terraform(working_dir=current_path, terraform_version=version) tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version)
result = method(tf) result = method(tf)
strs = expected.split() strs = expected.split()
for s in strs: for s in strs:
assert s in result assert s in result
@pytest.mark.parametrize(*(CMD_CASES_1_x if version >= 1.0 else CMD_CASES_0_x)) @pytest.mark.parametrize(*(CMD_CASES_1_x if version.parse(semantic_version) >= version.parse("1.0.0") else CMD_CASES_0_x))
def test_cmd( def test_cmd(
self, self,
method: Callable[..., str], method: Callable[..., str],
@ -290,7 +290,7 @@ class TestTerraform:
folder: str, folder: str,
): ):
with caplog.at_level(logging.INFO): with caplog.at_level(logging.INFO):
tf = Terraform(working_dir=current_path, terraform_version=version) tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version)
tf.init(folder) tf.init(folder)
try: try:
ret, out, _ = method(tf) ret, out, _ = method(tf)
@ -305,10 +305,10 @@ class TestTerraform:
assert expected_logs in caplog.text assert expected_logs in caplog.text
@pytest.mark.parametrize(*(APPLY_CASES_1_x if version >= 1.0 else APPLY_CASES_0_x)) @pytest.mark.parametrize(*(APPLY_CASES_1_x if version.parse(semantic_version) >= version.parse("1.0.0") else APPLY_CASES_0_x))
def test_apply(self, folder, variables, var_files, expected_output, options): def test_apply(self, folder, variables, var_files, expected_output, options):
tf = Terraform( tf = Terraform(
working_dir=current_path, variables=variables, var_file=var_files, terraform_version=version working_dir=current_path, variables=variables, var_file=var_files, terraform_semantic_version=semantic_version
) )
tf.init(folder) tf.init(folder)
ret, out, err = tf.apply(folder, **options) ret, out, err = tf.apply(folder, **options)
@ -318,7 +318,7 @@ class TestTerraform:
def test_apply_with_var_file(self, caplog: LogCaptureFixture): def test_apply_with_var_file(self, caplog: LogCaptureFixture):
with caplog.at_level(logging.INFO): with caplog.at_level(logging.INFO):
tf = Terraform(working_dir=current_path, terraform_version=version) tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version)
folder = "var_to_output" folder = "var_to_output"
tf.init(folder) tf.init(folder)
tf.apply( tf.apply(
@ -338,7 +338,7 @@ class TestTerraform:
], ],
) )
def test_options(self, cmd, args, options, fmt_test_file): def test_options(self, cmd, args, options, fmt_test_file):
tf = Terraform(working_dir=current_path, terraform_version=version) tf = Terraform(working_dir=current_path, terraform_semantic_version=semantic_version)
ret, out, err = getattr(tf, cmd)(*args, **options) ret, out, err = getattr(tf, cmd)(*args, **options)
assert ret == 0 assert ret == 0
assert out == "" assert out == ""
@ -346,26 +346,26 @@ class TestTerraform:
def test_state_data(self): def test_state_data(self):
cwd = os.path.join(current_path, "test_tfstate_file") cwd = os.path.join(current_path, "test_tfstate_file")
tf = Terraform(working_dir=cwd, state="tfstate.test", tf = Terraform(working_dir=cwd, state="tfstate.test",
terraform_version=version) terraform_semantic_version=semantic_version)
tf.read_state_file() tf.read_state_file()
assert tf.tfstate.modules[0]["path"] == ["root"] assert tf.tfstate.modules[0]["path"] == ["root"]
def test_state_default(self): def test_state_default(self):
cwd = os.path.join(current_path, "test_tfstate_file2") cwd = os.path.join(current_path, "test_tfstate_file2")
tf = Terraform(working_dir=cwd, terraform_version=version) tf = Terraform(working_dir=cwd, terraform_semantic_version=semantic_version)
tf.read_state_file() tf.read_state_file()
assert tf.tfstate.modules[0]["path"] == ["default"] assert tf.tfstate.modules[0]["path"] == ["default"]
def test_state_default_backend(self): def test_state_default_backend(self):
cwd = os.path.join(current_path, "test_tfstate_file3") cwd = os.path.join(current_path, "test_tfstate_file3")
tf = Terraform(working_dir=cwd, terraform_version=version) tf = Terraform(working_dir=cwd, terraform_semantic_version=semantic_version)
tf.read_state_file() tf.read_state_file()
assert tf.tfstate.modules[0]["path"] == ["default_backend"] assert tf.tfstate.modules[0]["path"] == ["default_backend"]
def test_pre_load_state_data(self): def test_pre_load_state_data(self):
cwd = os.path.join(current_path, "test_tfstate_file") cwd = os.path.join(current_path, "test_tfstate_file")
tf = Terraform(working_dir=cwd, state="tfstate.test", tf = Terraform(working_dir=cwd, state="tfstate.test",
terraform_version=version) terraform_semantic_version=semantic_version)
assert tf.tfstate.modules[0]["path"] == ["root"] assert tf.tfstate.modules[0]["path"] == ["root"]
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -373,7 +373,7 @@ class TestTerraform:
) )
def test_override_default(self, folder, variables): def test_override_default(self, folder, variables):
tf = Terraform(working_dir=current_path, tf = Terraform(working_dir=current_path,
variables=variables, terraform_version=version) variables=variables, terraform_semantic_version=semantic_version)
tf.init(folder) tf.init(folder)
ret, out, err = tf.apply( ret, out, err = tf.apply(
folder, var={"test_var": "test2"}, no_color=IsNotFlagged, folder, var={"test_var": "test2"}, no_color=IsNotFlagged,
@ -389,7 +389,7 @@ class TestTerraform:
required_output = "test_output" required_output = "test_output"
with caplog.at_level(logging.INFO): with caplog.at_level(logging.INFO):
tf = Terraform( tf = Terraform(
working_dir=current_path, variables={"test_var": expected_value}, terraform_version=version working_dir=current_path, variables={"test_var": expected_value}, terraform_semantic_version=semantic_version
) )
tf.init("var_to_output") tf.init("var_to_output")
tf.apply("var_to_output") tf.apply("var_to_output")
@ -403,7 +403,7 @@ class TestTerraform:
def test_destroy(self): def test_destroy(self):
tf = Terraform(working_dir=current_path, variables={ tf = Terraform(working_dir=current_path, variables={
"test_var": "test"}, terraform_version=version) "test_var": "test"}, terraform_semantic_version=semantic_version)
tf.init("var_to_output") tf.init("var_to_output")
ret, out, err = tf.destroy("var_to_output") ret, out, err = tf.destroy("var_to_output")
assert ret == 0 assert ret == 0
@ -414,7 +414,7 @@ class TestTerraform:
) )
def test_plan(self, plan, variables, expected_ret): def test_plan(self, plan, variables, expected_ret):
tf = Terraform(working_dir=current_path, tf = Terraform(working_dir=current_path,
variables=variables, terraform_version=version) variables=variables, terraform_semantic_version=semantic_version)
tf.init(plan) tf.init(plan)
with pytest.raises(TerraformCommandError) as e: with pytest.raises(TerraformCommandError) as e:
tf.plan(plan) tf.plan(plan)
@ -424,7 +424,7 @@ class TestTerraform:
def test_fmt(self, fmt_test_file): def test_fmt(self, fmt_test_file):
tf = Terraform(working_dir=current_path, variables={ tf = Terraform(working_dir=current_path, variables={
"test_var": "test"}, terraform_version=version) "test_var": "test"}, terraform_semantic_version=semantic_version)
ret, out, err = tf.fmt(diff=True) ret, out, err = tf.fmt(diff=True)
assert ret == 0 assert ret == 0