This commit is contained in:
bom 2023-02-24 10:31:04 +01:00
parent 74925a90b7
commit 0551fe6bfe
12 changed files with 240 additions and 233 deletions

View file

@ -1,69 +0,0 @@
import os
import subprocess as sub
from pathlib import Path
from system_repository import SystemRepository
from domain import ReleaseType
class GitRepository():
def __init__(self):
self.latest_commit = None
self.system_repository = SystemRepository()
@classmethod
def create_from_commit_string(cls, commit_string):
inst = cls()
inst.latest_commit = commit_string
return inst
def get_latest_n_commits(self, n: int):
self.system_repository.run_checked('git', 'log', '--oneline', '--format="%s %b"', f'-n {n}')
return self.system_repository.stdout
def get_latest_commit(self):
output = self.get_latest_n_commits(1)
self.latest_commit = " ".join(output) # returns a list of strings otherwise
return self.latest_commit
def get_release_type_from_latest_commit(self):
if self.latest_commit is None:
self.get_latest_commit()
if ReleaseType.MAJOR.name in self.latest_commit.upper():
return ReleaseType.MAJOR
elif ReleaseType.MINOR.name in self.latest_commit.upper():
return ReleaseType.MINOR
elif ReleaseType.PATCH.name in self.latest_commit.upper():
return ReleaseType.PATCH
elif ReleaseType.SNAPSHOT.name in self.latest_commit.upper():
return ReleaseType.SNAPSHOT
else:
return None
def tag_annotated(self, annotation: str, message: str):
self.system_repository.run_checked('git', 'tag', '-a', annotation, '-m', message)
return self.system_repository.stdout
def tag_annotated(self, annotation: str, message: str, count: int):
self.system_repository.run_checked('git', 'tag', '-a', annotation, '-m', message, f'HEAD~{count}')
return self.system_repository.stdout
def get_current_branch(self):
self.system_repository.run_checked('git', 'branch', '--show-current')
return ''.join(self.system_repository.stdout).rstrip()
def add_file(self, file_path: Path):
self.system_repository.run_checked('git', 'add', file_path)
return self.system_repository.stdout
def commit(self, commit_message: str):
self.system_repository.run_checked('git', 'commit', '-m', commit_message)
return self.system_repository.stdout
def push(self):
self.system_repository.run_checked('git', 'push')
return self.system_repository.stdout
def checkout(self, branch: str):
self.system_repository.run_checked('git', 'checkout', branch)
return self.system_repository.stdout

View file

@ -1,8 +1,107 @@
from domain import Release
from domain import Release, Version, ReleaseType
from infrastructure_api import FileHandler, SystemAPI
from pathlib import Path
class VersionRepository():
def __init__(self, file):
self.file = file
self.file_handler = None
def load_file(self):
self.file_handler = FileHandler.from_file_path(self.file)
return self.file_handler
def write_file(self, version_string):
if self.file_handler is None:
raise Exception('Version was not created by load_file method.')
else:
self.file_handler.write(version_string)
def parse_file(self):
version_list, is_snapshot = self.file_handler.parse()
return version_list, is_snapshot
def get_version(self) -> Version:
self.file_handler = self.load_file()
version_list, is_snapshot = self.parse_file()
version = Version(version_list)
version.is_snapshot = is_snapshot
return version
class ReleaseRepository():
def __init__(self):
pass
def __init__(self, version_repository: VersionRepository):
self.version_repository = version_repository
def get_current_release(self) -> Release:
pass
class ReleaseTypeRepository():
def __init__(self):
pass
class GitRepository():
def __init__(self):
self.latest_commit = None
self.system_repository = SystemAPI()
@classmethod
def create_from_commit_string(cls, commit_string):
inst = cls()
inst.latest_commit = commit_string
return inst
def get_latest_n_commits(self, n: int):
self.system_repository.run_checked('git', 'log', '--oneline', '--format="%s %b"', f'-n {n}')
return self.system_repository.stdout
def get_latest_commit(self):
output = self.get_latest_n_commits(1)
self.latest_commit = " ".join(output) # returns a list of strings otherwise
return self.latest_commit
def get_release_type_from_latest_commit(self):
if self.latest_commit is None:
self.get_latest_commit()
if ReleaseType.MAJOR.name in self.latest_commit.upper():
return ReleaseType.MAJOR
elif ReleaseType.MINOR.name in self.latest_commit.upper():
return ReleaseType.MINOR
elif ReleaseType.PATCH.name in self.latest_commit.upper():
return ReleaseType.PATCH
elif ReleaseType.SNAPSHOT.name in self.latest_commit.upper():
return ReleaseType.SNAPSHOT
else:
return None
def tag_annotated(self, annotation: str, message: str):
self.system_repository.run_checked('git', 'tag', '-a', annotation, '-m', message)
return self.system_repository.stdout
def tag_annotated(self, annotation: str, message: str, count: int):
self.system_repository.run_checked('git', 'tag', '-a', annotation, '-m', message, f'HEAD~{count}')
return self.system_repository.stdout
def get_current_branch(self):
self.system_repository.run_checked('git', 'branch', '--show-current')
return ''.join(self.system_repository.stdout).rstrip()
def add_file(self, file_path: Path):
self.system_repository.run_checked('git', 'add', file_path)
return self.system_repository.stdout
def commit(self, commit_message: str):
self.system_repository.run_checked('git', 'commit', '-m', commit_message)
return self.system_repository.stdout
def push(self):
self.system_repository.run_checked('git', 'push')
return self.system_repository.stdout
def checkout(self, branch: str):
self.system_repository.run_checked('git', 'checkout', branch)
return self.system_repository.stdout

