improve some linting

pull/1/head
Michael Jerger 11 months ago
parent cb41ad0719
commit 5d2596bd3f

@ -8,8 +8,8 @@ class ReleaseService:
def __init__( def __init__(
self, self,
git_api: GitApi, git_api: GitApi,
build_file_repository: BuildFileRepository,
artifact_deployment_api: ArtifactDeploymentApi, artifact_deployment_api: ArtifactDeploymentApi,
build_file_repository: BuildFileRepository,
): ):
self.git_api = git_api self.git_api = git_api
self.artifact_deployment_api = artifact_deployment_api self.artifact_deployment_api = artifact_deployment_api
@ -63,7 +63,10 @@ class ReleaseService:
def publish_artifacts(self, release: Release): def publish_artifacts(self, release: Release):
for artifact_path in release.release_artifacts: for artifact_path in release.release_artifacts:
self.artifact_deployment_api.calculate_checksums(artifact_path) self.artifact_deployment_api.calculate_checksums(artifact_path)
self.artifact_deployment_api.create_forgejo_release(release.forgejo_release_api_endpoint) # create release self.artifact_deployment_api.create_forgejo_release(
release.forgejo_release_api_endpoint(),
)
# create release
# add artifacts to release # add artifacts to release
pass pass

@ -17,9 +17,7 @@ class Release(Validateable):
self.release_primary_build_file = inp.get( self.release_primary_build_file = inp.get(
"release_primary_build_file", "./project.clj" "release_primary_build_file", "./project.clj"
) )
self.release_artifacts = inp.get( self.release_artifacts = inp.get("release_artifacts", [])
"release_artifacts", []
)
self.release_secondary_build_files = inp.get( self.release_secondary_build_files = inp.get(
"release_secondary_build_files", [] "release_secondary_build_files", []
) )
@ -59,7 +57,9 @@ class Release(Validateable):
and self.release_type != ReleaseType.NONE and self.release_type != ReleaseType.NONE
and self.release_main_branch != self.release_current_branch and self.release_main_branch != self.release_current_branch
): ):
result.append(f"Releases are allowed only on {self.release_main_branch}") result.append(
f"Releases are allowed only on {self.release_main_branch}"
)
return result return result
def build_files(self) -> List[str]: def build_files(self) -> List[str]:
@ -68,10 +68,19 @@ class Release(Validateable):
return result return result
def forgejo_release_api_endpoint(self): def forgejo_release_api_endpoint(self):
if self.release_artifact_server_url == None or self.release_organisation == None or self.release_repository_name == None: if (
raise RuntimeError("when doing artifact release, release_artifact_server_url, release_organisation, release_repository_name may not be None.") self.release_artifact_server_url is None
or self.release_organisation is None
server_url = self.release_artifact_server_url.removeprefix("/").removesuffix("/") or self.release_repository_name is None
):
raise RuntimeError(
"when doing artifact release, release_artifact_server_url, "+
"release_organisation, release_repository_name may not be None."
)
server_url = self.release_artifact_server_url.removeprefix("/").removesuffix(
"/"
)
organisation = self.release_organisation.removeprefix("/").removesuffix("/") organisation = self.release_organisation.removeprefix("/").removesuffix("/")
repository = self.release_repository_name.removeprefix("/").removesuffix("/") repository = self.release_repository_name.removeprefix("/").removesuffix("/")
return f"{server_url}/api/v1/repos/{organisation}/{repository}/releases" return f"{server_url}/api/v1/repos/{organisation}/{repository}/releases"

