introduce Credentials

merge-requests/12/head
Michael Jerger 12 months ago
parent 73e73e8d3d
commit 4baa012918

@ -39,6 +39,18 @@ classDiagram
release_current_branch
version
}
class Credentials {
}
class CredentialMapping {
name
gopass_path
gopass_field
gopass_type()
name_for_input()
name_for_environment ()
}
class BuildFile {
<<AggregateRoot>>
@ -60,10 +72,12 @@ classDiagram
Devops *-- "0..1" Image: spcialized_builds
Devops *-- "0..1" C4k: spcialized_builds
Devops *-- "0..1" Release: mixins
Devops *-- "0..1" Credentials: mixins
Release o-- "0..1" BuildFile: primary_build_file
Release o-- "0..n" BuildFile: secondary_build_files
BuildFile *-- "1" Version
C4k *-- DnsRecord
Credentials *-- "0..n" CredentialMapping: mappings
```

@ -4,4 +4,5 @@ setuptools
dda-python-terraform==2.0.1
packaging
boto3
pyyaml
pyyaml
inflection

@ -48,31 +48,31 @@ class C4kBuild(DevopsBuild):
def __init__(self, project, config):
super().__init__(project, config)
self.execution_api = ExecutionApi()
devops = self.repo.get_devops(self.project)
devops = self.devops_repo.get_devops(self.project)
if BuildType.C4K not in devops.specialized_builds:
raise ValueError(f"C4kBuild requires BuildType.C4K")
def update_runtime_config(self, dns_record: DnsRecord):
devops = self.repo.get_devops(self.project)
devops = self.devops_repo.get_devops(self.project)
devops.specialized_builds[BuildType.C4K].update_runtime_config(dns_record)
self.repo.set_devops(self.project, devops)
self.devops_repo.set_devops(self.project, devops)
def write_c4k_config(self):
devops = self.repo.get_devops(self.project)
devops = self.devops_repo.get_devops(self.project)
path = devops.build_path() + "/out_c4k_config.yaml"
self.file_api.write_yaml_to_file(
path, devops.specialized_builds[BuildType.C4K].config()
)
def write_c4k_auth(self):
devops = self.repo.get_devops(self.project)
devops = self.devops_repo.get_devops(self.project)
path = devops.build_path() + "/out_c4k_auth.yaml"
self.file_api.write_yaml_to_file(
path, devops.specialized_builds[BuildType.C4K].auth()
)
def c4k_apply(self, dry_run=False):
devops = self.repo.get_devops(self.project)
devops = self.devops_repo.get_devops(self.project)
return self.execution_api.execute(
devops.specialized_builds[BuildType.C4K].command(devops), dry_run
)

@ -3,6 +3,7 @@ from .devops_factory import DevopsFactory
from .image import Image
from .c4k import C4k
from .release import Release, EnvironmentKeys
from .credentials import Credentials, CredentialMapping, GopassType
from .version import Version
from .build_file import BuildFileType, BuildFile
from .init_service import InitService

@ -1,7 +1,6 @@
import deprecation
from enum import Enum
from typing import List, TypedDict
import logging
import deprecation
@ -16,6 +15,7 @@ class BuildType(Enum):
class MixinType(Enum):
RELEASE = 0
CREDENTIALS = 1
class ReleaseType(Enum):

@ -0,0 +1,56 @@
import deprecation
from enum import Enum
from typing import List, TypedDict
from inflection import underscore
import deprecation
from .common import (
Validateable,
)
class GopassType(Enum):
FIELD = 0
PASSWORD = 1
class CredentialMapping(Validateable):
def __init__(self, mapping: dict):
self.name = mapping.get("name", None)
self.gopass_field = mapping.get("gopass_field", None)
self.gopass_path = mapping.get("gopass_path", None)
def validate(self) -> List[str]:
result = []
result += self.__validate_is_not_empty__("gopass_path")
if not self.name and not self.gopass_field:
result.append(f"Either name or gopass field has to be defined.")
return result
def gopass_type(self):
if self.gopass_field:
return GopassType.FIELD
else:
return GopassType.PASSWORD
def name_for_input(self):
if self.name:
return self.name
else:
return underscore(self.gopass_field)
def name_for_environment(self):
return self.name_for_input().upper()
class Credentials(Validateable):
def __init__(self, input: dict):
input_mappings = input.get("credentials_mapping", [])
self.mappings = []
for input_mapping in input_mappings:
self.mappings.append(CredentialMapping(input_mapping))
def validate(self) -> List[str]:
result = []
for mapping in self.mappings:
result += mapping.validate()
return result

@ -25,6 +25,19 @@ class DevopsFactory:
mixins = {}
if MixinType.RELEASE in mixin_types:
mixins[MixinType.RELEASE] = Release(input, version)
if MixinType.CREDENTIALS in mixin_types:
if BuildType.C4K in build_types:
default_mappings = [
{
"gopass_path": "server/meissa/grafana-cloud",
"gopass_field": "grafana-cloud-user",
},
{
"gopass_path": "server/meissa/grafana-cloud",
"name": "grafana_cloud_password",
}
]
mixins[MixinType.CREDENTIALS] = Credentials(input, version)
devops = Devops(input, specialized_builds=specialized_builds, mixins=mixins)

@ -11,7 +11,7 @@ class Version(Validateable):
snapshot_parsed = input_str.split("-")
version_str = snapshot_parsed[0]
suffix_str = None
if len(snapshot_parsed) > 1:
if len(snapshot_parsed) > 1:
suffix_str = snapshot_parsed[1]
version_no_parsed = [int(x) for x in version_str.split(".")]
return cls(
@ -21,9 +21,9 @@ class Version(Validateable):
)
def __init__(
self,
version_list: list,
snapshot_suffix: Optional[str] = None,
self,
version_list: list,
snapshot_suffix: Optional[str] = None,
version_str: Optional[str] = None,
):
self.version_list = version_list
@ -33,6 +33,9 @@ class Version(Validateable):
def __eq__(self, other):
return other and self.to_string() == other.to_string()
def __hash__(self) -> int:
return self.to_string().__hash__()
def is_snapshot(self):
return not self.snapshot_suffix == None
@ -47,17 +50,27 @@ class Version(Validateable):
result += self.__validate_is_not_empty__("version_list")
if self.version_list and len(self.version_list) < 3:
result += [f"version_list must have at least 3 levels."]
if self.version_list and self.version_string and self.to_string() != self.version_string:
result += [f"version_string not parsed correct. Input was {self.version_string} parsed was {self.to_string()}"]
if (
self.version_list
and self.version_string
and self.to_string() != self.version_string
):
result += [
f"version_string not parsed correct. Input was {self.version_string} parsed was {self.to_string()}"
]
return result
def create_bump(self, snapshot_suffix: str = None):
new_version_list = self.version_list.copy()
if self.is_snapshot():
return Version(new_version_list, snapshot_suffix=self.snapshot_suffix, version_str=None)
return Version(
new_version_list, snapshot_suffix=self.snapshot_suffix, version_str=None
)
else:
new_version_list[2] += 1
return Version(new_version_list, snapshot_suffix=snapshot_suffix, version_str=None)
return Version(
new_version_list, snapshot_suffix=snapshot_suffix, version_str=None
)
def create_patch(self):
new_version_list = self.version_list.copy()
@ -78,7 +91,7 @@ class Version(Validateable):
def create_major(self):
new_version_list = self.version_list.copy()
if self.is_snapshot() and new_version_list[2] == 0 and new_version_list[1] == 0 :
if self.is_snapshot() and new_version_list[2] == 0 and new_version_list[1] == 0:
return Version(new_version_list, snapshot_suffix=None, version_str=None)
else:
new_version_list[2] = 0

@ -0,0 +1,107 @@
import pytest
from pathlib import Path
from src.main.python.ddadevops.domain import (
CredentialMapping,
Credentials,
GopassType,
MixinType,
)
from .helper import build_devops
def test_should_create_mapping():
sut = CredentialMapping(
{
"gopass_path": "server/meissa/grafana-cloud",
"gopass_field": "grafana-cloud-user",
}
)
assert "grafana_cloud_user" == sut.name_for_input()
assert "GRAFANA_CLOUD_USER" == sut.name_for_environment()
assert GopassType.FIELD == sut.gopass_type()
sut = CredentialMapping(
{
"gopass_path": "server/meissa/grafana-cloud",
"name": "grafana_cloud_password",
}
)
assert "grafana_cloud_password" == sut.name_for_input()
assert "GRAFANA_CLOUD_PASSWORD" == sut.name_for_environment()
assert GopassType.PASSWORD == sut.gopass_type()
def test_should_validate_CredentialMapping():
sut = CredentialMapping(
{
"gopass_path": "server/meissa/grafana-cloud",
"gopass_field": "grafana-cloud-user",
}
)
assert sut.is_valid()
sut = CredentialMapping(
{
"gopass_path": "server/meissa/grafana-cloud",
"name": "grafana_cloud_user",
}
)
assert sut.is_valid()
sut = CredentialMapping(
{
"gopass_path": "server/meissa/grafana-cloud",
}
)
assert not sut.is_valid()
def test_should_create_credentials():
sut = Credentials(
{
"credentials_mapping": [
{
"gopass_path": "server/meissa/grafana-cloud",
"gopass_field": "grafana-cloud-user",
},
{
"gopass_path": "server/meissa/grafana-cloud",
"name": "grafana_cloud_password",
},
],
}
)
assert sut
def test_should_validate_credentials():
sut = Credentials(
{
"credentials_mapping": [
{
"gopass_path": "server/meissa/grafana-cloud",
"gopass_field": "grafana-cloud-user",
},
{
"gopass_path": "server/meissa/grafana-cloud",
"name": "grafana_cloud_password",
},
],
}
)
assert sut.is_valid()
sut = Credentials(
{
"credentials_mapping": [
{
"gopass_path": "server/meissa/grafana-cloud",
"gopass_field": "grafana-cloud-user",
},
{
"gopass_path": "server/meissa/grafana-cloud"
},
],
}
)
assert not sut.is_valid()

@ -6,4 +6,4 @@ from .helper import build_devops
def test_devops_buildpath():
sut = build_devops({'module': "cloud", 'name': "meissa"})
assert "../../../target/meissa/cloud" == sut.build_path()
assert "root_path/target/meissa/cloud" == sut.build_path()

@ -2,22 +2,20 @@ import os
from pybuilder.core import Project
from src.main.python.ddadevops.domain import DnsRecord
from src.main.python.ddadevops.c4k_build import C4kBuild, add_c4k_mixin_config
from .domain.test_helper import devops_config
from .domain.helper import devops_config
def test_c4k_mixin(tmp_path):
build_dir = "build"
project_name = "testing-project"
module_name = "c4k-test"
tmp_path_str = str(tmp_path)
str_tmp_path = str(tmp_path)
project = Project(str_tmp_path, name="name")
project = Project(tmp_path_str, name=project_name)
sut = C4kBuild(
project,
devops_config(
{
"project_root_path": str_tmp_path,
"mixin_types": [],
"build_types": ["C4K"],
"project_root_path": tmp_path_str,
"module": "c4k-test",
"c4k_config": {"a": 1, "b": 2},
"c4k_auth": {"c": 3, "d": 4},
@ -28,7 +26,7 @@ def test_c4k_mixin(tmp_path):
)
sut.initialize_build_dir()
assert sut.build_path() == f"{tmp_path_str}/target/name/c4k-test"
assert sut.build_path() == f"{str_tmp_path}/target/name/c4k-test"
sut.update_runtime_config(DnsRecord("test.de", ipv6="::1"))
sut.write_c4k_config()

Loading…
Cancel
Save