From 5ea2e3be12715db66547eeca3714f30a74e6f3fe Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Mon, 14 Dec 2020 11:30:42 +0200 Subject: [PATCH 01/27] import just environ from os nothing else is required --- sda_orchestrator/complete_consume.py | 18 ++++++++---------- sda_orchestrator/inbox_consume.py | 16 ++++++++-------- sda_orchestrator/utils/consumer.py | 12 ++++++------ sda_orchestrator/utils/logger.py | 4 ++-- sda_orchestrator/verified_consume.py | 16 ++++++++-------- 5 files changed, 32 insertions(+), 34 deletions(-) diff --git a/sda_orchestrator/complete_consume.py b/sda_orchestrator/complete_consume.py index abc02e0..18e9aaf 100644 --- a/sda_orchestrator/complete_consume.py +++ b/sda_orchestrator/complete_consume.py @@ -3,7 +3,7 @@ from amqpstorm import Message from .utils.consumer import Consumer from .utils.logger import LOG -import os +from os import environ from .utils.id_ops import generate_dataset_id from jsonschema.exceptions import ValidationError from .schemas.validate import ValidateJSON, load_schema @@ -57,9 +57,7 @@ def _publish_mappings(self, message: Message, accessionID: str, datasetID: str) ValidateJSON(load_schema("dataset-mapping")).validate(json.loads(mappings_msg)) mapping = Message.create(channel, mappings_msg, properties) - mapping.publish( - os.environ.get("MAPPINGS_QUEUE", "mappings"), exchange=os.environ.get("BROKER_EXCHANGE", "sda") - ) + mapping.publish(environ.get("MAPPINGS_QUEUE", "mappings"), exchange=environ.get("BROKER_EXCHANGE", "sda")) channel.close() @@ -76,12 +74,12 @@ def _publish_mappings(self, message: Message, accessionID: str, datasetID: str) def main() -> None: """Run the Complete consumer.""" CONSUMER = CompleteConsumer( - hostname=str(os.environ.get("BROKER_HOST")), - port=int(os.environ.get("BROKER_PORT", 5670)), - username=os.environ.get("BROKER_USER", "sda"), - password=os.environ.get("BROKER_PASSWORD", ""), - queue=os.environ.get("COMPLETED_QUEUE", "completed"), - vhost=os.environ.get("BROKER_VHOST", "sda"), + hostname=str(environ.get("BROKER_HOST")), + port=int(environ.get("BROKER_PORT", 5670)), + username=environ.get("BROKER_USER", "sda"), + password=environ.get("BROKER_PASSWORD", ""), + queue=environ.get("COMPLETED_QUEUE", "completed"), + vhost=environ.get("BROKER_VHOST", "sda"), ) CONSUMER.start() diff --git a/sda_orchestrator/inbox_consume.py b/sda_orchestrator/inbox_consume.py index cd9988a..819cd62 100644 --- a/sda_orchestrator/inbox_consume.py +++ b/sda_orchestrator/inbox_consume.py @@ -4,7 +4,7 @@ from amqpstorm import Message from .utils.consumer import Consumer from .utils.logger import LOG -import os +from os import environ from pathlib import Path from jsonschema.exceptions import ValidationError from .schemas.validate import ValidateJSON, load_schema @@ -72,7 +72,7 @@ def _publish_ingest(self, message: Message, inbox_msg: Dict) -> None: ingest = Message.create(channel, ingest_msg, properties) - ingest.publish(os.environ.get("INGEST_QUEUE", "ingest"), exchange=os.environ.get("BROKER_EXCHANGE", "sda")) + ingest.publish(environ.get("INGEST_QUEUE", "ingest"), exchange=environ.get("BROKER_EXCHANGE", "sda")) channel.close() LOG.info(f'Sent the message to ingest queue to trigger ingestion for filepath: {inbox_msg["filepath"]}.') @@ -85,12 +85,12 @@ def _publish_ingest(self, message: Message, inbox_msg: Dict) -> None: def main() -> None: """Run the Inbox consumer.""" CONSUMER = InboxConsumer( - hostname=str(os.environ.get("BROKER_HOST")), - port=int(os.environ.get("BROKER_PORT", 5670)), - username=os.environ.get("BROKER_USER", 
"sda"), - password=os.environ.get("BROKER_PASSWORD", ""), - queue=os.environ.get("INBOX_QUEUE", "inbox"), - vhost=os.environ.get("BROKER_VHOST", "sda"), + hostname=str(environ.get("BROKER_HOST")), + port=int(environ.get("BROKER_PORT", 5670)), + username=environ.get("BROKER_USER", "sda"), + password=environ.get("BROKER_PASSWORD", ""), + queue=environ.get("INBOX_QUEUE", "inbox"), + vhost=environ.get("BROKER_VHOST", "sda"), ) CONSUMER.start() diff --git a/sda_orchestrator/utils/consumer.py b/sda_orchestrator/utils/consumer.py index 0ddc32d..ff1952f 100644 --- a/sda_orchestrator/utils/consumer.py +++ b/sda_orchestrator/utils/consumer.py @@ -1,7 +1,7 @@ """Message Broker Consumer class.""" import time -import os +from os import environ import json import ssl from pathlib import Path @@ -37,12 +37,12 @@ def __init__( self.vhost = vhost self.max_retries = max_retries self.connection = None - self.ssl = bool(strtobool(os.environ.get("BROKER_SSL", "True"))) + self.ssl = bool(strtobool(environ.get("BROKER_SSL", "True"))) context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS) context.check_hostname = False - cacertfile = Path(f"{os.environ.get('SSL_CACERT', '/tls/certs/ca.crt')}") - certfile = Path(f"{os.environ.get('SSL_CLIENTCERT', '/tls/certs/orch.crt')}") - keyfile = Path(f"{os.environ.get('SSL_CLIENTKEY', '/tls/certs/orch.key')}") + cacertfile = Path(f"{environ.get('SSL_CACERT', '/tls/certs/ca.crt')}") + certfile = Path(f"{environ.get('SSL_CLIENTCERT', '/tls/certs/orch.crt')}") + keyfile = Path(f"{environ.get('SSL_CLIENTKEY', '/tls/certs/orch.key')}") context.verify_mode = ssl.CERT_NONE # Require server verification if cacertfile.exists(): @@ -131,7 +131,7 @@ def _error_message(self, message: Message, reason: str) -> None: ValidateJSON(load_schema("ingestion-user-error")).validate(json.loads(error_msg)) error = Message.create(channel, error_msg, properties) - error.publish(os.environ.get("ERROR_QUEUE", "error"), exchange=os.environ.get("BROKER_EXCHANGE", "sda")) + error.publish(environ.get("ERROR_QUEUE", "error"), exchange=environ.get("BROKER_EXCHANGE", "sda")) channel.close() diff --git a/sda_orchestrator/utils/logger.py b/sda_orchestrator/utils/logger.py index 60b5816..89549b9 100644 --- a/sda_orchestrator/utils/logger.py +++ b/sda_orchestrator/utils/logger.py @@ -1,7 +1,7 @@ """Logging formatting.""" import logging -import os +from os import environ # Keeping it simple with the logging formatting @@ -11,5 +11,5 @@ logging.basicConfig(format=FORMAT, datefmt="%Y-%m-%d %H:%M:%S") LOG = logging.getLogger("sda_orchestrator") # By default the logging level would be INFO -log_level = os.environ.get("LOG_LEVEL", "INFO").upper() +log_level = environ.get("LOG_LEVEL", "INFO").upper() LOG.setLevel(log_level) diff --git a/sda_orchestrator/verified_consume.py b/sda_orchestrator/verified_consume.py index a05aec6..ce9696b 100644 --- a/sda_orchestrator/verified_consume.py +++ b/sda_orchestrator/verified_consume.py @@ -4,7 +4,7 @@ from amqpstorm import Message from .utils.consumer import Consumer from .utils.logger import LOG -import os +from os import environ from .utils.id_ops import generate_accession_id from jsonschema.exceptions import ValidationError from .schemas.validate import ValidateJSON, load_schema @@ -65,7 +65,7 @@ def _publish_accessionID(self, message: Message, accessionID: str, verify_msg: D checksum_data = list(filter(lambda x: x["type"] == "sha256", verify_msg["decrypted_checksums"])) decrypted_checksum = checksum_data[0]["value"] accession.publish( - os.environ.get("ACCESSIONIDS_QUEUE", 
"accessionIDs"), exchange=os.environ.get("BROKER_EXCHANGE", "sda") + environ.get("ACCESSIONIDS_QUEUE", "accessionIDs"), exchange=environ.get("BROKER_EXCHANGE", "sda") ) channel.close() @@ -82,12 +82,12 @@ def _publish_accessionID(self, message: Message, accessionID: str, verify_msg: D def main() -> None: """Run the Verify consumer.""" CONSUMER = VerifyConsumer( - hostname=str(os.environ.get("BROKER_HOST")), - port=int(os.environ.get("BROKER_PORT", 5670)), - username=os.environ.get("BROKER_USER", "sda"), - password=os.environ.get("BROKER_PASSWORD", ""), - queue=os.environ.get("VERIFIED_QUEUE", "verified"), - vhost=os.environ.get("BROKER_VHOST", "sda"), + hostname=str(environ.get("BROKER_HOST")), + port=int(environ.get("BROKER_PORT", 5670)), + username=environ.get("BROKER_USER", "sda"), + password=environ.get("BROKER_PASSWORD", ""), + queue=environ.get("VERIFIED_QUEUE", "verified"), + vhost=environ.get("BROKER_VHOST", "sda"), ) CONSUMER.start() From 1d5721f88f399dce8683ce17e8693605c0945fde Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Mon, 11 Jan 2021 09:06:21 +0200 Subject: [PATCH 02/27] configure DOI Handler for creating datacite dois We make use of httpx for creating requests modified the way datasets ids are generated we make use of shortuuid to generate ids that are human readable and usable. see https://blog.datacite.org/cool-dois/ Configuration can be loaded externally as JSON --- requirements.txt | 4 +- sda_orchestrator/config/__init__.py | 22 ++++ sda_orchestrator/config/config.json | 27 +++++ sda_orchestrator/utils/id_ops.py | 158 ++++++++++++++++++++++++++-- setup.py | 2 +- 5 files changed, 205 insertions(+), 8 deletions(-) create mode 100644 sda_orchestrator/config/__init__.py create mode 100644 sda_orchestrator/config/config.json diff --git a/requirements.txt b/requirements.txt index c8de5ab..964d737 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ amqpstorm -jsonschema \ No newline at end of file +jsonschema +httpx +shortuuid \ No newline at end of file diff --git a/sda_orchestrator/config/__init__.py b/sda_orchestrator/config/__init__.py new file mode 100644 index 0000000..ad011a2 --- /dev/null +++ b/sda_orchestrator/config/__init__.py @@ -0,0 +1,22 @@ +"""SDA orchestrator configuration. + +Configuration required for DOIs, REMS and other metadata. 
+""" + +from os import environ +from pathlib import Path +from typing import Dict +import json + + +def parse_config_file(config_file: str) -> Dict: + """Load JSON schemas.""" + file_path = Path(config_file) + + with open(str(file_path), "r") as fp: + return json.load(fp) + + +CONFIG_INFO = parse_config_file( + environ.get("CONFIG_FILE", str(Path(__file__).resolve().parent.joinpath("config.json"))) +) diff --git a/sda_orchestrator/config/config.json b/sda_orchestrator/config/config.json new file mode 100644 index 0000000..04e8b63 --- /dev/null +++ b/sda_orchestrator/config/config.json @@ -0,0 +1,27 @@ +{ + "rems": {}, + "datacite": { + "creators": [ + { + "name": "Nordic e-Infrastructure Collaboration", + "nameType": "Organizational", + "affiliation": [ + { + "name": "Nordic e-Infrastructure Collaboration", + "schemeUri": "https://ror.org", + "affiliationIdentifier": "https://ror.org/04jcwf484", + "affiliationIdentifierScheme": "ROR" + } + ] + } + ], + "publisher": "Nordic e-Infrastructure Collaboration", + "subjects": [ + { + "subject": "FOS: Biological sciences", + "subjectScheme": "Fields of Science and Technology (FOS)" + } + ], + "resourceURL": "http://data-access.sd.csc.fi/" + } +} \ No newline at end of file diff --git a/sda_orchestrator/utils/id_ops.py b/sda_orchestrator/utils/id_ops.py index 33acbff..1e433aa 100644 --- a/sda_orchestrator/utils/id_ops.py +++ b/sda_orchestrator/utils/id_ops.py @@ -1,26 +1,46 @@ """Fetching IDs for files and datasets.""" from pathlib import Path +from typing import Dict, Union from uuid import uuid4 +from os import environ +from datetime import date +import shortuuid +import re +from .logger import LOG +from ..config import CONFIG_INFO -def generate_dataset_id(user: str, inbox_path: str) -> str: +from httpx import Headers, AsyncClient + + +def generate_dataset_id(user: str, inbox_path: str, ns: Union[str, None] = None) -> str: """Map accession ID to dataset. Generate dataset id based on folder or user. + We keep the email domain as users might have same name on different domains + and that could indicate different datasets. """ file_path = Path(inbox_path) file_path_parts = file_path.parts dataset = "" + ns = ns if ns else "urn:neic:" + + # add trailing slash if it does not exist + if ns.startswith(("http://", "https://")): + if ns[len(ns) - 1] != "/": + ns = ns + "/" # if a file it is submited in the root directory the dataset - # is the urn:default: + # is then ns: # otherwise we take the root directory and construct the path - # urn:dir: + # ns:- if len(file_path_parts) <= 2: - dataset = f"urn:default:{user}" + dataset = f"{ns}{user}" else: # if it is / then we take the next value - dataset = f"urn:dir:{file_path_parts[1]}" + dataset = f"{ns}{user}-{file_path_parts[1]}" + + LOG.debug(f"generated dataset id as: {dataset}") return dataset @@ -28,4 +48,130 @@ def generate_dataset_id(user: str, inbox_path: str) -> str: def generate_accession_id() -> str: """Generate Stable ID.""" accessionID = uuid4() - return accessionID.urn + + urn = accessionID.urn + LOG.debug(f"generated accession id as: {urn}") + return urn + + +class DOIHandler: + """Handler for DOI registration at Datacite. + + The workflow consists of create a short uuid based on user and folder/root directory + where the file was uploaded. Based on this information we group files into dataset + It is recommended that first step is to create a draft DOI as it can later be removed easier, + in case of an error. 
+ + ``create_draft_doi`` generates the identifier using a 10 chars shortuuid from, which guarantee + uniqueness based on the way we generate the dataset ID. + + The ``set_doi_state`` can also be used to create a draft DOI, however its use is dependent on generating + a doi_suffix externally. + """ + + def __init__(self) -> None: + """Define DOI credentials and config.""" + self.doi_prefix = environ.get("DOI_PREFIX", "") + self.doi_api = environ.get("DOI_API", "") + self.doi_user = environ.get("DOI_USER", "") + self.doi_key = environ.get("DOI_KEY", "") + self.ns_url = f"https://doi.org/{self.doi_prefix}" + + async def create_draft_doi(self, user: str, inbox_path: str) -> Union[Dict, None]: + """Create an auto-generated draft DOI. + + We are using just the prefix for the DOI so that it will be autogenerated. + """ + dataset = generate_dataset_id(user, inbox_path, self.ns_url) + suffix = shortuuid.uuid(name=dataset)[:10] + doi_suffix = f"{suffix[:4]}-{suffix[4:]}" + + headers = Headers({"Content-Type": "application/json"}) + draft_doi_payload = {"data": {"type": "dois", "attributes": {"doi": f"{self.doi_prefix}/{doi_suffix}"}}} + async with AsyncClient() as client: + response = await client.post( + self.doi_api, auth=(self.doi_user, self.doi_key), json=draft_doi_payload, headers=headers + ) + json_response = response.json() + + doi_data = None + if response.status_code == 200: + LOG.debug(f"DOI draft created and response was: {json_response}") + LOG.info(f"DOI draft created with doi: {json_response['data']['attributes']['doi']}.") + doi_data = { + "suffix": json_response["data"]["attributes"]["suffix"], + "fullDOI": json_response["data"]["attributes"]["doi"], + } + else: + doi_data = self._check_errors(json_response, doi_suffix) + + return doi_data + + async def set_doi_state(self, state: str, doi_suffix: str) -> Union[Dict, None]: + """Set DOI and associated metadata. + + :param state: can be publish, register or hide, or even draft if preferred . + :param doi: DOI to do operations on. 
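Putting the two methods together, a rough driver might look like the following, assuming DOI_PREFIX, DOI_API, DOI_USER and DOI_KEY are exported and the user and inbox path are example values:

```python
# Rough driver sketch; credentials come from the DOI_* environment
# variables, and the user/path values are examples only.
import asyncio
from sda_orchestrator.utils.id_ops import DOIHandler

async def main() -> None:
    handler = DOIHandler()
    draft = await handler.create_draft_doi("user", "user/batch1/file.c4gh")
    if draft:
        # Promote the draft once the dataset is ready to be exposed.
        await handler.set_doi_state("publish", draft["suffix"])

asyncio.run(main())
```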
+ """ + publish_data_payload = { + "data": { + "id": f"{self.doi_prefix}/{doi_suffix}", + "type": "dois", + "attributes": { + "event": state, + "doi": f"{self.doi_prefix}/{doi_suffix}", + "titles": [{"title": f"{CONFIG_INFO['datacite']['titlePrefix']}", "lang": "en"}], + "publisher": CONFIG_INFO["datacite"]["publisher"], + # will be current year + "publicationYear": date.today().year, + # resource type is predefined as dataset + "types": { + "ris": "DATA", + "bibtex": "misc", + "citeproc": "dataset", + "schemaOrg": "Dataset", + "resourceTypeGeneral": "Dataset", + }, + "subjects": CONFIG_INFO["datacite"]["subjects"], + "url": CONFIG_INFO["datacite"]["resourceURL"], + "schemaVersion": "https://schema.datacite.org/meta/kernel-4.3/", + }, + } + } + headers = Headers({"Content-Type": "application/json"}) + async with AsyncClient() as client: + response = await client.post( + self.doi_api, auth=(self.doi_user, self.doi_key), json=publish_data_payload, headers=headers + ) + json_response = response.json() + doi_data = None + if response.status_code == 200: + LOG.debug(f"DOI created with state: {state} and response was: {json_response}") + LOG.info(f"DOI created with doi: {json_response['data']['attributes']['doi']} with state {state}.") + doi_data = { + "suffix": json_response["data"]["attributes"]["suffix"], + "fullDOI": json_response["data"]["attributes"]["doi"], + } + else: + doi_data = self._check_errors(json_response, doi_suffix) + + return doi_data + + def _check_errors(self, json_response: Dict, doi_suffix: str) -> Union[Dict, None]: + errors = json_response["errors"] + doi_data = None + if len(errors) == 1: + error_msg = errors[0]["title"] if "title" in errors[0] else errors[0]["detail"] + if errors[0]["source"] == "doi" and error_msg == "This DOI has already been taken": + LOG.info("DOI already taken, we will associate the submission to this doi dataset.") + doi_data = { + "suffix": doi_suffix, + "fullDOI": f"{self.doi_prefix}/{doi_suffix}", + } + else: + LOG.error(f"Error occurred: {errors}") + raise Exception(f"{error_msg}") + elif len(errors) > 1: + LOG.error(f"Errors occurred: {errors}") + raise Exception(f"Multiple errors occurred: {errors}") + return doi_data diff --git a/setup.py b/setup.py index 7e650e0..ff5629e 100644 --- a/setup.py +++ b/setup.py @@ -44,7 +44,7 @@ "License :: OSI Approved :: Apache Software License", "Programming Language :: Python :: 3.7", ], - install_requires=["amqpstorm", "jsonschema"], + install_requires=["amqpstorm", "jsonschema", "httpx", "shortuuid"], extras_require={ "test": ["coverage", "coveralls", "pytest", "pytest-cov", "tox"], }, From 7f0bb4a50d5b4211bd131e3714fa28de037f488e Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Tue, 12 Jan 2021 18:10:33 +0200 Subject: [PATCH 03/27] refactor var names --- sda_orchestrator/utils/id_ops.py | 43 ++++++++++++++++---------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/sda_orchestrator/utils/id_ops.py b/sda_orchestrator/utils/id_ops.py index 1e433aa..05a1573 100644 --- a/sda_orchestrator/utils/id_ops.py +++ b/sda_orchestrator/utils/id_ops.py @@ -92,18 +92,18 @@ async def create_draft_doi(self, user: str, inbox_path: str) -> Union[Dict, None response = await client.post( self.doi_api, auth=(self.doi_user, self.doi_key), json=draft_doi_payload, headers=headers ) - json_response = response.json() + draft_resp = response.json() doi_data = None if response.status_code == 200: - LOG.debug(f"DOI draft created and response was: {json_response}") - LOG.info(f"DOI draft created with doi: 
{json_response['data']['attributes']['doi']}.") + LOG.debug(f"DOI draft created and response was: {draft_resp}") + LOG.info(f"DOI draft created with doi: {draft_resp['data']['attributes']['doi']}.") doi_data = { - "suffix": json_response["data"]["attributes"]["suffix"], - "fullDOI": json_response["data"]["attributes"]["doi"], + "suffix": draft_resp["data"]["attributes"]["suffix"], + "fullDOI": draft_resp["data"]["attributes"]["doi"], } else: - doi_data = self._check_errors(json_response, doi_suffix) + doi_data = self._check_errors(draft_resp, doi_suffix) return doi_data @@ -143,35 +143,36 @@ async def set_doi_state(self, state: str, doi_suffix: str) -> Union[Dict, None]: response = await client.post( self.doi_api, auth=(self.doi_user, self.doi_key), json=publish_data_payload, headers=headers ) - json_response = response.json() + publish_resp = response.json() doi_data = None if response.status_code == 200: - LOG.debug(f"DOI created with state: {state} and response was: {json_response}") - LOG.info(f"DOI created with doi: {json_response['data']['attributes']['doi']} with state {state}.") + LOG.debug(f"DOI created with state: {state} and response was: {publish_resp}") + LOG.info(f"DOI created with doi: {publish_resp['data']['attributes']['doi']} with state {state}.") doi_data = { - "suffix": json_response["data"]["attributes"]["suffix"], - "fullDOI": json_response["data"]["attributes"]["doi"], + "suffix": publish_resp["data"]["attributes"]["suffix"], + "fullDOI": publish_resp["data"]["attributes"]["doi"], } else: - doi_data = self._check_errors(json_response, doi_suffix) + LOG.error(f"DOI API request failed with code: {response.status_code}") + doi_data = self._check_errors(publish_resp, doi_suffix) return doi_data - def _check_errors(self, json_response: Dict, doi_suffix: str) -> Union[Dict, None]: - errors = json_response["errors"] + def _check_errors(self, response: Dict, doi_suffix: str) -> Union[Dict, None]: + errors_resp = response["errors"] doi_data = None - if len(errors) == 1: - error_msg = errors[0]["title"] if "title" in errors[0] else errors[0]["detail"] - if errors[0]["source"] == "doi" and error_msg == "This DOI has already been taken": + if len(errors_resp) == 1: + error_msg = errors_resp[0]["title"] if "title" in errors_resp[0] else errors_resp[0]["detail"] + if errors_resp[0]["source"] == "doi" and error_msg == "This DOI has already been taken": LOG.info("DOI already taken, we will associate the submission to this doi dataset.") doi_data = { "suffix": doi_suffix, "fullDOI": f"{self.doi_prefix}/{doi_suffix}", } else: - LOG.error(f"Error occurred: {errors}") + LOG.error(f"Error occurred: {errors_resp}") raise Exception(f"{error_msg}") - elif len(errors) > 1: - LOG.error(f"Errors occurred: {errors}") - raise Exception(f"Multiple errors occurred: {errors}") + elif len(errors_resp) > 1: + LOG.error(f"Multiple errors occurred: {errors_resp}") + raise Exception(f"Multiple errors occurred: {errors_resp}") return doi_data From 47914a1dbca4899ec7ac5f9cff29a7b125ee0f6e Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Fri, 15 Jan 2021 16:14:28 +0200 Subject: [PATCH 04/27] id_ops fixes for unused import an fixing tests --- sda_orchestrator/utils/id_ops.py | 1 - tests/test_id_ops.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sda_orchestrator/utils/id_ops.py b/sda_orchestrator/utils/id_ops.py index 05a1573..0e29b12 100644 --- a/sda_orchestrator/utils/id_ops.py +++ b/sda_orchestrator/utils/id_ops.py @@ -6,7 +6,6 @@ from os import environ from datetime import 
date import shortuuid -import re from .logger import LOG from ..config import CONFIG_INFO diff --git a/tests/test_id_ops.py b/tests/test_id_ops.py index d30e5fd..e15fdb0 100644 --- a/tests/test_id_ops.py +++ b/tests/test_id_ops.py @@ -16,12 +16,12 @@ def setUp(self): def test_map_simple_file(self): """Test if we can map a single file.""" result = generate_dataset_id("user", "user/txt1.c4gh") - self.assertEqual("urn:default:user", result) + self.assertEqual("urn:neic:user", result) def test_map_simple_file_dir(self): """Test if dir scheme affects urn.""" result = generate_dataset_id("user", "user/smth/smth2/txt9.c4gh") - self.assertEqual("urn:dir:smth", result) + self.assertEqual("urn:neic:user-smth", result) @patch( "sda_orchestrator.utils.id_ops.uuid4", From 1b9309f57555756b53e97fea3ea1e635664ef2fd Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Fri, 15 Jan 2021 16:14:37 +0200 Subject: [PATCH 05/27] base rems configuration --- sda_orchestrator/config/config.json | 98 ++++++++++++++++++++++++++++- 1 file changed, 97 insertions(+), 1 deletion(-) diff --git a/sda_orchestrator/config/config.json b/sda_orchestrator/config/config.json index 04e8b63..0a4538a 100644 --- a/sda_orchestrator/config/config.json +++ b/sda_orchestrator/config/config.json @@ -1,5 +1,101 @@ { - "rems": {}, + "rems": { + "organization": { + "id": "NeIC", + "name": { + "fi": "Nordic e-Infrastructure Collaboration", + "en": "Nordic e-Infrastructure Collaboration" + }, + "shortName": { + "fi": "NeIC", + "en": "NeIC" + } + }, + "license": { + "localizations": { + "en": { + "title": "Creative Commons Attribution 4.0 International (CC BY 4.0)", + "textcontent": "https://creativecommons.org/licenses/by/4.0/" + }, + "fi": { + "title": "Nimeä 4.0 Kansainvälinen (CC BY 4.0)", + "textcontent": "https://creativecommons.org/licenses/by/4.0/deed.fi" + } + } + }, + "form": { + "title": "Base form", + "fields": [ + { + "field/id": "fld1", + "field/type": "header", + "field/title": { + "en": "Base form for registering a resource", + "fi": "Base form for registering a resource" + }, + "field/optional": false + }, + { + "field/id": "fld2", + "field/type": "description", + "field/title": { + "en": "Title of the application", + "fi": "Title of the application" + }, + "field/optional": true, + "field/max-length": 100 + }, + { + "field/id": "fld3", + "field/type": "option", + "field/title": { + "en": "Select type of submision", + "fi": "Select type of submision" + }, + "field/options": [ + { + "key": "fega", + "label": { + "en": "FEGA (Federated EGA)", + "fi": "FEGA (Federated EGA)" + } + }, + { + "key": "sda", + "label": { + "en": "SDA stand-alone", + "fi": "SDA stand-alone" + } + } + ], + "field/optional": false + }, + { + "field/id": "fld4", + "field/type": "texta", + "field/title": { + "en": "Stand-alone SDA archive use case description", + "fi": "Stand-alone SDA archive use case description" + }, + "field/privacy": "private", + "field/optional": false, + "field/max-length": 200, + "field/visibility": { + "visibility/type": "only-if", + "visibility/field": { + "field/id": "fld3" + }, + "visibility/values": [ + "sda" + ] + } + } + ] + }, + "workflow": { + "title": "Base workflow" + } + }, "datacite": { "creators": [ { From da0d462ca8b345a7ea554118ff8c8325faa83907 Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Fri, 15 Jan 2021 16:16:35 +0200 Subject: [PATCH 06/27] adding rems operation handling for registering a resource --- sda_orchestrator/utils/rems_ops.py | 307 +++++++++++++++++++++++++++++ 1 file changed, 307 insertions(+) create 
mode 100644 sda_orchestrator/utils/rems_ops.py diff --git a/sda_orchestrator/utils/rems_ops.py b/sda_orchestrator/utils/rems_ops.py new file mode 100644 index 0000000..b9f48ff --- /dev/null +++ b/sda_orchestrator/utils/rems_ops.py @@ -0,0 +1,307 @@ +"""Handle registration of DOI in REMS.""" + +from os import environ +from .logger import LOG + +from ..config import CONFIG_INFO + +from httpx import Headers, AsyncClient + + +class REMSHandler: + """Register Dataset in REMS. + + We will use the DOI of the dataset to create a resource in REMS, + enable that resource. We will attach the resources as the catalogue item. + """ + + def __init__(self) -> None: + """Define DOI credentials and config. + + :param state: can be publish, register, hide or draft. + """ + self.rems_api = environ.get("REMS_API", "") + self.rems_user = environ.get("REMS_USER", "") + self.rems_key = environ.get("REMS_KEY", "") + self.config = CONFIG_INFO["rems"] + self.headers = Headers( + { + "Content-Type": "application/json", + "Accept": "application/json", + "x-rems-api-key": self.rems_key, + "x-rems-user-id": self.rems_user, + } + ) + + async def register_resource(self, doi: str) -> None: + """Register a resource and its dependencies for making it usable, in REMS. + + To make a resource accessible by a user one needs: + + - a license and an organization (every object needs to belong to an organization) + - an organization needs name and short name + - a catalogue item is needed to make a resource findable + - a catalogue item needs form, resource, workflow and license + - a workflow needs to be of type default and required a form id, organization and title + and a list of users/handlers who can process applications + - a form requires organization, title and fields + """ + try: + await self._organization() + license_id = await self._license() + form_id = await self._form() + workflow_id = await self._workflow() + + resource_id = await self._resource(doi, license_id) + await self._catalogue_item(form_id, resource_id, workflow_id, doi) + except Exception: + raise + + async def _process_create(self, resource: str, payload: dict) -> int: + """Process creation of a REMS resource endpoint in a similar fashion so that we can retrieve its id.""" + async with AsyncClient() as client: + response = await client.post(f"{self.rems_api}/api/{resource}/create", json=payload, headers=self.headers) + if response.status_code == 200: + _resp = response.json() + if isinstance(_resp["success"], bool) and _resp["success"]: + _id = _resp["id"] + LOG.info(f"Created {resource} with id {_id}.") + else: + LOG.error(f"Error occurred when creating {resource}.") + raise Exception(f"Error occurred when creating {resource}.") + else: + LOG.error(f"Error occurred when creating {resource} got HTTP status: {response.status_code}") + raise Exception(f"Error occurred when creating {resource} got HTTP status: {response.status_code}") + + return _id + + async def _organization(self) -> None: + """Create organization if it does not exist. 
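Every call goes out with the REMS API-key header pair set up in __init__ above; a bare httpx request using the same scheme might look like this (base URL and credentials are placeholders):

```python
# Standalone sketch of the auth header scheme; the base URL, key and user
# id are placeholders, and "NeIC" is the organization id from config.json.
import asyncio
from httpx import AsyncClient, Headers

async def fetch_organization() -> None:
    headers = Headers({
        "Accept": "application/json",
        "x-rems-api-key": "rems-api-key",  # placeholder
        "x-rems-user-id": "owner",         # placeholder
    })
    async with AsyncClient() as client:
        resp = await client.get(
            "https://rems.example.org/api/organizations/NeIC", headers=headers
        )
        print(resp.status_code, resp.json())

asyncio.run(fetch_organization())
```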
+ + The organization id is established in configuration and we use that to + assign other resources to it as well + """ + org_exists = False + org = self.config["organization"] + org_payload = { + "archive": False, + "enabled": True, + "organization/id": org["id"], + "organization/short-name": org["name"], + "organization/name": org["shortName"], + "organization/owners": [{"userid": self.rems_user}], + } + + async with AsyncClient() as client: + response = await client.get(f"{self.rems_api}/api/organizations/{org['id']}", headers=self.headers) + if response.status_code == 200: + org_resp = response.json() + if org_resp["organization/id"] == org["id"]: + org_exists = True + LOG.info(f"Organization with id {org['id']} exists") + else: + LOG.error(f"Retrieving organizations failed with HTTP status: {response.status_code}") + + if not org_exists: + await self._process_create("organizations", org_payload) + + async def _license(self) -> int: + """Get or create license if one does not exist. + + We check from existing licenses if one exists with the same URL, if yes use that if not + create a new license with that URL for our organization. + """ + license_exists = False + license_id = 0 + license_payload = { + "licensetype": "link", + "organization": {"organization/id": self.config["organization"]["id"]}, + "localizations": self.config["license"]["localizations"], + } + + async with AsyncClient() as client: + response = await client.get( + f"{self.rems_api}/api/licenses", + headers=self.headers, + ) + if response.status_code == 200: + license_resp = response.json() + for lnc in license_resp: + if ( + lnc["organization"]["organization/id"] == self.config["organization"]["id"] + and lnc["localizations"]["en"]["title"] == self.config["license"]["localizations"]["en"]["title"] + ): + license_exists = True + license_id = lnc["id"] + LOG.info( + f"License {self.config['license']['localizations']['en']['title']} with id {license_id} exists." 
+ ) + else: + LOG.error(f"Retrieving licenses failed with HTTP status: {response.status_code}") + + if not license_exists: + license_id = await self._process_create("licenses", license_payload) + + return license_id + + async def _workflow(self) -> int: + """Create base worflow if one does not exist.""" + workflow_exists = False + workflow_id = 0 + workflow_payload = { + "organization": {"organization/id": self.config["organization"]["id"]}, + "title": self.config["workflow"]["title"], + "type": "workflow/default", + "handlers": [self.rems_user], + } + + async with AsyncClient() as client: + response = await client.get( + f"{self.rems_api}/api/workflows", + headers=self.headers, + ) + if response.status_code == 200: + workflow_resp = response.json() + for wkf in workflow_resp: + if ( + wkf["organization"]["organization/id"] == self.config["organization"]["id"] + and wkf["title"] == self.config["workflow"]["title"] + ): + workflow_exists = True + workflow_id = wkf["id"] + LOG.info(f"Workflow {self.config['workflow']['title']} with id {workflow_id} exists.") + else: + LOG.error(f"Retrieving workflows failed with HTTP status: {response.status_code}") + + if not workflow_exists: + workflow_id = await self._process_create("workflows", workflow_payload) + + return workflow_id + + async def _form(self) -> int: + """Create a basic form if one does not exist used in the application of a resource.""" + form_exists = False + form_id = 0 + form_payload = { + "organization": {"organization/id": self.config["organization"]["id"]}, + "form/title": self.config["form"]["title"], + "form/fields": self.config["form"]["fields"], + } + + async with AsyncClient() as client: + response = await client.get( + f"{self.rems_api}/api/forms", + headers=self.headers, + ) + if response.status_code == 200: + from_resp = response.json() + for form in from_resp: + if ( + form["organization"]["organization/id"] == self.config["organization"]["id"] + and form["form/title"] == self.config["form"]["title"] + ): + form_exists = True + form_id = form["form/id"] + LOG.info(f"Form {self.config['form']['title']} with id {form_id} exists.") + else: + LOG.error(f"Retrieving forms failed with HTTP status: {response.status_code}") + if not form_exists: + form_id = await self._process_create("forms", form_payload) + + return form_id + + async def _catalogue_item(self, form_id: int, resource_id: int, workflow_id: int, doi: str) -> None: + """Create catalogue item to associate a resource to it.""" + item_exists = False + item_payload = { + "form": form_id, + "resid": resource_id, + "wfid": workflow_id, + "organization": {"organization/id": self.config["organization"]["id"]}, + "localizations": { + "fi": { + "title": f"Catalogue item for resource {resource_id}", + "infourl": f"https://doi.org/{doi}", + }, + "en": { + "title": f"Catalogue item for resource {resource_id}", + "infourl": f"https://doi.org/{doi}", + }, + }, + "enabled": True, + "archived": False, + } + + async with AsyncClient() as client: + response = await client.get( + f"{self.rems_api}/api/catalogue-items", + headers=self.headers, + ) + if response.status_code == 200: + item_resp = response.json() + for item in item_resp: + if ( + item["organization"]["organization/id"] == self.config["organization"]["id"] + and item["wfid"] == workflow_id + and item["resid"] == doi + and item["formid"] == form_id + ): + item_exists = True + LOG.info(f"Catalogue Item for resource with DOI {doi} exists with id {item['id']}.") + else: + LOG.error(f"Retrieving catalogue items failed with HTTP 
status: {response.status_code}") + if not item_exists: + await self._process_create("catalogue-items", item_payload) + + async def _resource(self, doi: str, license_id: int) -> int: + """Create a resource and point it to DataCite DOI.""" + resource_exists = False + resource_id = 0 + resource_payload = { + "resid": doi, + "organization": {"organization/id": self.config["organization"]["id"]}, + "licenses": [license_id], + } + + async with AsyncClient() as client: + response = await client.get( + f"{self.rems_api}/api/resources", + headers=self.headers, + ) + if response.status_code == 200: + resource_resp = response.json() + for res in resource_resp: + if res["organization"]["organization/id"] == self.config["organization"]["id"] and res["resid"] == doi: + resource_exists = True + resource_id = res["id"] + LOG.info(f"Resource for DOI {doi} exists with id {resource_id}.") + else: + LOG.error( + f"Retrieving resources failed with HTTP status: {response.status_code} and response: {response.json()}" + ) + + if not resource_exists: + resource_id = await self._process_create("resources", resource_payload) + + return resource_id + + async def _enable_resource(self, resource_id: int) -> None: + """Enable a resource in REMS so that it can be used. + + This might not be required but good to keep arround if a use case presents. + """ + resource_payload = {"id": resource_id, "enabled": True} + async with AsyncClient() as client: + response = await client.put( + f"{self.rems_api}/api/resources/enabled", json=resource_payload, headers=self.headers + ) + if response.status_code == 200: + _resp = response.json() + if isinstance(_resp["success"], bool) and _resp["success"]: + LOG.info(f"Enabled resource with id {resource_id}.") + else: + LOG.error("Error occurred when enabling resource.") + raise Exception("Error occurred when enabling resource.") + else: + LOG.error(f"Error occurred when enabling resource got HTTP status: {response.status_code}") + raise Exception(f"Error occurred when enabling resource got HTTP status: {response.status_code}") From 98bb42758b9097144a9fa7c36ec578dc548275d9 Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Mon, 18 Jan 2021 14:43:37 +0200 Subject: [PATCH 07/27] open will also accept a Path so no need for str. Co-authored-by: Pontus Frehult --- sda_orchestrator/config/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sda_orchestrator/config/__init__.py b/sda_orchestrator/config/__init__.py index ad011a2..5477e56 100644 --- a/sda_orchestrator/config/__init__.py +++ b/sda_orchestrator/config/__init__.py @@ -13,7 +13,7 @@ def parse_config_file(config_file: str) -> Dict: """Load JSON schemas.""" file_path = Path(config_file) - with open(str(file_path), "r") as fp: + with open(file_path, "r") as fp: return json.load(fp) From 0de89c7f8a73dfe3119ac2e98a7268813514a423 Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Tue, 19 Jan 2021 12:51:47 +0200 Subject: [PATCH 08/27] add more information about rems resources. Correct typo and add more information on why the need to register all those resources in REMS before making a resource accessible. 
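For context, the registration entry point documented in this change can be driven end to end with a few lines; a sketch assuming REMS_API, REMS_USER and REMS_KEY are exported, with an example DOI:

```python
# Minimal driver for the registration flow; the DOI is an example value
# and the REMS_* environment variables are assumed to be set.
import asyncio
from sda_orchestrator.utils.rems_ops import REMSHandler

async def main() -> None:
    rems = REMSHandler()
    # Creates or reuses organization, license, form and workflow, then
    # registers the DOI as a resource behind a catalogue item.
    await rems.register_resource("10.1234/abcd-efghij")

asyncio.run(main())
```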
--- sda_orchestrator/utils/rems_ops.py | 34 ++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/sda_orchestrator/utils/rems_ops.py b/sda_orchestrator/utils/rems_ops.py index b9f48ff..67c7be7 100644 --- a/sda_orchestrator/utils/rems_ops.py +++ b/sda_orchestrator/utils/rems_ops.py @@ -13,6 +13,12 @@ class REMSHandler: We will use the DOI of the dataset to create a resource in REMS, enable that resource. We will attach the resources as the catalogue item. + + The main function is register resource, that registers resource in REMS under the doi + used to register at Datacite. We reuse existing resources if they exist. + + The default config should be changed depending per installation, current config is NeIC + specific. """ def __init__(self) -> None: @@ -38,13 +44,23 @@ async def register_resource(self, doi: str) -> None: To make a resource accessible by a user one needs: - - a license and an organization (every object needs to belong to an organization) - - an organization needs name and short name - - a catalogue item is needed to make a resource findable - - a catalogue item needs form, resource, workflow and license - - a workflow needs to be of type default and required a form id, organization and title - and a list of users/handlers who can process applications - - a form requires organization, title and fields + - Every object in REMS needs to belong to an organisation, so have (at least) one. + - An organisation needs an internal id, a name and a short name. + - Every application has to be processed by a workflow, so have (at least) one. + - A workflow needs to have an organisation, title, type, and list of users + – handlers – who can process applications. + - Use default workflow type. Decider type is for e.g. governmental use only. + - Optionally a workflow can have a form, but generally it is not needed and should + only be used if absolutely required. + - Every application must have a form which an applicant can fill in + - A form must belong to an organisation, but is otherwise a free-form. + - Every dataset should have one or more licenses. + - A license needs to have an organisation, type, and a name. Depending on type, it may require more data. + - Every dataset is registered as a resource. + - A resource belongs to an organisation, must have an identifier, and should have one or more licenses. + - Every dataset is findable by a catalog item. + - A catalog item bundles all of the above together and is an object of interest to the applicants. + - A catalog item has 1-n mapping: every workflow, form and resource can belong to n catalog items. """ try: await self._organization() @@ -145,7 +161,7 @@ async def _license(self) -> int: return license_id async def _workflow(self) -> int: - """Create base worflow if one does not exist.""" + """Create base workflow if one does not exist.""" workflow_exists = False workflow_id = 0 workflow_payload = { @@ -288,7 +304,7 @@ async def _resource(self, doi: str, license_id: int) -> int: async def _enable_resource(self, resource_id: int) -> None: """Enable a resource in REMS so that it can be used. - This might not be required but good to keep arround if a use case presents. + This might not be required, but good to keep arround if a use case presents. 
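To make the dependency chain above concrete, the creation order that register_resource walks can be summed up as data (an illustrative sketch, not part of the module):

```python
# Call order inside register_resource; each later step consumes ids
# produced by the earlier ones (sketch only).
REGISTRATION_ORDER = [
    "organizations",    # everything in REMS belongs to an organization
    "licenses",         # reused when title and organization already match
    "forms",            # the application form applicants fill in
    "workflows",        # default-type workflow with handler users
    "resources",        # carries the DOI plus its license ids
    "catalogue-items",  # bundles form, resource and workflow for discovery
]
```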
""" resource_payload = {"id": resource_id, "enabled": True} async with AsyncClient() as client: From aaa65dcda8dfa44acaaf891c268ec48091e93984 Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Tue, 19 Jan 2021 12:54:18 +0200 Subject: [PATCH 09/27] refactor handling of json error response from Datacite --- sda_orchestrator/utils/id_ops.py | 56 ++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/sda_orchestrator/utils/id_ops.py b/sda_orchestrator/utils/id_ops.py index 0e29b12..5acae76 100644 --- a/sda_orchestrator/utils/id_ops.py +++ b/sda_orchestrator/utils/id_ops.py @@ -10,7 +10,7 @@ from .logger import LOG from ..config import CONFIG_INFO -from httpx import Headers, AsyncClient +from httpx import Headers, AsyncClient, Response, DecodingError def generate_dataset_id(user: str, inbox_path: str, ns: Union[str, None] = None) -> str: @@ -91,10 +91,9 @@ async def create_draft_doi(self, user: str, inbox_path: str) -> Union[Dict, None response = await client.post( self.doi_api, auth=(self.doi_user, self.doi_key), json=draft_doi_payload, headers=headers ) - draft_resp = response.json() - doi_data = None if response.status_code == 200: + draft_resp = response.json() LOG.debug(f"DOI draft created and response was: {draft_resp}") LOG.info(f"DOI draft created with doi: {draft_resp['data']['attributes']['doi']}.") doi_data = { @@ -102,7 +101,8 @@ async def create_draft_doi(self, user: str, inbox_path: str) -> Union[Dict, None "fullDOI": draft_resp["data"]["attributes"]["doi"], } else: - doi_data = self._check_errors(draft_resp, doi_suffix) + LOG.error(f"DOI API create draft request failed with code: {response.status_code}") + doi_data = self._check_errors(response, doi_suffix) return doi_data @@ -142,9 +142,9 @@ async def set_doi_state(self, state: str, doi_suffix: str) -> Union[Dict, None]: response = await client.post( self.doi_api, auth=(self.doi_user, self.doi_key), json=publish_data_payload, headers=headers ) - publish_resp = response.json() doi_data = None if response.status_code == 200: + publish_resp = response.json() LOG.debug(f"DOI created with state: {state} and response was: {publish_resp}") LOG.info(f"DOI created with doi: {publish_resp['data']['attributes']['doi']} with state {state}.") doi_data = { @@ -153,25 +153,33 @@ async def set_doi_state(self, state: str, doi_suffix: str) -> Union[Dict, None]: } else: LOG.error(f"DOI API request failed with code: {response.status_code}") - doi_data = self._check_errors(publish_resp, doi_suffix) + doi_data = self._check_errors(response, doi_suffix) return doi_data - def _check_errors(self, response: Dict, doi_suffix: str) -> Union[Dict, None]: - errors_resp = response["errors"] - doi_data = None - if len(errors_resp) == 1: - error_msg = errors_resp[0]["title"] if "title" in errors_resp[0] else errors_resp[0]["detail"] - if errors_resp[0]["source"] == "doi" and error_msg == "This DOI has already been taken": - LOG.info("DOI already taken, we will associate the submission to this doi dataset.") - doi_data = { - "suffix": doi_suffix, - "fullDOI": f"{self.doi_prefix}/{doi_suffix}", - } - else: - LOG.error(f"Error occurred: {errors_resp}") - raise Exception(f"{error_msg}") - elif len(errors_resp) > 1: - LOG.error(f"Multiple errors occurred: {errors_resp}") - raise Exception(f"Multiple errors occurred: {errors_resp}") - return doi_data + def _check_errors(self, response: Response, doi_suffix: str) -> Union[Dict, None]: + try: + errors_resp = response.json()["errors"] + except DecodingError: + LOG.error("Decoding 
JSON error response was not possible.") + raise + except Exception as e: + LOG.error(f"Unknown exception occured with content: {e}.") + raise + else: + doi_data = None + if len(errors_resp) == 1: + error_msg = errors_resp[0]["title"] if "title" in errors_resp[0] else errors_resp[0]["detail"] + if errors_resp[0]["source"] == "doi" and error_msg == "This DOI has already been taken": + LOG.info("DOI already taken, we will associate the submission to this doi dataset.") + doi_data = { + "suffix": doi_suffix, + "fullDOI": f"{self.doi_prefix}/{doi_suffix}", + } + else: + LOG.error(f"Error occurred: {errors_resp}") + raise Exception(f"{error_msg}") + elif len(errors_resp) > 1: + LOG.error(f"Multiple errors occurred: {errors_resp}") + raise Exception(f"Multiple errors occurred: {errors_resp}") + return doi_data From 2608593f077f50028bbf59074ce2afd148dbb50e Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Tue, 19 Jan 2021 13:18:43 +0200 Subject: [PATCH 10/27] make catalogue item resource url configurable remove Finnish as part of default localization for catalogue item --- sda_orchestrator/config/config.json | 3 +++ sda_orchestrator/utils/rems_ops.py | 6 +----- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sda_orchestrator/config/config.json b/sda_orchestrator/config/config.json index 0a4538a..1a8f7d2 100644 --- a/sda_orchestrator/config/config.json +++ b/sda_orchestrator/config/config.json @@ -94,6 +94,9 @@ }, "workflow": { "title": "Base workflow" + }, + "catalogueItem": { + "baseResourceURL": "https://doi.org/" } }, "datacite": { diff --git a/sda_orchestrator/utils/rems_ops.py b/sda_orchestrator/utils/rems_ops.py index 67c7be7..cef05c4 100644 --- a/sda_orchestrator/utils/rems_ops.py +++ b/sda_orchestrator/utils/rems_ops.py @@ -235,13 +235,9 @@ async def _catalogue_item(self, form_id: int, resource_id: int, workflow_id: int "wfid": workflow_id, "organization": {"organization/id": self.config["organization"]["id"]}, "localizations": { - "fi": { - "title": f"Catalogue item for resource {resource_id}", - "infourl": f"https://doi.org/{doi}", - }, "en": { "title": f"Catalogue item for resource {resource_id}", - "infourl": f"https://doi.org/{doi}", + "infourl": f"{self.config['catalogueItem']['baseResourceURL']}{doi}", }, }, "enabled": True, From d3e7c48340fc72ca7742743d3df14c29bca003e8 Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Tue, 19 Jan 2021 13:27:10 +0200 Subject: [PATCH 11/27] integrating REMS and Datacite DOIs in as part of dataset --- sda_orchestrator/complete_consume.py | 32 ++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/sda_orchestrator/complete_consume.py b/sda_orchestrator/complete_consume.py index 18e9aaf..12acf1e 100644 --- a/sda_orchestrator/complete_consume.py +++ b/sda_orchestrator/complete_consume.py @@ -1,12 +1,14 @@ """Message Broker complete step consumer.""" import json +from sda_orchestrator.utils.rems_ops import REMSHandler from amqpstorm import Message from .utils.consumer import Consumer from .utils.logger import LOG from os import environ -from .utils.id_ops import generate_dataset_id +from .utils.id_ops import generate_dataset_id, DOIHandler from jsonschema.exceptions import ValidationError from .schemas.validate import ValidateJSON, load_schema +import asyncio class CompleteConsumer(Consumer): @@ -29,7 +31,7 @@ def handle_message(self, message: Message) -> None: # Send message to mappings queue for dataset to file mapping accessionID = complete_msg["accession_id"] - datasetID = 
generate_dataset_id(complete_msg["user"], complete_msg["filepath"]) + datasetID = asyncio.run(self._process_datasetID(complete_msg["user"], complete_msg["filepath"])) self._publish_mappings(message, accessionID, datasetID) except ValidationError: @@ -40,6 +42,32 @@ def handle_message(self, message: Message) -> None: LOG.error(f"Error occurred in complete consumer: {error}.") raise + async def _process_datasetID(self, user: str, filepath: str) -> str: + """Process and generated dataset ID depending on environment variable set. + + If we make use of Datacite and REMS we need to check if env vars are set. + """ + datasetID: str = "" + try: + if ( + "DOI_PREFIX" in environ and "DOI_API" in environ and "DOI_USER" in environ and "DOI_KEY" in environ + ) and ("REMS_API" in environ and "REMS_USER" in environ and "REMS_KEY" in environ): + doi_handler = DOIHandler() + rems = REMSHandler() + doi_obj = await doi_handler.create_draft_doi(user, filepath) + if doi_obj: + rems.register_resource(doi_obj["fullDOI"]) + else: + LOG.error("Registering a DOI was not possible.") + raise Exception("Registering a DOI was not possible.") + else: + datasetID = generate_dataset_id(user, filepath) + except Exception as error: + LOG.error(f"Could not process datasetID because of: {error}.") + raise + else: + return datasetID + def _publish_mappings(self, message: Message, accessionID: str, datasetID: str) -> None: """Publish message with dataset to accession ID mapping.""" properties = { From 440254d0994cfab7be397cc6e238dcda2b985467 Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Tue, 19 Jan 2021 17:35:54 +0200 Subject: [PATCH 12/27] fix weird removal of last char Co-authored-by: Teemu Kataja --- sda_orchestrator/utils/id_ops.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sda_orchestrator/utils/id_ops.py b/sda_orchestrator/utils/id_ops.py index 5acae76..31e8ede 100644 --- a/sda_orchestrator/utils/id_ops.py +++ b/sda_orchestrator/utils/id_ops.py @@ -27,8 +27,7 @@ def generate_dataset_id(user: str, inbox_path: str, ns: Union[str, None] = None) # add trailing slash if it does not exist if ns.startswith(("http://", "https://")): - if ns[len(ns) - 1] != "/": - ns = ns + "/" + ns = ns.rstrip("/") + "/" # if a file it is submited in the root directory the dataset # is then ns: # otherwise we take the root directory and construct the path From 9773bb91ae80abc36a6598349067abc322b6b2a1 Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Tue, 19 Jan 2021 17:37:49 +0200 Subject: [PATCH 13/27] fix unnecessary f string --- sda_orchestrator/utils/consumer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sda_orchestrator/utils/consumer.py b/sda_orchestrator/utils/consumer.py index ff1952f..0801e97 100644 --- a/sda_orchestrator/utils/consumer.py +++ b/sda_orchestrator/utils/consumer.py @@ -40,9 +40,9 @@ def __init__( self.ssl = bool(strtobool(environ.get("BROKER_SSL", "True"))) context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS) context.check_hostname = False - cacertfile = Path(f"{environ.get('SSL_CACERT', '/tls/certs/ca.crt')}") - certfile = Path(f"{environ.get('SSL_CLIENTCERT', '/tls/certs/orch.crt')}") - keyfile = Path(f"{environ.get('SSL_CLIENTKEY', '/tls/certs/orch.key')}") + cacertfile = Path(environ.get("SSL_CACERT", "/tls/certs/ca.crt")) + certfile = Path(environ.get("SSL_CLIENTCERT", "/tls/certs/orch.crt")) + keyfile = Path(environ.get("SSL_CLIENTKEY", "/tls/certs/orch.key")) context.verify_mode = ssl.CERT_NONE # Require server verification if cacertfile.exists(): From 
039b26939afb7513817c798f325861b41551ec05 Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Tue, 19 Jan 2021 17:46:20 +0200 Subject: [PATCH 14/27] raise filenot found if file does not exist --- sda_orchestrator/config/__init__.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sda_orchestrator/config/__init__.py b/sda_orchestrator/config/__init__.py index 5477e56..e314003 100644 --- a/sda_orchestrator/config/__init__.py +++ b/sda_orchestrator/config/__init__.py @@ -3,16 +3,21 @@ Configuration required for DOIs, REMS and other metadata. """ -from os import environ +from os import environ, strerror from pathlib import Path from typing import Dict import json +import errno + +from ..utils.logger import LOG def parse_config_file(config_file: str) -> Dict: """Load JSON schemas.""" file_path = Path(config_file) - + if not file_path.is_file(): + LOG.error(f"File {file_path} not found") + raise FileNotFoundError(errno.ENOENT, strerror(errno.ENOENT), file_path) with open(file_path, "r") as fp: return json.load(fp) From 84742c41c3fd347b4cded6173b34dd5bcaedc972 Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Fri, 22 Jan 2021 10:32:15 +0200 Subject: [PATCH 15/27] default datacite resource url pointing to neic --- sda_orchestrator/config/config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sda_orchestrator/config/config.json b/sda_orchestrator/config/config.json index 1a8f7d2..51fc068 100644 --- a/sda_orchestrator/config/config.json +++ b/sda_orchestrator/config/config.json @@ -121,6 +121,6 @@ "subjectScheme": "Fields of Science and Technology (FOS)" } ], - "resourceURL": "http://data-access.sd.csc.fi/" + "resourceURL": "http://data-access.sd.neic.no" } } \ No newline at end of file From 969fa836b14785a5a3be0b8857c138a4919f7bf7 Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Fri, 29 Jan 2021 13:10:14 +0200 Subject: [PATCH 16/27] add config package to install --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index ff5629e..fc5d128 100644 --- a/setup.py +++ b/setup.py @@ -16,11 +16,12 @@ author_email="", description="SDA orchestrator", long_description="", - packages=["sda_orchestrator", "sda_orchestrator/utils", "sda_orchestrator/schemas"], + packages=["sda_orchestrator", "sda_orchestrator/utils", "sda_orchestrator/schemas", "sda_orchestrator/config"], # If any package contains *.json, include them: package_data={ "": [ "schemas/*.json", + "config/*.json", ] }, entry_points={ From b68dbaf9e1e741a3c55e7a8899452b0bcff02ebe Mon Sep 17 00:00:00 2001 From: Stefan Negru Date: Fri, 29 Jan 2021 13:11:00 +0200 Subject: [PATCH 17/27] overall improve logging formatting --- sda_orchestrator/complete_consume.py | 11 ++++++----- sda_orchestrator/inbox_consume.py | 4 ++-- sda_orchestrator/utils/id_ops.py | 2 +- sda_orchestrator/verified_consume.py | 10 +++++----- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/sda_orchestrator/complete_consume.py b/sda_orchestrator/complete_consume.py index 12acf1e..f552fd3 100644 --- a/sda_orchestrator/complete_consume.py +++ b/sda_orchestrator/complete_consume.py @@ -22,9 +22,9 @@ def handle_message(self, message: Message) -> None: LOG.debug(f"MQ Message body: {message.body} .") LOG.debug(f"Complete Consumer message received: {complete_msg} .") LOG.info( - f"Received work (corr-id: {message.correlation_id} filepath: {complete_msg['filepath']}, \ - user: {complete_msg['user']}, accessionid: {complete_msg['accession_id']}, \ - decryptedChecksums: 
{complete_msg['decrypted_checksums']})", + f"Received work (corr-id: {message.correlation_id} filepath: {complete_msg['filepath']}," + f"user: {complete_msg['user']}, accessionid: {complete_msg['accession_id']}," + f"decryptedChecksums: {complete_msg['decrypted_checksums']})" ) ValidateJSON(load_schema("ingestion-completion")).validate(complete_msg) @@ -55,6 +55,7 @@ async def _process_datasetID(self, user: str, filepath: str) -> str: doi_handler = DOIHandler() rems = REMSHandler() doi_obj = await doi_handler.create_draft_doi(user, filepath) + LOG.info(f"Registered dataset {doi_obj}.") if doi_obj: rems.register_resource(doi_obj["fullDOI"]) else: @@ -90,8 +91,8 @@ def _publish_mappings(self, message: Message, accessionID: str, datasetID: str) channel.close() LOG.info( - f"Sent the message to mappings queue to set dataset ID {datasetID} for file \ - with accessionID {accessionID}." + f"Sent the message to mappings queue to set dataset ID {datasetID} for file" + f"with accessionID {accessionID}." ) except ValidationError: diff --git a/sda_orchestrator/inbox_consume.py b/sda_orchestrator/inbox_consume.py index 819cd62..3343d45 100644 --- a/sda_orchestrator/inbox_consume.py +++ b/sda_orchestrator/inbox_consume.py @@ -21,8 +21,8 @@ def handle_message(self, message: Message) -> None: LOG.debug(f"MQ Message body: {message.body} .") LOG.debug(f"Inbox Consumer message received: {inbox_msg} .") LOG.info( - f"Received work (corr-id: {message.correlation_id} filepath: {inbox_msg['filepath']}, \ - user: {inbox_msg['user']} with operation: {inbox_msg['operation']})", + f"Received work (corr-id: {message.correlation_id} filepath: {inbox_msg['filepath']}," + f"user: {inbox_msg['user']} with operation: {inbox_msg['operation']})", ) if inbox_msg["operation"] == "upload": diff --git a/sda_orchestrator/utils/id_ops.py b/sda_orchestrator/utils/id_ops.py index 31e8ede..35598f0 100644 --- a/sda_orchestrator/utils/id_ops.py +++ b/sda_orchestrator/utils/id_ops.py @@ -38,7 +38,7 @@ def generate_dataset_id(user: str, inbox_path: str, ns: Union[str, None] = None) # if it is / then we take the next value dataset = f"{ns}{user}-{file_path_parts[1]}" - LOG.debug(f"generated dataset id as: {dataset}") + LOG.debug(f"generated dataset temp id as: {dataset}") return dataset diff --git a/sda_orchestrator/verified_consume.py b/sda_orchestrator/verified_consume.py index ce9696b..4944587 100644 --- a/sda_orchestrator/verified_consume.py +++ b/sda_orchestrator/verified_consume.py @@ -21,9 +21,9 @@ def handle_message(self, message: Message) -> None: LOG.debug(f"MQ Message body: {message.body} .") LOG.debug(f"Verify Consumer message received: {verify_msg} .") LOG.info( - f"Received work (corr-id: {message.correlation_id} filepath: {verify_msg['filepath']}, \ - user: {verify_msg['user']}, \ - decryptedChecksums: {verify_msg['decrypted_checksums']})", + f"Received work (corr-id: {message.correlation_id} filepath: {verify_msg['filepath']}," + f"user: {verify_msg['user']}," + f"decryptedChecksums: {verify_msg['decrypted_checksums']})" ) ValidateJSON(load_schema("ingestion-accession-request")).validate(verify_msg) @@ -70,8 +70,8 @@ def _publish_accessionID(self, message: Message, accessionID: str, verify_msg: D channel.close() LOG.info( - f'Sent the message to accessionIDs queue to set accession ID for file {verify_msg["filepath"]} \ - with checksum {decrypted_checksum}.' + f"Sent the message to accessionIDs queue to set accession ID for file {verify_msg['filepath']}" + f"with checksum {decrypted_checksum}." 
From 2653f3a7096f99be058c6203ccdbd0b2624c9faf Mon Sep 17 00:00:00 2001
From: Stefan Negru
Date: Fri, 29 Jan 2021 13:11:15 +0200
Subject: [PATCH 18/27] need to await rems register resource

---
 sda_orchestrator/complete_consume.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sda_orchestrator/complete_consume.py b/sda_orchestrator/complete_consume.py
index f552fd3..61a4b93 100644
--- a/sda_orchestrator/complete_consume.py
+++ b/sda_orchestrator/complete_consume.py
@@ -57,7 +57,7 @@ async def _process_datasetID(self, user: str, filepath: str) -> str:
             doi_obj = await doi_handler.create_draft_doi(user, filepath)
             LOG.info(f"Registered dataset {doi_obj}.")
             if doi_obj:
-                rems.register_resource(doi_obj["fullDOI"])
+                await rems.register_resource(doi_obj["fullDOI"])
             else:
                 LOG.error("Registering a DOI was not possible.")
                 raise Exception("Registering a DOI was not possible.")

From e749d7fff04f2b822c731d2f1cf4bea004c94b25 Mon Sep 17 00:00:00 2001
From: Stefan Negru
Date: Fri, 29 Jan 2021 13:11:26 +0200
Subject: [PATCH 19/27] doi response is 201 not 200

---
 sda_orchestrator/utils/id_ops.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sda_orchestrator/utils/id_ops.py b/sda_orchestrator/utils/id_ops.py
index 35598f0..6e32ed2 100644
--- a/sda_orchestrator/utils/id_ops.py
+++ b/sda_orchestrator/utils/id_ops.py
@@ -91,7 +91,7 @@ async def create_draft_doi(self, user: str, inbox_path: str) -> Union[Dict, None
                 self.doi_api, auth=(self.doi_user, self.doi_key), json=draft_doi_payload, headers=headers
             )
             doi_data = None
-            if response.status_code == 200:
+            if response.status_code == 201:
                 draft_resp = response.json()
                 LOG.debug(f"DOI draft created and response was: {draft_resp}")
                 LOG.info(f"DOI draft created with doi: {draft_resp['data']['attributes']['doi']}.")
@@ -142,7 +142,7 @@ async def set_doi_state(self, state: str, doi_suffix: str) -> Union[Dict, None]:
                 self.doi_api, auth=(self.doi_user, self.doi_key), json=publish_data_payload, headers=headers
             )
             doi_data = None
-            if response.status_code == 200:
+            if response.status_code == 201:
                 publish_resp = response.json()
                 LOG.debug(f"DOI created with state: {state} and response was: {publish_resp}")
                 LOG.info(f"DOI created with doi: {publish_resp['data']['attributes']['doi']} with state {state}.")

From 0b1766ca8382261bf0735e45dfbe8eaa9cc5c50d Mon Sep 17 00:00:00 2001
From: Stefan Negru
Date: Fri, 29 Jan 2021 13:11:40 +0200
Subject: [PATCH 20/27] bump alpine image version

---
 Dockerfile | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 0cb14b9..f3781ab 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.7-alpine3.11 as BUILD
+FROM python:3.7-alpine3.12 as BUILD
 
 RUN apk add --no-cache git gcc musl-dev libffi-dev make gnupg && \
     rm -rf /var/cache/apk/*
@@ -11,7 +11,7 @@ RUN pip install --upgrade pip && \
     pip install -r /root/sdaorch/requirements.txt && \
     pip install /root/sdaorch
 
-FROM python:3.7-alpine3.11
+FROM python:3.7-alpine3.12
 
 LABEL maintainer "NeIC System Developers"
 LABEL org.label-schema.schema-version="1.0"
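[Note on PATCH 18/27] The single added await is load-bearing: calling a coroutine function without awaiting it only creates a coroutine object, so the REMS registration would never actually run. A minimal reproduction with an illustrative stand-in class, not the real handler:

    import asyncio

    class FakeRems:
        """Illustrative stand-in for REMSHandler."""

        async def register_resource(self, doi: str) -> None:
            print(f"registered {doi}")

    async def main() -> None:
        rems = FakeRems()
        rems.register_resource("10.1234/demo")        # never runs; Python emits a RuntimeWarning
        await rems.register_resource("10.1234/demo")  # prints: registered 10.1234/demo

    asyncio.run(main())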
a/sda_orchestrator/utils/rems_ops.py
+++ b/sda_orchestrator/utils/rems_ops.py
@@ -73,14 +73,14 @@ async def register_resource(self, doi: str) -> None:
         except Exception:
             raise
 
-    async def _process_create(self, resource: str, payload: dict) -> int:
+    async def _process_create(self, resource: str, payload: dict, resp_key: str = "id") -> int:
         """Process creation of a REMS resource endpoint in a similar fashion so that we can retrieve its id."""
         async with AsyncClient() as client:
             response = await client.post(f"{self.rems_api}/api/{resource}/create", json=payload, headers=self.headers)
             if response.status_code == 200:
                 _resp = response.json()
                 if isinstance(_resp["success"], bool) and _resp["success"]:
-                    _id = _resp["id"]
+                    _id = _resp[resp_key]
                     LOG.info(f"Created {resource} with id {_id}.")
                 else:
                     LOG.error(f"Error occurred when creating {resource}.")
@@ -100,7 +100,7 @@ async def _organization(self) -> None:
         org_exists = False
         org = self.config["organization"]
         org_payload = {
-            "archive": False,
+            "archived": False,
             "enabled": True,
             "organization/id": org["id"],
             "organization/short-name": org["name"],
@@ -119,7 +119,7 @@ async def _organization(self) -> None:
             LOG.error(f"Retrieving organizations failed with HTTP status: {response.status_code}")
 
         if not org_exists:
-            await self._process_create("organizations", org_payload)
+            await self._process_create("organizations", org_payload, "organization/id")

From ccd4e4cbcb4ad16e72b54b91107df090bbcc7b2c Mon Sep 17 00:00:00 2001
From: Stefan Negru
Date: Sun, 31 Jan 2021 19:09:37 +0200
Subject: [PATCH 22/27] forgotten return dataset id

---
 sda_orchestrator/complete_consume.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sda_orchestrator/complete_consume.py b/sda_orchestrator/complete_consume.py
index 61a4b93..4c68d85 100644
--- a/sda_orchestrator/complete_consume.py
+++ b/sda_orchestrator/complete_consume.py
@@ -61,6 +61,8 @@ async def _process_datasetID(self, user: str, filepath: str) -> str:
             else:
                 LOG.error("Registering a DOI was not possible.")
                 raise Exception("Registering a DOI was not possible.")
+
+            datasetID = doi_obj["fullDOI"]
         else:
             datasetID = generate_dataset_id(user, filepath)
     except Exception as error:

From 33802dfb57298d98cbdde73df15f86eb3cffc137 Mon Sep 17 00:00:00 2001
From: Stefan Negru
Date: Sun, 31 Jan 2021 20:38:13 +0200
Subject: [PATCH 23/27] bump version to 0.5.0

---
 sda_orchestrator/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sda_orchestrator/__init__.py b/sda_orchestrator/__init__.py
index b5af6d2..9470904 100644
--- a/sda_orchestrator/__init__.py
+++ b/sda_orchestrator/__init__.py
@@ -1,5 +1,5 @@
 """SDA Orchestrator service for coordinating messages and mapping file id to dataset id."""
 
 __title__ = "sda_orchestrator"
-__version__ = "0.4.0"
+__version__ = "0.5.0"
 __author__ = "NeIC System Developers"
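[Note on PATCH 21/27] The new resp_key parameter exists because REMS /create endpoints do not all answer with the same key: most return the new identifier under "id", while organizations echo it under "organization/id". A sketch of the lookup logic in isolation; the response bodies are abbreviated examples, not captured API output:

    def extract_id(resp: dict, resp_key: str = "id"):
        # Mirrors the branch in _process_create: only trust the payload when
        # REMS reports success, then read the id under the endpoint's own key.
        if isinstance(resp["success"], bool) and resp["success"]:
            return resp[resp_key]
        raise ValueError("REMS reported a failed creation")

    print(extract_id({"success": True, "id": 7}))  # 7
    print(extract_id({"success": True, "organization/id": "org-demo"}, "organization/id"))  # org-demo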
environment variable set.
 
         If we make use of Datacite and REMS we need to check if env vars are set.
+        First we create a draft DOI, then we register it in REMS, after which we publish
+        the DOI.
         """
         datasetID: str = ""
         try:
@@ -63,6 +65,7 @@ async def _process_datasetID(self, user: str, filepath: str) -> str:
                 raise Exception("Registering a DOI was not possible.")
 
             datasetID = doi_obj["fullDOI"]
+            await doi_handler.set_doi_state("publish", doi_obj["suffix"])
         else:
             datasetID = generate_dataset_id(user, filepath)
     except Exception as error:

From f6da2c2af837e4a584fb21dd1c90049bf061b346 Mon Sep 17 00:00:00 2001
From: Stefan Negru
Date: Mon, 1 Feb 2021 11:43:32 +0200
Subject: [PATCH 25/27] missing config for titlePrefix

---
 sda_orchestrator/config/config.json | 3 ++-
 sda_orchestrator/utils/id_ops.py    | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/sda_orchestrator/config/config.json b/sda_orchestrator/config/config.json
index 51fc068..2700765 100644
--- a/sda_orchestrator/config/config.json
+++ b/sda_orchestrator/config/config.json
@@ -121,6 +121,7 @@
                 "subjectScheme": "Fields of Science and Technology (FOS)"
             }
         ],
-        "resourceURL": "http://data-access.sd.neic.no"
+        "resourceURL": "http://data-access.sd.neic.no",
+        "titlePrefix": "Resource"
     }
 }
\ No newline at end of file
diff --git a/sda_orchestrator/utils/id_ops.py b/sda_orchestrator/utils/id_ops.py
index 6e32ed2..2e7eb75 100644
--- a/sda_orchestrator/utils/id_ops.py
+++ b/sda_orchestrator/utils/id_ops.py
@@ -118,7 +118,7 @@ async def set_doi_state(self, state: str, doi_suffix: str) -> Union[Dict, None]:
             "attributes": {
                 "event": state,
                 "doi": f"{self.doi_prefix}/{doi_suffix}",
-                "titles": [{"title": f"{CONFIG_INFO['datacite']['titlePrefix']}", "lang": "en"}],
+                "titles": [{"title": f"{CONFIG_INFO['datacite']['titlePrefix']} {doi_suffix}", "lang": "en"}],
                 "publisher": CONFIG_INFO["datacite"]["publisher"],
                 # will be current year
                 "publicationYear": date.today().year,
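[Note on PATCH 24/27] Together with patches 18 and 22, this settles the ordering that _process_datasetID follows end to end. Condensed from the patched code, with logging and error handling trimmed, the happy path reads:

    from sda_orchestrator.utils.id_ops import DOIHandler
    from sda_orchestrator.utils.rems_ops import REMSHandler

    async def mint_dataset_id(user: str, filepath: str) -> str:
        """Condensed sketch of _process_datasetID; not the verbatim method."""
        doi_handler = DOIHandler()
        rems = REMSHandler()

        doi_obj = await doi_handler.create_draft_doi(user, filepath)   # 1. draft the DOI
        if not doi_obj:
            raise Exception("Registering a DOI was not possible.")

        await rems.register_resource(doi_obj["fullDOI"])               # 2. catalogue it in REMS
        await doi_handler.set_doi_state("publish", doi_obj["suffix"])  # 3. publish only afterwards
        return doi_obj["fullDOI"]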
+    The ``set_doi_state`` is dependent on a doi_suffix generated as a draft.
+    We make use of it if errors occur when registering the resource in REMS.
     """
 
     def __init__(self) -> None:
@@ -120,6 +120,7 @@ async def set_doi_state(self, state: str, doi_suffix: str) -> Union[Dict, None]:
                 "doi": f"{self.doi_prefix}/{doi_suffix}",
                 "titles": [{"title": f"{CONFIG_INFO['datacite']['titlePrefix']} {doi_suffix}", "lang": "en"}],
                 "publisher": CONFIG_INFO["datacite"]["publisher"],
+                "creators": CONFIG_INFO["datacite"]["creators"],
                 # will be current year
                 "publicationYear": date.today().year,
                 # resource type is predefined as dataset
@@ -138,11 +139,14 @@ async def set_doi_state(self, state: str, doi_suffix: str) -> Union[Dict, None]:
         }
         headers = Headers({"Content-Type": "application/json"})
         async with AsyncClient() as client:
-            response = await client.post(
-                self.doi_api, auth=(self.doi_user, self.doi_key), json=publish_data_payload, headers=headers
+            response = await client.put(
+                f"{self.doi_api}/{self.doi_prefix}/{doi_suffix}",
+                auth=(self.doi_user, self.doi_key),
+                json=publish_data_payload,
+                headers=headers,
             )
             doi_data = None
-            if response.status_code == 201:
+            if response.status_code == 200:
                 publish_resp = response.json()
                 LOG.debug(f"DOI created with state: {state} and response was: {publish_resp}")
                 LOG.info(f"DOI created with doi: {publish_resp['data']['attributes']['doi']} with state {state}.")

From 7ca4223a6b07443bb1f3b7a59322539a4cdca869 Mon Sep 17 00:00:00 2001
From: Stefan Negru
Date: Mon, 1 Feb 2021 12:58:41 +0200
Subject: [PATCH 27/27] reflect DOI in catalogue item

---
 sda_orchestrator/utils/rems_ops.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sda_orchestrator/utils/rems_ops.py b/sda_orchestrator/utils/rems_ops.py
index 49d9caa..2ccdf82 100644
--- a/sda_orchestrator/utils/rems_ops.py
+++ b/sda_orchestrator/utils/rems_ops.py
@@ -236,7 +236,7 @@ async def _catalogue_item(self, form_id: int, resource_id: int, workflow_id: int
             "organization": {"organization/id": self.config["organization"]["id"]},
             "localizations": {
                 "en": {
-                    "title": f"Catalogue item for resource {resource_id}",
+                    "title": f"Catalogue item for resource {doi}",
                     "infourl": f"{self.config['catalogueItem']['baseResourceURL']}{doi}",
                 },
             },
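[Note on PATCH 26/27] The POST-to-PUT switch, and the paired 201-to-200 status change, follow the DataCite REST convention: POST /dois creates a record and answers 201 Created, while PUT /dois/{prefix}/{suffix} updates an existing record and answers 200 OK. A stripped-down sketch of the update call; the endpoint, credentials, and minimal payload are placeholders, whereas the orchestrator sends full metadata:

    from httpx import AsyncClient, Headers

    async def publish_doi(doi_api: str, prefix: str, suffix: str, user: str, key: str) -> dict:
        # Placeholder payload; the real request carries titles, creators, publisher, etc.
        payload = {"data": {"type": "dois", "attributes": {"event": "publish"}}}
        headers = Headers({"Content-Type": "application/json"})
        async with AsyncClient() as client:
            response = await client.put(
                f"{doi_api}/{prefix}/{suffix}",  # address the existing draft record
                auth=(user, key),
                json=payload,
                headers=headers,
            )
        response.raise_for_status()  # a successful update returns 200
        return response.json()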