This commit is contained in:
Michael Jerger 2023-05-26 17:03:56 +02:00
parent 0b577597e8
commit 158d8a3b37

View file

@ -1,81 +1,81 @@
from boto3 import Session # from boto3 import Session
from .python_util import execute # from .python_util import execute
from .aws_backend_properties_mixin import AwsBackendPropertiesMixin # from .aws_backend_properties_mixin import AwsBackendPropertiesMixin
def add_aws_mfa_mixin_config(config, account_id, region, # def add_aws_mfa_mixin_config(config, account_id, region,
mfa_role='developer', mfa_account_prefix='', # mfa_role='developer', mfa_account_prefix='',
mfa_login_account_suffix='main'): # mfa_login_account_suffix='main'):
config.update({'AwsMfaMixin': # config.update({'AwsMfaMixin':
{'account_id': account_id, # {'account_id': account_id,
'region': region, # 'region': region,
'mfa_role': mfa_role, # 'mfa_role': mfa_role,
'mfa_account_prefix': mfa_account_prefix, # 'mfa_account_prefix': mfa_account_prefix,
'mfa_login_account_suffix': mfa_login_account_suffix}}) # 'mfa_login_account_suffix': mfa_login_account_suffix}})
return config # return config
class AwsMfaMixin(AwsBackendPropertiesMixin): # class AwsMfaMixin(AwsBackendPropertiesMixin):
def __init__(self, project, config): # def __init__(self, project, config):
super().__init__(project, config) # super().__init__(project, config)
project.build_depends_on('boto3') # project.build_depends_on('boto3')
project.build_depends_on('mfa') # project.build_depends_on('mfa')
aws_mfa_mixin_config = config['AwsMfaMixin'] # aws_mfa_mixin_config = config['AwsMfaMixin']
self.account_id = aws_mfa_mixin_config['account_id'] # self.account_id = aws_mfa_mixin_config['account_id']
self.region = aws_mfa_mixin_config['region'] # self.region = aws_mfa_mixin_config['region']
self.mfa_role = aws_mfa_mixin_config['mfa_role'] # self.mfa_role = aws_mfa_mixin_config['mfa_role']
self.mfa_account_prefix = aws_mfa_mixin_config['mfa_account_prefix'] # self.mfa_account_prefix = aws_mfa_mixin_config['mfa_account_prefix']
self.mfa_login_account_suffix = aws_mfa_mixin_config['mfa_login_account_suffix'] # self.mfa_login_account_suffix = aws_mfa_mixin_config['mfa_login_account_suffix']
def project_vars(self): # def project_vars(self):
ret = super().project_vars() # ret = super().project_vars()
ret.update({'account_name': self.account_name, # ret.update({'account_name': self.account_name,
'account_id': self.account_id, # 'account_id': self.account_id,
'region': self.region, # 'region': self.region,
'mfa_role': self.mfa_role, # 'mfa_role': self.mfa_role,
'mfa_account_prefix': self.mfa_account_prefix, # 'mfa_account_prefix': self.mfa_account_prefix,
'mfa_login_account_suffix': self.mfa_login_account_suffix}) # 'mfa_login_account_suffix': self.mfa_login_account_suffix})
return ret # return ret
def get_username_from_account(self, p_account_name): # def get_username_from_account(self, p_account_name):
login_id = execute(r'cat ~/.aws/accounts | grep -A 2 "\[' + p_account_name + # login_id = execute(r'cat ~/.aws/accounts | grep -A 2 "\[' + p_account_name +
r'\]" | grep username | awk -F= \'{print $2}\'', shell=True) # r'\]" | grep username | awk -F= \'{print $2}\'', shell=True)
return login_id # return login_id
def get_account_id_from_account(self, p_account_name): # def get_account_id_from_account(self, p_account_name):
account_id = execute(r'cat ~/.aws/accounts | grep -A 2 "\[' + p_account_name + # account_id = execute(r'cat ~/.aws/accounts | grep -A 2 "\[' + p_account_name +
r'\]" | grep account | awk -F= \'{print $2}\'', shell=True) # r'\]" | grep account | awk -F= \'{print $2}\'', shell=True)
return account_id # return account_id
def get_mfa(self, mfa_path='aws'): # def get_mfa(self, mfa_path='aws'):
mfa_token = execute('mfa otp ' + mfa_path, shell=True) # mfa_token = execute('mfa otp ' + mfa_path, shell=True)
return mfa_token # return mfa_token
def write_aws_config(self, to_profile, key, secret): # def write_aws_config(self, to_profile, key, secret):
execute('aws configure --profile ' + to_profile + # execute('aws configure --profile ' + to_profile +
' set ' + key + ' ' + secret, shell=True) # ' set ' + key + ' ' + secret, shell=True)
def get_mfa_session(self): # def get_mfa_session(self):
from_account_name = self.mfa_account_prefix + self.mfa_login_account_suffix # from_account_name = self.mfa_account_prefix + self.mfa_login_account_suffix
from_account_id = self.get_account_id_from_account(from_account_name) # from_account_id = self.get_account_id_from_account(from_account_name)
to_account_name = self.mfa_account_prefix + self.account_name # to_account_name = self.mfa_account_prefix + self.account_name
to_account_id = self.get_account_id_from_account(to_account_name) # to_account_id = self.get_account_id_from_account(to_account_name)
login_id = self.get_username_from_account(from_account_name) # login_id = self.get_username_from_account(from_account_name)
mfa_token = self.get_mfa() # mfa_token = self.get_mfa()
ses = Session(profile_name=from_account_name) # ses = Session(profile_name=from_account_name)
sts_client = ses.client('sts') # sts_client = ses.client('sts')
response = sts_client.assume_role( # response = sts_client.assume_role(
RoleArn='arn:aws:iam::' + to_account_id + ':role/' + self.mfa_role, # RoleArn='arn:aws:iam::' + to_account_id + ':role/' + self.mfa_role,
RoleSessionName=to_account_id + '-' + self.account_name + '-' + self.mfa_role, # RoleSessionName=to_account_id + '-' + self.account_name + '-' + self.mfa_role,
SerialNumber='arn:aws:iam::' + from_account_id + ':mfa/' + login_id, # SerialNumber='arn:aws:iam::' + from_account_id + ':mfa/' + login_id,
TokenCode=mfa_token # TokenCode=mfa_token
) # )
self.write_aws_config(to_account_name, 'aws_access_key_id', # self.write_aws_config(to_account_name, 'aws_access_key_id',
response['Credentials']['AccessKeyId']) # response['Credentials']['AccessKeyId'])
self.write_aws_config(to_account_name, 'aws_secret_access_key', # self.write_aws_config(to_account_name, 'aws_secret_access_key',
response['Credentials']['SecretAccessKey']) # response['Credentials']['SecretAccessKey'])
self.write_aws_config(to_account_name, 'aws_session_token', # self.write_aws_config(to_account_name, 'aws_session_token',
response['Credentials']['SessionToken']) # response['Credentials']['SessionToken'])
print('got token') # print('got token')