View file

@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
import json
import re
import subprocess as sub
class FileHandler(ABC):
@ -153,3 +154,24 @@ class ClojureFileHandler(FileHandler):
clj_file.write(version_substitute)
clj_file.write(clj_rest)
clj_file.truncate()
class SystemAPI():
def __init__(self):
self.stdout = [""]
self.stderr = [""]
def run(self, args):
stream = sub.Popen(args,
stdout=sub.PIPE,
stderr=sub.PIPE,
text=True,
encoding="UTF-8")
self.stdout = stream.stdout.readlines()
self.stderr = stream.stderr.readlines()
def run_checked(self, *args):
self.run(args)
if len(self.stderr) > 0:
raise Exception(f"Command failed with: {self.stderr}")

View file

@ -2,9 +2,8 @@ import copy
from ddadevops import DevopsBuild
from ddadevops import execute
from ddadevops import gopass_field_from_path, gopass_password_from_path
from version_repository import VersionRepository
from infrastructure import VersionRepository, GitRepository
from services import InitReleaseService, PrepareReleaseService, TagAndPushReleaseService
from git_repository import GitRepository
from domain import ReleaseType, Version
def create_release_mixin_config(config_file, main_branch) -> dict:

View file

@ -1,8 +1,6 @@
from pathlib import Path
from version_repository import VersionRepository
from release_type import ReleaseType
from git_repository import GitRepository
from domain import Version
from infrastructure import VersionRepository, GitRepository
from domain import Version, ReleaseType
class InitReleaseService():

View file

@ -1,22 +0,0 @@
import subprocess as sub
class SystemRepository():
def __init__(self):
self.stdout = [""]
self.stderr = [""]
def run(self, args):
stream = sub.Popen(args,
stdout=sub.PIPE,
stderr=sub.PIPE,
text=True,
encoding="UTF-8")
self.stdout = stream.stdout.readlines()
self.stderr = stream.stderr.readlines()
def run_checked(self, *args):
self.run(args)
if len(self.stderr) > 0:
raise Exception(f"Command failed with: {self.stderr}")

53
test/test_domain.py Normal file
View file

@ -0,0 +1,53 @@
from pathlib import Path
import sys
import os
# getting the name of the directory
# where the this file is present.
current = os.path.dirname(os.path.realpath(__file__))
# Getting the parent directory name
# where the current directory is present.
parent = os.path.dirname(current)
# adding the parent directory to
# the sys.path.
sys.path.append(parent)
# now we can import the module in the parent
# directory.
from domain import Version, ReleaseType
from infrastructure import VersionRepository
def test_version():
version = Version([1, 2, 3])
version.increment(ReleaseType.SNAPSHOT)
assert version.get_version_string() == "1.2.3-SNAPSHOT"
assert version.version_list == [1, 2, 3]
assert version.is_snapshot
version = Version([1, 2, 3])
version.increment(ReleaseType.BUMP)
assert version.get_version_string() == "1.2.4-SNAPSHOT"
assert version.version_list == [1, 2, 4]
assert version.is_snapshot
version = Version([1, 2, 3])
version.increment(ReleaseType.PATCH)
assert version.get_version_string() == "1.2.4"
assert version.version_list == [1, 2, 4]
assert not version.is_snapshot
version = Version([1, 2, 3])
version.increment(ReleaseType.MINOR)
assert version.get_version_string() == "1.3.0"
assert version.version_list == [1, 3, 0]
assert not version.is_snapshot
version = Version([1, 2, 3])
version.increment(ReleaseType.MAJOR)
assert version.get_version_string() == "2.0.0"
assert version.version_list == [2, 0, 0]
assert not version.is_snapshot

View file