@ -65,16 +65,12 @@ class ImageApi:
def dockerhub_login(self, username: str, password: str): def dockerhub_login(self, username: str, password: str):
self.execution_api.execute_secure( self.execution_api.execute_secure(
f"docker login --username {username} --password {password}", f"docker login --username {username} --password {password}",
"docker login --username ***** --password *****" "docker login --username ***** --password *****",
) )
def dockerhub_publish(self, name: str, username: str, tag: str): def dockerhub_publish(self, name: str, username: str, tag: str):
self.execution_api.execute_live( self.execution_api.execute_live(f"docker tag {name} {username}/{name}:{tag}")
f"docker tag {name} {username}/{name}:{tag}" self.execution_api.execute_live(f"docker push {username}/{name}:{tag}")
)
self.execution_api.execute_live(
f"docker push {username}/{name}:{tag}"
)
def test(self, name: str, path: Path): def test(self, name: str, path: Path):
self.execution_api.execute_live( self.execution_api.execute_live(
@ -95,14 +91,24 @@ class ExecutionApi:
check=check, check=check,
stdout=PIPE, stdout=PIPE,
stderr=PIPE, stderr=PIPE,
text=True).stdout text=True,
).stdout
output = output.rstrip() output = output.rstrip()
except CalledProcessError as exc: except CalledProcessError as exc:
print(f"Command failed with code: {exc.returncode} and message: {exc.stderr}") print(
f"Command failed with code: {exc.returncode} and message: {exc.stderr}"
)
raise exc raise exc
return output return output
def execute_secure(self, command: str, sanitized_command: str, dry_run=False, shell=True, check=True): def execute_secure(
self,
command: str,
sanitized_command: str,
dry_run=False,
shell=True,
check=True,
):
try: try:
output = self.execute(command, dry_run, shell, check) output = self.execute(command, dry_run, shell, check)
return output return output
@ -213,25 +219,40 @@ class ArtifactDeploymentApi:
self.execution_api = ExecutionApi() self.execution_api = ExecutionApi()
def create_forgejo_release(self, target_url: str, tag: str, token: str): def create_forgejo_release(self, target_url: str, tag: str, token: str):
return self.execution_api.execute_secure(f'curl -X "POST" "{target_url}" ' return self.execution_api.execute_secure(
+ '-H "accept: application/json" -H "Content-Type: application/json" ' f'curl -X "POST" "{target_url}" '
+ f'-d "{{ "body": "Provides files for release {tag} Attention: The "Source Code"-files below are not up-to-date!", "tag_name": "{tag}"}}" ' # noqa: E501 + '-H "accept: application/json" -H "Content-Type: application/json" '
+ f'-H "Authorization: token {token}"', + f'-d "{{ "body": "Provides files for release {tag} Attention: The "Source Code"-files below are not up-to-date!", "tag_name": "{tag}"}}" ' # noqa: E501
sanitized_command=f'curl -X "POST" "{target_url}" ' + f'-H "Authorization: token {token}"',
+ '-H "accept: application/json" -H "Content-Type: application/json" ' sanitized_command=f'curl -X "POST" "{target_url}" '
+ f'-d "{{ "body": "Provides files for release {tag} Attention: The "Source Code"-files below are not up-to-date!", "tag_name": "{tag}"}}" ') # noqa: E501 + '-H "accept: application/json" -H "Content-Type: application/json" '
+ f'-d "{{ "body": "Provides files for release {tag} Attention: The "Source Code"-files below are not up-to-date!", "tag_name": "{tag}"}}" ',
def post_asset(self, target_url: str, release_id: str, attachment: str, attachment_type: str, token: str): ) # noqa: E501
return self.execution_api.execute_secure(f'curl -X "POST" "{target_url}/{release_id}/assets" ' # {target_url}/{release_id}/assets move to Domain
+ f'-H "accept: application/json" -H "Authorization: token {token}" ' def post_asset(
+ '-H "Content-Type: multipart/form-data" ' self,
+ f'-F "attachment=@{attachment};type={attachment_type}"', target_url: str,
sanitized_command=f'curl -X "POST" "{target_url}/{release_id}/assets" ' # see above release_id: str,
+ '-H "accept: application/json" ' attachment: str,
+ '-H "Content-Type: multipart/form-data" ' attachment_type: str,
+ f'-F "attachment=@{attachment};type={attachment_type}"') token: str,
):
return self.execution_api.execute_secure(
f'curl -X "POST" "{target_url}/{release_id}/assets" ' # {target_url}/{release_id}/assets move to Domain
+ f'-H "accept: application/json" -H "Authorization: token {token}" '
+ '-H "Content-Type: multipart/form-data" '
+ f'-F "attachment=@{attachment};type={attachment_type}"',
sanitized_command=f'curl -X "POST" "{target_url}/{release_id}/assets" ' # see above
+ '-H "accept: application/json" '
+ '-H "Content-Type: multipart/form-data" '
+ f'-F "attachment=@{attachment};type={attachment_type}"',
)
def calculate_checksums(self, artifact_path: str): def calculate_checksums(self, artifact_path: str):
# self.execution_api.execute(f"find {artifact_path} -type f -exec sha256sum {{}}; | sort > {artifact_path} sha256sum.lst") relevant für provs # self.execution_api.execute(f"find {artifact_path} -type f -exec sha256sum {{}}; | sort > {artifact_path} sha256sum.lst") relevant für provs
self.execution_api(f"sha256sum {artifact_path} > {artifact_path}.sha256",) self.execution_api(
self.execution_api(f"sha512sum {artifact_path} > {artifact_path}.sha512",) f"sha256sum {artifact_path} > {artifact_path}.sha256",
)
self.execution_api(
f"sha512sum {artifact_path} > {artifact_path}.sha512",
)

Loading…
Cancel
Save