diff --git a/src/hermes/commands/init/base.py b/src/hermes/commands/init/base.py index 1844cdb..0626eaf 100644 --- a/src/hermes/commands/init/base.py +++ b/src/hermes/commands/init/base.py @@ -5,8 +5,12 @@ import argparse import os import subprocess +import sys import requests import toml +from enum import Enum, auto +from urllib.parse import urlparse +from pathlib import Path from pydantic import BaseModel from hermes.commands.base import HermesCommand import hermes.commands.init.oauth_github as oauth_github @@ -16,13 +20,34 @@ import hermes.commands.init.slim_click as sc +TUTORIAL_URL = "https://docs.software-metadata.pub/en/latest/tutorials/automated-publication-with-ci.html" + + +class GitHoster(Enum): + Empty = auto() + GitHub = auto() + GitLab = auto() + + +class DepositPlatform(Enum): + Empty = auto() + Zenodo = auto() + ZenodoSandbox = auto() + + +DepositPlatformChars: dict[DepositPlatform, str] = { + DepositPlatform.Zenodo: "z", + DepositPlatform.ZenodoSandbox: "s" + } + + class HermesInitFolderInfo: def __init__(self): self.absolute_path: str = "" self.has_git: bool = False self.git_remote_url: str = "" - self.uses_github: bool = False - self.uses_gitlab: bool = False + self.git_base_url: str = "" + self.used_git_hoster: GitHoster = GitHoster.Empty self.has_hermes_toml: bool = False self.has_gitignore: bool = False self.has_citation_cff = False @@ -43,20 +68,27 @@ def scout_current_folder() -> HermesInitFolderInfo: info.absolute_path = str(current_dir) info.has_git = os.path.isdir(os.path.join(current_dir, ".git")) if info.has_git: - remote_info = subprocess.run(['git', 'remote', '-v'], capture_output=True, text=True, check=True).stdout - for line in remote_info.splitlines(): - if line.startswith("origin"): - whitespace_split = line.split() - if len(whitespace_split) > 1: - info.git_remote_url = whitespace_split[1] - break - branch_info = subprocess.run(['git', 'branch'], capture_output=True, text=True, check=True).stdout + git_remote = str(subprocess.run(['git', 'remote'], capture_output=True, text=True).stdout).strip() + sc.echo(f"git remote = {git_remote}", debug=True) + info.git_remote_url = str(subprocess.run(['git', 'remote', 'get-url', git_remote], + capture_output=True, text=True).stdout).strip() + sc.echo(f"git remote url = {info.git_remote_url}", debug=True) + branch_info = str(subprocess.run(['git', 'branch'], capture_output=True, text=True).stdout) for line in branch_info.splitlines(): if line.startswith("*"): info.current_branch = line.split()[1].strip() break - info.uses_github = "github" in info.git_remote_url - info.uses_gitlab = "gitlab" in info.git_remote_url + if info.git_remote_url: + parsed_url = urlparse(info.git_remote_url) + info.git_base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" + sc.echo(f"git base url = {info.git_base_url}", debug=True) + if "github.com" in info.git_remote_url: + info.used_git_hoster = GitHoster.GitHub + elif "gitlab.com" in info.git_remote_url: + info.used_git_hoster = GitHoster.GitLab + elif "401" in subprocess.run(['curl', info.git_base_url + "/api/v4/version"], + capture_output=True, text=True).stdout: + info.used_git_hoster = GitHoster.GitLab info.has_hermes_toml = os.path.isfile(os.path.join(current_dir, "hermes.toml")) info.has_gitignore = os.path.isfile(os.path.join(current_dir, ".gitignore")) info.has_citation_cff = os.path.isfile(os.path.join(current_dir, "CITATION.cff")) @@ -69,14 +101,19 @@ def wait_until_the_user_is_done(): pass -def download_file_from_url(url, filepath): +def download_file_from_url(url, filepath, append: bool = False): with requests.get(url, stream=True) as r: r.raise_for_status() - with open(filepath, 'wb') as f: + with open(filepath, 'ab' if append else 'wb') as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) +def string_in_file(file_path, search_string: str) -> bool: + with open(file_path, 'r', encoding='utf-8') as file: + return any(search_string in line for line in file) + + class HermesInitSettings(BaseModel): """Configuration of the ``init`` command.""" pass @@ -92,6 +129,7 @@ def __init__(self, parser: argparse.ArgumentParser): self.folder_info: HermesInitFolderInfo = HermesInitFolderInfo() self.tokens: dict[str: str] = {} self.setup_method: str = "" + self.deposit_platform: DepositPlatform = DepositPlatform.Empty def init_command_parser(self, command_parser: argparse.ArgumentParser) -> None: command_parser.add_argument('--only-set-refresh-token', action='store_true', default=False, @@ -104,9 +142,13 @@ def load_settings(self, args: argparse.Namespace): pass def refresh_folder_info(self): + sc.echo("Scanning folder...", debug=True) self.folder_info = scout_current_folder() + sc.echo("Scan complete.", debug=True) def __call__(self, args: argparse.Namespace) -> None: + sc.echo("Starting hermes init...") + # Test if init is possible and wanted. If not: sys.exit self.test_initialization(args) @@ -116,6 +158,9 @@ def __call__(self, args: argparse.Namespace) -> None: # Creating the citation File self.create_citation_cff() + # Choosing desired deposit platform + self.choose_deposit_platform() + # Adding .hermes to the .gitignore self.update_gitignore() @@ -124,10 +169,10 @@ def __call__(self, args: argparse.Namespace) -> None: # Choosing setup method self.setup_method = sc.choose("How do you want to connect with Zenodo / GitHub?", - [("o", "using OAuth (default)"), ("m", "doing it manually")], default="o") + {"o": "using OAuth", "m": "doing it manually"}, default="o") - # Getting Zenodo token - self.get_zenodo_token() + # Connect with deposit platform + self.connect_deposit_platform() # Adding the token to the git secrets & changing action workflow settings self.configure_git_project() @@ -138,30 +183,33 @@ def test_initialization(self, args: argparse.Namespace): # Abort if git is not installed if not is_git_installed(): sc.echo("Git is currently not installed. It is mandatory for HERMES to have git installed.") - return + sys.exit() # Look at the current folder self.refresh_folder_info() # Only set the refresh-token (this is being used after the deposit) if args.only_refresh_token: - if self.folder_info.uses_github: - zenodo_refresh_token = "REFRESH_TOKEN:" + os.environ.get('ZENODO_TOKEN_REFRESH') - github_secrets.create_secret(self.folder_info.git_remote_url, "ZENODO_SANDBOX", - zenodo_refresh_token, args.github_token) - return + match self.folder_info.used_git_hoster: + case GitHoster.GitHub: + zenodo_refresh_token = "REFRESH_TOKEN:" + os.environ.get('ZENODO_TOKEN_REFRESH') + github_secrets.create_secret(self.folder_info.git_remote_url, "ZENODO_SANDBOX", + zenodo_refresh_token, args.github_token) + sys.exit() # Abort if there is no git if not self.folder_info.has_git: sc.echo("The current directory has no `.git` subdirectory. " "Please execute `hermes init` in the root directory of your git project.") - return + sys.exit() # Abort if neither GitHub nor gitlab is used - if not (self.folder_info.uses_github or self.folder_info.uses_gitlab): + if self.folder_info.used_git_hoster == GitHoster.Empty: sc.echo("Your git project ({}) is not connected to github or gitlab. It is mandatory for HERMES to " "use one of those hosting services.".format(self.folder_info.git_remote_url)) - return + sys.exit() + else: + sc.echo(f"Git project using {self.folder_info.used_git_hoster.name} detected.") sc.echo(f"Starting to initialize HERMES in {self.folder_info.absolute_path}") @@ -170,7 +218,7 @@ def test_initialization(self, args: argparse.Namespace): sc.echo("The current directory already has a `hermes.toml`. " "It seems like HERMES was already initialized for this project.") if not sc.confirm("Do you want to initialize Hermes anyway? "): - return + sys.exit() def create_hermes_toml(self): default_values = { @@ -216,6 +264,7 @@ def create_citation_cff(self): def update_gitignore(self): if not self.folder_info.has_gitignore: + # noinspection PyUnusedLocal with open(".gitignore", 'w') as file: pass sc.echo("A new `.gitignore` file was created.") @@ -232,68 +281,135 @@ def update_gitignore(self): sc.echo("Added `.hermes/` to the `.gitignore` file.") def create_ci_template(self): - if self.folder_info.uses_github: - # TODO Replace this later with the link to the real templates (not the feature branch) - github_ci_template_raw_url = ("https://raw.githubusercontent.com/softwarepub/ci-templates/feature/" - "init-command/TEMPLATE_hermes_github_to_zenodo.yml") - github_folder_path = os.path.join(os.getcwd(), ".github") - if not os.path.isdir(github_folder_path): - os.mkdir(github_folder_path) - workflows_folder_path = os.path.join(github_folder_path, "workflows") - if not os.path.isdir(workflows_folder_path): - os.mkdir(workflows_folder_path) - ci_file_path = os.path.join(workflows_folder_path, "hermes_github_to_zenodo.yml") - download_file_from_url(github_ci_template_raw_url, ci_file_path) - sc.echo(f"GitHub CI yml file was created at {ci_file_path}") - - def get_zenodo_token(self): - self.tokens["zenodo"] = "" + match self.folder_info.used_git_hoster: + case GitHoster.GitHub: + # TODO Replace this later with the link to the real templates (not the feature branch) + template_url = ("https://raw.githubusercontent.com/softwarepub/ci-templates/refs/heads/" + "feature/init-command/TEMPLATE_hermes_github_to_zenodo.yml") + ci_file_folder = ".github/workflows" + ci_file_name = "hermes_github.yml" + Path(ci_file_folder).mkdir(parents=True, exist_ok=True) + ci_file_path = Path(ci_file_folder) / ci_file_name + download_file_from_url(template_url, ci_file_path) + sc.echo(f"GitHub CI file was created at {ci_file_path}") + case GitHoster.GitLab: + gitlab_ci_template_url = ("https://raw.githubusercontent.com/softwarepub/ci-templates/refs/heads/" + "feature/init-command/TEMPLATE_hermes_gitlab_to_zenodo.yml") + hermes_ci_template_url = ("https://raw.githubusercontent.com/softwarepub/ci-templates/refs/heads/" + "feature/init-command/gitlab/hermes-ci.yml") + gitlab_ci_path = Path(".gitlab-ci.yml") + Path(".gitlab").mkdir(parents=True, exist_ok=True) + hermes_ci_path = Path(".gitlab") / "hermes-ci.yml" + if gitlab_ci_path.exists(): + if string_in_file(gitlab_ci_path, "hermes-ci.yml"): + sc.echo(f"It seems like your {gitlab_ci_path} file is already configured for hermes.") + else: + download_file_from_url(gitlab_ci_template_url, gitlab_ci_path, append=True) + sc.echo(f"{gitlab_ci_path} was updated.") + else: + download_file_from_url(gitlab_ci_template_url, gitlab_ci_path) + sc.echo(f"{gitlab_ci_path} was created.") + download_file_from_url(hermes_ci_template_url, hermes_ci_path) + sc.echo(f"{hermes_ci_path} was created.") + + def get_zenodo_token(self, sandbox: bool = True): + self.tokens[self.deposit_platform] = "" if self.setup_method == "o": - self.tokens["zenodo"] = oauth_zenodo.get_refresh_token() - if self.tokens["zenodo"]: + oauth_zenodo.setup(sandbox) + self.tokens[self.deposit_platform] = oauth_zenodo.get_refresh_token() + if self.tokens[self.deposit_platform]: sc.echo("OAuth at Zenodo was successful.") - sc.echo(self.tokens["zenodo"], debug=True) + sc.echo(self.tokens[self.deposit_platform], debug=True) else: sc.echo("Something went wrong while doing OAuth. You'll have to do it manually instead.") - if self.setup_method == "m" or self.tokens["zenodo"] == '': - zenodo_token_url = "https://sandbox.zenodo.org/account/settings/applications/tokens/new/" + if self.setup_method == "m" or self.tokens[self.deposit_platform] == '': + zenodo_token_url = "https://sandbox.zenodo.org/account/settings/applications/tokens/new/" if sandbox else \ + "https://zenodo.org/account/settings/applications/tokens/new/" sc.echo("{} and create an access token.".format( sc.create_console_hyperlink(zenodo_token_url, "Open this link") )) if self.setup_method == "m": sc.press_enter_to_continue() else: - self.tokens["zenodo"] = sc.answer("Then enter the token here: ") + self.tokens[self.deposit_platform] = sc.answer("Then enter the token here: ") def configure_git_project(self): - if self.folder_info.uses_github: - oauth_success = False - if self.setup_method == "o": - self.tokens["github"] = oauth_github.get_access_token() - if self.tokens["github"]: - sc.echo("OAuth at GitHub was successful.") - sc.echo(self.tokens["github"], debug=True) - github_secrets.create_secret(self.folder_info.git_remote_url, "ZENODO_SANDBOX", - secret_value=self.tokens["zenodo"], token=self.tokens["github"]) - github_permissions.allow_actions(self.folder_info.git_remote_url, token=self.tokens["github"]) - oauth_success = True - else: - sc.echo("Something went wrong while doing OAuth. You'll have to do it manually instead.") - if not oauth_success: - sc.echo("Now add {} to your {} under the name ZENODO_SANDBOX.".format( - "the token ({})".format(self.tokens["zenodo"]) if self.tokens["zenodo"] else "the token", - sc.create_console_hyperlink( - self.folder_info.git_remote_url.replace(".git", "/settings/secrets/actions"), - "project's GitHub secrets" - ) - )) - sc.press_enter_to_continue() - sc.echo("Next go to your {} and check the checkbox which reads:".format( - sc.create_console_hyperlink(self.folder_info.git_remote_url.replace(".git", "/settings/actions"), - "project settings") - )) - sc.echo("Allow GitHub Actions to create and approve pull requests") - sc.press_enter_to_continue() - sc.echo("Good job!") - elif self.folder_info.uses_gitlab: - print("GITLAB INIT NOT IMPLEMENTED YET") + match self.folder_info.used_git_hoster: + case GitHoster.GitHub: + oauth_success = False + if self.setup_method == "o": + self.tokens[GitHoster.GitHub] = oauth_github.get_access_token() + if self.tokens[GitHoster.GitHub]: + sc.echo("OAuth at GitHub was successful.") + sc.echo(self.tokens[GitHoster.GitHub], debug=True) + github_secrets.create_secret(self.folder_info.git_remote_url, "ZENODO_SANDBOX", + secret_value=self.tokens[self.deposit_platform], + token=self.tokens[GitHoster.GitHub]) + github_permissions.allow_actions(self.folder_info.git_remote_url, + token=self.tokens[GitHoster.GitHub]) + oauth_success = True + else: + sc.echo("Something went wrong while doing OAuth. You'll have to do it manually instead.") + if not oauth_success: + sc.echo("Add the {} token{} to your {} under the name ZENODO_SANDBOX.".format( + self.deposit_platform.name, + f" ({self.tokens[self.deposit_platform]})" if self.tokens[self.deposit_platform] else "", + sc.create_console_hyperlink( + self.folder_info.git_remote_url.replace(".git", "/settings/secrets/actions"), + "project's GitHub secrets" + ) + )) + sc.press_enter_to_continue() + sc.echo("Next open your {} and check the checkbox which reads:".format( + sc.create_console_hyperlink( + self.folder_info.git_remote_url.replace(".git", "/settings/actions"), + "project settings" + ) + )) + sc.echo("Allow GitHub Actions to create and approve pull requests") + sc.press_enter_to_continue() + sc.echo("Good job!") + case GitHoster.GitLab: + oauth_success = False + if self.setup_method == "o": + # TODO oauth for gitlab + pass + if not oauth_success: + sc.echo("Go to your {} and create a new token.".format( + sc.create_console_hyperlink( + self.folder_info.git_remote_url.replace(".git", "/-/settings/ci_cd"), + "project's access tokens") + )) + sc.echo("It needs to have the 'developer' role and 'write_repository' scope.") + sc.press_enter_to_continue() + sc.echo("Open your {} and go to 'Variables'".format( + sc.create_console_hyperlink( + self.folder_info.git_remote_url.replace(".git", "/-/settings/ci_cd"), + "project's ci settings") + )) + sc.echo("Then, add that token as variable with key HERMES_PUSH_TOKEN.") + sc.echo("(For your safety, you should set the visibility to 'Masked'.)") + sc.press_enter_to_continue() + sc.echo("Next, add the {} token{} as variable with key ZENODO_TOKEN.".format( + self.deposit_platform.name, + f" ({self.tokens[self.deposit_platform]})" if self.tokens[self.deposit_platform] else "" + )) + sc.echo("(For your safety, you should set the visibility to 'Masked'.)") + sc.press_enter_to_continue() + + def choose_deposit_platform(self): + deposit_platform_char = sc.choose( + "Where do you want to publish the software?", + {DepositPlatformChars[dp]: dp.name for dp in DepositPlatformChars.keys()}, + default=DepositPlatformChars[DepositPlatform.ZenodoSandbox] + ) + self.deposit_platform = next((dp for dp, c in DepositPlatformChars.items() if c == deposit_platform_char), + DepositPlatform.Empty) + + def connect_deposit_platform(self): + assert self.deposit_platform != DepositPlatform.Empty + match self.deposit_platform: + case DepositPlatform.Zenodo: + self.get_zenodo_token(False) + case DepositPlatform.ZenodoSandbox: + self.get_zenodo_token(True) diff --git a/src/hermes/commands/init/oauth_zenodo.py b/src/hermes/commands/init/oauth_zenodo.py index 7684251..a8d3e7a 100644 --- a/src/hermes/commands/init/oauth_zenodo.py +++ b/src/hermes/commands/init/oauth_zenodo.py @@ -5,7 +5,7 @@ from hermes.commands.init.oauth_process import OauthProcess -USING_SANDBOX = True +USING_SANDBOX_AS_DEFAULT = True local_port = 8334 @@ -20,10 +20,21 @@ real_token_url = 'https://zenodo.org/oauth/token' scope = "deposit:write deposit:actions" -client_id = sandbox_client_id if USING_SANDBOX else real_client_id -client_secret = sandbox_client_secret if USING_SANDBOX else real_client_secret -authorize_url = sandbox_authorize_url if USING_SANDBOX else real_authorize_url -token_url = sandbox_token_url if USING_SANDBOX else real_token_url +client_id = sandbox_client_id if USING_SANDBOX_AS_DEFAULT else real_client_id +client_secret = sandbox_client_secret if USING_SANDBOX_AS_DEFAULT else real_client_secret +authorize_url = sandbox_authorize_url if USING_SANDBOX_AS_DEFAULT else real_authorize_url +token_url = sandbox_token_url if USING_SANDBOX_AS_DEFAULT else real_token_url + + +def setup(using_sandbox: bool = True): + global client_id + global client_secret + global authorize_url + global token_url + client_id = sandbox_client_id if using_sandbox else real_client_id + client_secret = sandbox_client_secret if using_sandbox else real_client_secret + authorize_url = sandbox_authorize_url if using_sandbox else real_authorize_url + token_url = sandbox_token_url if using_sandbox else real_token_url def oauth_process() -> OauthProcess: diff --git a/src/hermes/commands/init/slim_click.py b/src/hermes/commands/init/slim_click.py index 25fbd74..0731197 100644 --- a/src/hermes/commands/init/slim_click.py +++ b/src/hermes/commands/init/slim_click.py @@ -7,15 +7,21 @@ """ PRINT_DEBUG = False +"""If this is true, echo() will print texts with debug=True.""" -def echo(text, debug: bool = False): +def echo(text: str, debug: bool = False): + """ + :param text: The printed text. + :param debug: If debug, the text will only be printed when slim_click.PRINT_DEBUG is true. + """ text = str(text) if (not debug) or PRINT_DEBUG: print(text) def confirm(text: str, default: bool = True) -> bool: + """The user gets to decide between yes (y) and no (n). The answer will be returned as bool.""" while True: _answer = input(text + (" [Y/n]" if default else " [y/N]") + ": ").lower() if _answer == "y": @@ -29,6 +35,7 @@ def confirm(text: str, default: bool = True) -> bool: def answer(text: str) -> str: + """Returns the user's response to the given text. It is just a wrapper for input().""" return input(text) @@ -36,19 +43,25 @@ def press_enter_to_continue(text: str = "Press ENTER to continue") -> None: input(text) -def choose(text: str, options: list[tuple[str, str]], default: str = "") -> str: - default = default.lower() +def choose(text: str, options: dict[str, str], default: str = "") -> str: + """ + The user gets to make a choice between predefined answers. + + :param text: Displayed text / question + :param options: Dict with possible answers {char -> description} + :param default: Selected answer (char) if the user doesn't enter anything + """ + default = default.lower().strip() + assert default in options.keys(), "Default char should be a key in the options dict." print(text) - for o in options: - char = o[0].lower() - description = o[1] + for char, description in options.items(): + char = char.lower().strip() if char == default.lower(): - description += " [default]" - description = o[1] + description += " (default)" print(f"[{char}] {description}") while True: - _answer = input("Your choice: ").lower() - if _answer in list(zip(*options))[0]: + _answer = input("Your choice: ").lower().strip() + if _answer in options.keys(): return _answer elif _answer == "" and default != "": return default @@ -57,7 +70,9 @@ def choose(text: str, options: list[tuple[str, str]], default: str = "") -> str: USE_FANCY_HYPERLINKS = False +"""If true links will be hidden in the console. Not all consoles support this however.""" def create_console_hyperlink(url: str, word: str) -> str: + """Use this to have a consistent display of hyperlinks.""" return f"\033]8;;{url}\033\\{word}\033]8;;\033\\" if USE_FANCY_HYPERLINKS else f"{word} ({url})"