@ -1,64 +0,0 @@
from pathlib import Path
import sys
import os
# getting the name of the directory
# where the this file is present.
current = os.path.dirname(os.path.realpath(__file__))
# Getting the parent directory name
# where the current directory is present.
parent = os.path.dirname(current)
# adding the parent directory to
# the sys.path.
sys.path.append(parent)
# now we can import the module in the parent
# directory.
from git_repository import GitRepository
from release_type import ReleaseType
def test_git_repository():
# init
commit_string = "Major bla"
repo = GitRepository.create_from_commit_string(commit_string)
release_type = repo.get_release_type_from_latest_commit()
#test
assert release_type == ReleaseType.MAJOR
# init
commit_string = "MINOR bla"
repo = GitRepository.create_from_commit_string(commit_string)
release_type = repo.get_release_type_from_latest_commit()
#test
assert release_type == ReleaseType.MINOR
# init
commit_string = "PATCH bla"
repo = GitRepository.create_from_commit_string(commit_string)
release_type = repo.get_release_type_from_latest_commit()
# test
assert release_type == ReleaseType.PATCH
# init
commit_string = "SNAPSHOT bla"
repo = GitRepository.create_from_commit_string(commit_string)
release_type = repo.get_release_type_from_latest_commit()
#test
assert release_type == ReleaseType.SNAPSHOT
# init
commit_string = "bla"
repo = GitRepository.create_from_commit_string(commit_string)
release_type = repo.get_release_type_from_latest_commit()
#test
assert release_type == None

View file

@ -17,42 +17,51 @@ sys.path.append(parent)
# now we can import the module in the parent
# directory.
from version import Version
from version_repository import VersionRepository
from release_type import ReleaseType
from infrastructure import GitRepository, VersionRepository, ReleaseRepository
from domain import ReleaseType, Release
def test_version():
version = Version([1, 2, 3])
def test_git_repository():
version.increment(ReleaseType.SNAPSHOT)
assert version.get_version_string() == "1.2.3-SNAPSHOT"
assert version.version_list == [1, 2, 3]
assert version.is_snapshot
# init
commit_string = "Major bla"
repo = GitRepository.create_from_commit_string(commit_string)
release_type = repo.get_release_type_from_latest_commit()
version = Version([1, 2, 3])
version.increment(ReleaseType.BUMP)
assert version.get_version_string() == "1.2.4-SNAPSHOT"
assert version.version_list == [1, 2, 4]
assert version.is_snapshot
#test
assert release_type == ReleaseType.MAJOR
version = Version([1, 2, 3])
version.increment(ReleaseType.PATCH)
assert version.get_version_string() == "1.2.4"
assert version.version_list == [1, 2, 4]
assert not version.is_snapshot
version = Version([1, 2, 3])
version.increment(ReleaseType.MINOR)
assert version.get_version_string() == "1.3.0"
assert version.version_list == [1, 3, 0]
assert not version.is_snapshot
# init
commit_string = "MINOR bla"
repo = GitRepository.create_from_commit_string(commit_string)
release_type = repo.get_release_type_from_latest_commit()
version = Version([1, 2, 3])
version.increment(ReleaseType.MAJOR)
assert version.get_version_string() == "2.0.0"
assert version.version_list == [2, 0, 0]
assert not version.is_snapshot
#test
assert release_type == ReleaseType.MINOR
# init
commit_string = "PATCH bla"
repo = GitRepository.create_from_commit_string(commit_string)
release_type = repo.get_release_type_from_latest_commit()
# test
assert release_type == ReleaseType.PATCH
# init
commit_string = "SNAPSHOT bla"
repo = GitRepository.create_from_commit_string(commit_string)
release_type = repo.get_release_type_from_latest_commit()
#test
assert release_type == ReleaseType.SNAPSHOT
# init
commit_string = "bla"
repo = GitRepository.create_from_commit_string(commit_string)
release_type = repo.get_release_type_from_latest_commit()
#test
assert release_type == None
def test_gradle(tmp_path):
# init
@ -125,3 +134,18 @@ def test_python(tmp_path):
# check
assert '3.1.3-SNAPSHOT' in f.read_text()
def test_release_repository(tmp_path):
# init
file_name = 'config.json'
with open(f'test/resources/{file_name}', 'r') as gradle_file:
contents = gradle_file.read()
f = tmp_path / file_name
f.write_text(contents)
# test
sut = ReleaseRepository(VersionRepository(f))
current_release = sut.get_current_release()
assert current_release.version is not None

View file

@ -19,7 +19,6 @@ sys.path.append(parent)
# directory.
from pybuilder.core import task, init, Project
from version import *
from release_mixin import ReleaseMixin, create_release_mixin_config
MAIN_BRANCH = 'main'

View file

@ -18,8 +18,8 @@ sys.path.append(parent)
# directory.
from services import InitReleaseService
from release_type import ReleaseType
from version_repository import VersionRepository
from domain import ReleaseType
from infrastructure import VersionRepository
def test_init_release_service(tmp_path):
# init

View file

@ -1,32 +0,0 @@
from file_handlers import FileHandler
from domain import Version
class VersionRepository():
def __init__(self, file):
self.file = file
self.file_handler = None
def load_file(self):
self.file_handler = FileHandler.from_file_path(self.file)
return self.file_handler
def write_file(self, version_string):
if self.file_handler is None:
raise Exception('Version was not created by load_file method.')
else:
self.file_handler.write(version_string)
def parse_file(self):
version_list, is_snapshot = self.file_handler.parse()
return version_list, is_snapshot
def get_version(self):
self.file_handler = self.load_file()
version_list, is_snapshot = self.parse_file()
version = Version(version_list)
version.is_snapshot = is_snapshot
return version