1. add test cases, try tdd
2. refactor to more generic method instead of aws method 3. ready for release to pypi
This commit is contained in:
parent
f1a414760b
commit
2ff59f3630
9 changed files with 268 additions and 164 deletions
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
|||
0.0.4
|
||||
0.7.1
|
|
@ -3,180 +3,186 @@ import os
|
|||
import json
|
||||
import logging
|
||||
|
||||
from python_terraform.tfstate import Tfstate
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Terraform:
|
||||
def __init__(self, targets=None, state='terraform.tfstate', variables=None):
|
||||
"""
|
||||
Wrapper of terraform command line tool
|
||||
https://www.terraform.io/
|
||||
"""
|
||||
|
||||
def __init__(self, working_dir=None,
|
||||
targets=None,
|
||||
state=None,
|
||||
variables=None,
|
||||
parallelism=None,
|
||||
var_file=None):
|
||||
self.working_dir = working_dir
|
||||
self.state = state
|
||||
self.targets = [] if targets is None else targets
|
||||
self.variables = dict() if variables is None else variables
|
||||
|
||||
self.state_filename = state
|
||||
self.state_data = dict()
|
||||
self.parallelism = 50
|
||||
self.parallelism = parallelism
|
||||
self.terraform_bin_path = 'terraform'
|
||||
self.var_file = var_file
|
||||
self.input = False
|
||||
|
||||
def apply(self, targets=None, variables=None, **kargs):
|
||||
# store the tfstate data
|
||||
self.tfstate = dict()
|
||||
|
||||
def apply(self,
|
||||
working_dir=None,
|
||||
no_color=True,
|
||||
**kwargs):
|
||||
"""
|
||||
refer to https://terraform.io/docs/commands/apply.html
|
||||
:param variables: variables in dict type
|
||||
:param targets: targets in list
|
||||
:param working_dir: working folder
|
||||
:param no_color: Disables output with coloring.
|
||||
:returns return_code, stdout, stderr
|
||||
"""
|
||||
variables = self.variables if variables is None else variables
|
||||
targets = self.targets if targets is None else targets
|
||||
if not working_dir:
|
||||
working_dir = self.working_dir
|
||||
|
||||
parameters = []
|
||||
parameters += self._generate_targets(targets)
|
||||
parameters += self._generate_var_string(variables)
|
||||
parameters += self._gen_param_string(kargs)
|
||||
option_dict = dict()
|
||||
option_dict['state'] = self.state
|
||||
option_dict['target'] = self.targets
|
||||
option_dict['var'] = self.variables
|
||||
option_dict['var_file'] = self.var_file
|
||||
option_dict['parallelism'] = self.parallelism
|
||||
if no_color:
|
||||
option_dict['no_color'] = ''
|
||||
option_dict['input'] = self.input
|
||||
|
||||
parameters = \
|
||||
['terraform', 'apply', '-state=%s' % self.state_filename] + parameters
|
||||
option_dict.update(kwargs)
|
||||
|
||||
cmd = ' '.join(parameters)
|
||||
return self._run_cmd(cmd)
|
||||
args = [working_dir] if working_dir else []
|
||||
|
||||
def _gen_param_string(self, kargs):
|
||||
params = []
|
||||
for key, value in kargs.items():
|
||||
if value:
|
||||
params += ['-%s=%s' % (key, value)]
|
||||
else:
|
||||
params += ['-%s' % key]
|
||||
return params
|
||||
ret, out, err = self.cmd('apply', *args, **option_dict)
|
||||
|
||||
def _run_cmd(self, cmd):
|
||||
log.debug('command: ' + cmd)
|
||||
if ret != 0:
|
||||
raise RuntimeError(err)
|
||||
|
||||
p = subprocess.Popen(
|
||||
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
||||
def generate_cmd_string(self, cmd, *args, **kwargs):
|
||||
"""
|
||||
for any generate_cmd_string doesn't written as public method of terraform
|
||||
|
||||
examples:
|
||||
1. call import command,
|
||||
ref to https://www.terraform.io/docs/commands/import.html
|
||||
--> generate_cmd_string call:
|
||||
terraform import -input=true aws_instance.foo i-abcd1234
|
||||
--> python call:
|
||||
tf.generate_cmd_string('import', 'aws_instance.foo', 'i-abcd1234', input=True)
|
||||
|
||||
2. call apply command,
|
||||
--> generate_cmd_string call:
|
||||
terraform apply -var='a=b' -var='c=d' -no-color the_folder
|
||||
--> python call:
|
||||
tf.generate_cmd_string('apply', the_folder, no_color='', var={'a':'b', 'c':'d'})
|
||||
|
||||
:param cmd: command and sub-command of terraform, seperated with space
|
||||
refer to https://www.terraform.io/docs/commands/index.html
|
||||
:param args: argument other than options of a command
|
||||
:param kwargs: same as kwags in method 'cmd'
|
||||
:return: string of valid terraform command
|
||||
"""
|
||||
cmds = cmd.split()
|
||||
cmds = [self.terraform_bin_path] + cmds
|
||||
|
||||
for k, v in kwargs.items():
|
||||
if '_' in k:
|
||||
k = k.replace('_', '-')
|
||||
|
||||
if type(v) is list:
|
||||
for sub_v in v:
|
||||
cmds += ['-{k}={v}'.format(k=k, v=sub_v)]
|
||||
continue
|
||||
|
||||
if type(v) is dict:
|
||||
for sub_k, sub_v in v.items():
|
||||
cmds += ["-{k}='{var_k}={var_v}'".format(k=k,
|
||||
var_k=sub_k,
|
||||
var_v=sub_v)]
|
||||
continue
|
||||
|
||||
# simple flag,
|
||||
if v == '':
|
||||
cmds += ['-{k}'.format(k=k)]
|
||||
continue
|
||||
|
||||
if not v:
|
||||
continue
|
||||
|
||||
if type(v) is bool:
|
||||
v = 'true' if v else 'false'
|
||||
|
||||
cmds += ['-{k}={v}'.format(k=k, v=v)]
|
||||
|
||||
cmds += args
|
||||
cmd = ' '.join(cmds)
|
||||
return cmd
|
||||
|
||||
def cmd(self, cmd, *args, **kwargs):
|
||||
"""
|
||||
run a terraform command, if success, will try to read state file
|
||||
:param cmd: command and sub-command of terraform, seperated with space
|
||||
refer to https://www.terraform.io/docs/commands/index.html
|
||||
:param args: argument other than options of a command
|
||||
:param kwargs: any option flag with key value other than variables,
|
||||
if there's a dash in the option name, use under line instead of dash, ex -no-color --> no_color
|
||||
if it's a simple flag with no value, value should be empty string
|
||||
if it's a boolean value flag, assign True or false
|
||||
if it's a flag could be used multiple times, assign list to it's value
|
||||
if it's a "var" variable flag, assign dictionary to it
|
||||
if a value is None, will skip this option
|
||||
:return: ret_code, out, err
|
||||
"""
|
||||
cmd_string = self.generate_cmd_string(cmd, *args, **kwargs)
|
||||
log.debug('command: {c}'.format(c=cmd_string))
|
||||
|
||||
p = subprocess.Popen(cmd_string, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE, shell=True)
|
||||
out, err = p.communicate()
|
||||
ret_code = p.returncode
|
||||
log.debug('output: ' + out)
|
||||
log.debug('output: {o}'.format(o=out))
|
||||
|
||||
if ret_code == 0:
|
||||
log.debug('error: ' + err)
|
||||
self.read_state_file()
|
||||
return ret_code, out, err
|
||||
else:
|
||||
log.warn('error: {e}'.format(e=err))
|
||||
return ret_code, out.decode('utf-8'), err.decode('utf-8')
|
||||
|
||||
def destroy(self, targets=None, variables=None, **kwargs):
|
||||
variables = self.variables if variables is None else variables
|
||||
targets = self.targets if targets is None else targets
|
||||
def output(self, name):
|
||||
"""
|
||||
https://www.terraform.io/docs/commands/output.html
|
||||
:param name: name of output
|
||||
:return: output value
|
||||
"""
|
||||
ret, out, err = self.cmd('output', name, json='')
|
||||
|
||||
parameters = []
|
||||
parameters += self._generate_targets(targets)
|
||||
parameters += self._generate_var_string(variables)
|
||||
if ret != 0:
|
||||
return None
|
||||
out = out.lstrip()
|
||||
|
||||
parameters = \
|
||||
['terraform', 'destroy', '-force', '-state=%s' % self.state_filename] + \
|
||||
parameters
|
||||
cmd = ' '.join(parameters)
|
||||
return self._run_cmd(cmd)
|
||||
output_dict = json.loads(out)
|
||||
return output_dict['value']
|
||||
|
||||
def refresh(self, targets=None, variables=None):
|
||||
variables = self.variables if variables is None else variables
|
||||
targets = self.targets if targets is None else targets
|
||||
|
||||
parameters = []
|
||||
parameters += self._generate_targets(targets)
|
||||
parameters += self._generate_var_string(variables)
|
||||
parameters = \
|
||||
['terraform', 'refresh', '-state=%s' % self.state_filename] + \
|
||||
parameters
|
||||
cmd = ' '.join(parameters)
|
||||
return self._run_cmd(cmd)
|
||||
|
||||
def read_state_file(self):
|
||||
def read_state_file(self, file_path=None):
|
||||
"""
|
||||
read .tfstate file
|
||||
:param file_path: relative path to working dir
|
||||
:return: states file in dict type
|
||||
"""
|
||||
if os.path.exists(self.state_filename):
|
||||
with open(self.state_filename) as f:
|
||||
json_data = json.load(f)
|
||||
self.state_data = json_data
|
||||
log.debug("state_data=%s" % str(self.state_data))
|
||||
return json_data
|
||||
|
||||
return dict()
|
||||
|
||||
def is_any_aws_instance_alive(self):
|
||||
self.refresh()
|
||||
if not os.path.exists(self.state_filename):
|
||||
log.debug("can't find %s " % self.state_data)
|
||||
return False
|
||||
|
||||
self.read_state_file()
|
||||
try:
|
||||
main_module = self._get_main_module()
|
||||
for resource_key, info in main_module['resources'].items():
|
||||
if 'aws_instance' in resource_key:
|
||||
log.debug("%s is found when read state" % resource_key)
|
||||
return True
|
||||
log.debug("no aws_instance found in resource key")
|
||||
return False
|
||||
except KeyError as err:
|
||||
log.debug(str(err))
|
||||
return False
|
||||
except TypeError as err:
|
||||
log.debug(str(err))
|
||||
return False
|
||||
|
||||
def _get_main_module(self):
|
||||
return self.state_data['modules'][0]
|
||||
|
||||
def get_aws_instances(self):
|
||||
instances = dict()
|
||||
|
||||
try:
|
||||
main_module = self._get_main_module()
|
||||
for resource_key, info in main_module['resources'].items():
|
||||
if 'aws_instance' in resource_key:
|
||||
instances[resource_key] = info
|
||||
except KeyError:
|
||||
return instances
|
||||
except TypeError:
|
||||
return instances
|
||||
|
||||
return instances
|
||||
|
||||
def get_aws_instance(self, resource_name):
|
||||
"""
|
||||
:param resource_name:
|
||||
name of terraform resource, make source count is attached
|
||||
:return: return None if not exist, dict type if exist
|
||||
"""
|
||||
try:
|
||||
return self.get_aws_instances()[resource_name]
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
def get_output_value(self, output_name):
|
||||
"""
|
||||
|
||||
:param output_name:
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
main_module = self._get_main_module()
|
||||
return main_module['outputs'][output_name]
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _generate_var_string(d):
|
||||
str_t = []
|
||||
for k, v in d.iteritems():
|
||||
str_t += ['-var'] + ["%s=%s" % (k, v)]
|
||||
|
||||
return str_t
|
||||
|
||||
@staticmethod
|
||||
def _generate_targets(targets):
|
||||
str_t = []
|
||||
for t in targets:
|
||||
str_t += ['-target=%s' % t]
|
||||
return str_t
|
||||
if not file_path:
|
||||
file_path = self.state
|
||||
|
||||
if not file_path:
|
||||
file_path = 'terraform.tfstate'
|
||||
|
||||
if self.working_dir:
|
||||
file_path = os.path.join(self.working_dir, file_path)
|
||||
|
||||
self.tfstate = Tfstate.load_file(file_path)
|
||||
|
|
31
python_terraform/tfstate.py
Normal file
31
python_terraform/tfstate.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
import json
|
||||
import os
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Tfstate(object):
|
||||
def __init__(self, data=None):
|
||||
self.tfstate_file = None
|
||||
self.native_data = data
|
||||
if data:
|
||||
self.__dict__ = data
|
||||
|
||||
@staticmethod
|
||||
def load_file(file_path):
|
||||
"""
|
||||
Read the tfstate file and load its contents, parses then as JSON and put the result into the object
|
||||
"""
|
||||
log.debug('read data from {0}'.format(file_path))
|
||||
if os.path.exists(file_path):
|
||||
with open(file_path) as f:
|
||||
json_data = json.load(f)
|
||||
|
||||
tf_state = Tfstate(json_data)
|
||||
tf_state.tfstate_file = file_path
|
||||
return tf_state
|
||||
|
||||
log.warn('{0} is not exist'.format(file_path))
|
||||
|
||||
return Tfstate()
|
2
setup.cfg
Normal file
2
setup.cfg
Normal file
|
@ -0,0 +1,2 @@
|
|||
[wheel]
|
||||
universal = 1
|
4
setup.py
4
setup.py
|
@ -38,8 +38,8 @@ setup(
|
|||
# 'Development Status :: 1 - Planning',
|
||||
# 'Development Status :: 2 - Pre-Alpha',
|
||||
# 'Development Status :: 3 - Alpha',
|
||||
'Development Status :: 4 - Beta',
|
||||
# 'Development Status :: 5 - Production/Stable',
|
||||
# 'Development Status :: 4 - Beta',
|
||||
'Development Status :: 5 - Production/Stable',
|
||||
# 'Development Status :: 6 - Mature',
|
||||
# 'Development Status :: 7 - Inactive',
|
||||
'Environment :: Console',
|
||||
|
|
8
test/apply_tf/test.tf
Normal file
8
test/apply_tf/test.tf
Normal file
|
@ -0,0 +1,8 @@
|
|||
variable "test_var" {}
|
||||
|
||||
provider "archive" {
|
||||
}
|
||||
|
||||
output "test_output" {
|
||||
value = "${var.test_var}"
|
||||
}
|
|
@ -10,15 +10,4 @@ provider "aws" {
|
|||
resource "aws_instance" "ubuntu-1404" {
|
||||
ami = "ami-9abea4fb"
|
||||
instance_type = "t2.micro"
|
||||
security_groups = ["terraform-salty-splunk"]
|
||||
tags {
|
||||
Name = "python-terraform-test"
|
||||
}
|
||||
// key_name = "${aws_key_pair.key.key_name}"
|
||||
// connection {
|
||||
// type = "ssh"
|
||||
// user = "ubuntu"
|
||||
// key_file = "${var.key_path}"
|
||||
// timeout = "10m"
|
||||
// }
|
||||
}
|
|
@ -1,15 +1,83 @@
|
|||
from python_terraform import Terraform
|
||||
import pytest
|
||||
import os
|
||||
import logging
|
||||
import re
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
ACCESS_KEY = os.environ['PACKER_access_key']
|
||||
SECRET_KEY = os.environ['PACKER_secret_key']
|
||||
|
||||
STRING_CASES = [
|
||||
[
|
||||
lambda x: x.generate_cmd_string('apply', 'the_folder',
|
||||
no_color='',
|
||||
var={'a': 'b', 'c': 'd'}),
|
||||
"terraform apply -var='a=b' -var='c=d' -no-color the_folder"
|
||||
],
|
||||
[
|
||||
lambda x: x.generate_cmd_string('push', 'path',
|
||||
var={'a': 'b'}, vcs=True,
|
||||
token='token',
|
||||
atlas_address='url'),
|
||||
"terraform push -var='a=b' -vcs=true -token=token -atlas-address=url path"
|
||||
],
|
||||
]
|
||||
|
||||
CMD_CASES = [
|
||||
['method', 'expected_output'],
|
||||
[
|
||||
[
|
||||
lambda x: x.cmd('plan', 'aws_tf', no_color='', var={'access_key': ACCESS_KEY, 'secret_key': SECRET_KEY}) ,
|
||||
'Plan: 1 to add, 0 to change, 0 to destroy'
|
||||
]
|
||||
]
|
||||
]
|
||||
|
||||
|
||||
class TestTerraform:
|
||||
def test_apply_and_destory(self):
|
||||
def teardown_method(self, method):
|
||||
""" teardown any state that was previously setup with a setup_method
|
||||
call.
|
||||
"""
|
||||
|
||||
def purge(dir, pattern):
|
||||
for f in os.listdir(dir):
|
||||
if re.search(pattern, f):
|
||||
if os.path.isfile(f):
|
||||
os.remove(os.path.join(dir, f))
|
||||
|
||||
purge('.', '.tfstate')
|
||||
|
||||
@pytest.mark.parametrize([
|
||||
"method", "expected"
|
||||
], STRING_CASES)
|
||||
def test_generate_cmd_string(self, method, expected):
|
||||
tf = Terraform()
|
||||
ret_code, out, err = tf.apply()
|
||||
result = method(tf)
|
||||
|
||||
print out
|
||||
print err
|
||||
# assert ret_code, 0
|
||||
strs = expected.split()
|
||||
for s in strs:
|
||||
assert s in result
|
||||
|
||||
ret_code, out, err = tf.destroy()
|
||||
@pytest.mark.parametrize(*CMD_CASES)
|
||||
def test_cmd(self, method, expected_output):
|
||||
tf = Terraform()
|
||||
ret, out, err = method(tf)
|
||||
assert expected_output in out
|
||||
assert ret == 0
|
||||
|
||||
assert ret_code, 0
|
||||
def test_state_data(self):
|
||||
tf = Terraform(working_dir='test_tfstate_file')
|
||||
tf.read_state_file()
|
||||
assert tf.tfstate.modules[0]['path'] == ['root']
|
||||
|
||||
def test_apply(self):
|
||||
tf = Terraform(working_dir='apply_tf', variables={'test_var': 'test'})
|
||||
tf.apply(var={'test_var': 'test2'})
|
||||
|
||||
def test_get_output(self):
|
||||
tf = Terraform(working_dir='apply_tf', variables={'test_var': 'test'})
|
||||
tf.apply()
|
||||
assert tf.output('test_output') == 'test'
|
||||
|
|
Loading…
Reference in a new issue