Merge pull request #7 from neicnordic/feature/dois
DataCite DOIs and REMS integration
blankdots authored Feb 1, 2021
2 parents 3356f81 + 7ca4223 commit 5f113a7
Showing 14 changed files with 727 additions and 62 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.7-alpine3.11 as BUILD
+FROM python:3.7-alpine3.12 as BUILD

RUN apk add --no-cache git gcc musl-dev libffi-dev make gnupg && \
rm -rf /var/cache/apk/*
@@ -11,7 +11,7 @@ RUN pip install --upgrade pip && \
    pip install -r /root/sdaorch/requirements.txt && \
    pip install /root/sdaorch

-FROM python:3.7-alpine3.11
+FROM python:3.7-alpine3.12

LABEL maintainer "NeIC System Developers"
LABEL org.label-schema.schema-version="1.0"
4 changes: 3 additions & 1 deletion requirements.txt
@@ -1,2 +1,4 @@
amqpstorm
-jsonschema
\ No newline at end of file
+jsonschema
+httpx
+shortuuid
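
The two new dependencies underpin the DOI workflow introduced in this PR: shortuuid plausibly generates the random DOI suffix, and httpx provides the async HTTP client for the DataCite REST API. A minimal sketch of how they could combine to draft a DOI; the helper name draft_doi and the payload shape are assumptions (the real implementation lives in sda_orchestrator/utils/id_ops.py, which is not shown in this view), while the DOI_* environment variables do appear in complete_consume.py below:

# Sketch only: drafting a DataCite DOI with the two new dependencies.
# draft_doi() is a hypothetical helper, not the PR's actual API.
import os

import httpx
import shortuuid


async def draft_doi() -> dict:
    suffix = shortuuid.uuid()  # random, URL-safe suffix, e.g. "vytxeTZskVKR7C7WgdSP3d"
    doi = f"{os.environ['DOI_PREFIX']}/{suffix}"
    async with httpx.AsyncClient(auth=(os.environ["DOI_USER"], os.environ["DOI_KEY"])) as client:
        # DataCite's REST API creates a *draft* DOI when no "event" attribute is sent.
        resp = await client.post(
            os.environ["DOI_API"],
            json={"data": {"type": "dois", "attributes": {"doi": doi}}},
        )
        resp.raise_for_status()
    return {"fullDOI": doi, "suffix": suffix}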
2 changes: 1 addition & 1 deletion sda_orchestrator/__init__.py
@@ -1,5 +1,5 @@
"""SDA Orchestrator service for coordinating messages and mapping file id to dataset id."""

__title__ = "sda_orchestrator"
-__version__ = "0.4.0"
+__version__ = "0.5.0"
__author__ = "NeIC System Developers"
66 changes: 49 additions & 17 deletions sda_orchestrator/complete_consume.py
@@ -1,12 +1,14 @@
"""Message Broker complete step consumer."""
import json
from sda_orchestrator.utils.rems_ops import REMSHandler
from amqpstorm import Message
from .utils.consumer import Consumer
from .utils.logger import LOG
import os
from .utils.id_ops import generate_dataset_id
from os import environ
from .utils.id_ops import generate_dataset_id, DOIHandler
from jsonschema.exceptions import ValidationError
from .schemas.validate import ValidateJSON, load_schema
import asyncio


class CompleteConsumer(Consumer):
@@ -20,16 +22,16 @@ def handle_message(self, message: Message) -> None:
            LOG.debug(f"MQ Message body: {message.body} .")
            LOG.debug(f"Complete Consumer message received: {complete_msg} .")
            LOG.info(
-               f"Received work (corr-id: {message.correlation_id} filepath: {complete_msg['filepath']}, \
-               user: {complete_msg['user']}, accessionid: {complete_msg['accession_id']}, \
-               decryptedChecksums: {complete_msg['decrypted_checksums']})",
+               f"Received work (corr-id: {message.correlation_id} filepath: {complete_msg['filepath']}, "
+               f"user: {complete_msg['user']}, accessionid: {complete_msg['accession_id']}, "
+               f"decryptedChecksums: {complete_msg['decrypted_checksums']})"
            )

            ValidateJSON(load_schema("ingestion-completion")).validate(complete_msg)

            # Send message to mappings queue for dataset to file mapping
            accessionID = complete_msg["accession_id"]
-           datasetID = generate_dataset_id(complete_msg["user"], complete_msg["filepath"])
+           datasetID = asyncio.run(self._process_datasetID(complete_msg["user"], complete_msg["filepath"]))
            self._publish_mappings(message, accessionID, datasetID)

        except ValidationError:
@@ -40,6 +42,38 @@ def handle_message(self, message: Message) -> None:
            LOG.error(f"Error occurred in complete consumer: {error}.")
            raise

+    async def _process_datasetID(self, user: str, filepath: str) -> str:
+        """Process and generate the dataset ID, depending on the environment variables set.
+
+        If we make use of DataCite and REMS, we need to check that their env vars are set.
+        First we create a draft DOI, then we register it in REMS, after which we publish
+        the DOI.
+        """
+        datasetID: str = ""
+        try:
+            if (
+                "DOI_PREFIX" in environ and "DOI_API" in environ and "DOI_USER" in environ and "DOI_KEY" in environ
+            ) and ("REMS_API" in environ and "REMS_USER" in environ and "REMS_KEY" in environ):
+                doi_handler = DOIHandler()
+                rems = REMSHandler()
+                doi_obj = await doi_handler.create_draft_doi(user, filepath)
+                LOG.info(f"Registered dataset {doi_obj}.")
+                if doi_obj:
+                    await rems.register_resource(doi_obj["fullDOI"])
+                else:
+                    LOG.error("Registering a DOI was not possible.")
+                    raise Exception("Registering a DOI was not possible.")
+
+                datasetID = doi_obj["fullDOI"]
+                await doi_handler.set_doi_state("publish", doi_obj["suffix"])
+            else:
+                datasetID = generate_dataset_id(user, filepath)
+        except Exception as error:
+            LOG.error(f"Could not process datasetID because of: {error}.")
+            raise
+        else:
+            return datasetID

    def _publish_mappings(self, message: Message, accessionID: str, datasetID: str) -> None:
        """Publish message with dataset to accession ID mapping."""
        properties = {
@@ -57,15 +91,13 @@ def _publish_mappings(self, message: Message, accessionID: str, datasetID: str)
            ValidateJSON(load_schema("dataset-mapping")).validate(json.loads(mappings_msg))

            mapping = Message.create(channel, mappings_msg, properties)
-           mapping.publish(
-               os.environ.get("MAPPINGS_QUEUE", "mappings"), exchange=os.environ.get("BROKER_EXCHANGE", "sda")
-           )
+           mapping.publish(environ.get("MAPPINGS_QUEUE", "mappings"), exchange=environ.get("BROKER_EXCHANGE", "sda"))

            channel.close()

            LOG.info(
-               f"Sent the message to mappings queue to set dataset ID {datasetID} for file \
-               with accessionID {accessionID}."
+               f"Sent the message to mappings queue to set dataset ID {datasetID} for file "
+               f"with accessionID {accessionID}."
            )

        except ValidationError:
@@ -76,12 +108,12 @@ def _publish_mappings(self, message: Message, accessionID: str, datasetID: str)
def main() -> None:
    """Run the Complete consumer."""
    CONSUMER = CompleteConsumer(
-       hostname=str(os.environ.get("BROKER_HOST")),
-       port=int(os.environ.get("BROKER_PORT", 5670)),
-       username=os.environ.get("BROKER_USER", "sda"),
-       password=os.environ.get("BROKER_PASSWORD", ""),
-       queue=os.environ.get("COMPLETED_QUEUE", "completed"),
-       vhost=os.environ.get("BROKER_VHOST", "sda"),
+       hostname=str(environ.get("BROKER_HOST")),
+       port=int(environ.get("BROKER_PORT", 5670)),
+       username=environ.get("BROKER_USER", "sda"),
+       password=environ.get("BROKER_PASSWORD", ""),
+       queue=environ.get("COMPLETED_QUEUE", "completed"),
+       vhost=environ.get("BROKER_VHOST", "sda"),
    )
    CONSUMER.start()

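Note the feature gate in _process_datasetID above: the DataCite + REMS path is taken only when all seven DOI_*/REMS_* variables are present; otherwise the consumer falls back to the plain generate_dataset_id(). A hypothetical local-test configuration, where every value is a placeholder rather than a real credential or endpoint:

# Placeholder values for local testing; unset any of these and the
# consumer falls back to generate_dataset_id() instead of minting DOIs.
from os import environ

environ.update(
    {
        "DOI_PREFIX": "10.80000",                         # placeholder DataCite prefix
        "DOI_API": "https://api.test.datacite.org/dois",  # assumed DataCite test endpoint
        "DOI_USER": "NEIC.SDA",
        "DOI_KEY": "datacite-password",
        "REMS_API": "https://rems.example.org",           # assumed REMS instance
        "REMS_USER": "rems-owner",
        "REMS_KEY": "rems-api-key",
    }
)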
27 changes: 27 additions & 0 deletions sda_orchestrator/config/__init__.py
@@ -0,0 +1,27 @@
"""SDA orchestrator configuration.
Configuration required for DOIs, REMS and other metadata.
"""

from os import environ, strerror
from pathlib import Path
from typing import Dict
import json
import errno

from ..utils.logger import LOG


def parse_config_file(config_file: str) -> Dict:
    """Load and parse the JSON configuration file."""
    file_path = Path(config_file)
    if not file_path.is_file():
        LOG.error(f"File {file_path} not found")
        raise FileNotFoundError(errno.ENOENT, strerror(errno.ENOENT), file_path)
    with open(file_path, "r") as fp:
        return json.load(fp)


CONFIG_INFO = parse_config_file(
    environ.get("CONFIG_FILE", str(Path(__file__).resolve().parent.joinpath("config.json")))
)
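
CONFIG_INFO is parsed once at import time, so downstream modules can read the settings directly. A short usage sketch; the key paths follow config.json below, but exactly which modules consume them (presumably id_ops.py and rems_ops.py) is an assumption:

# Sketch: reading the parsed configuration elsewhere in the service.
from sda_orchestrator.config import CONFIG_INFO

publisher = CONFIG_INFO["datacite"]["publisher"]  # "Nordic e-Infrastructure Collaboration"
base_url = CONFIG_INFO["rems"]["catalogueItem"]["baseResourceURL"]  # "https://doi.org/"
license_title = CONFIG_INFO["rems"]["license"]["localizations"]["en"]["title"]
print(f"{publisher} resources resolve under {base_url} ({license_title})")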
127 changes: 127 additions & 0 deletions sda_orchestrator/config/config.json
@@ -0,0 +1,127 @@
{
  "rems": {
    "organization": {
      "id": "NeIC",
      "name": {
        "fi": "Nordic e-Infrastructure Collaboration",
        "en": "Nordic e-Infrastructure Collaboration"
      },
      "shortName": {
        "fi": "NeIC",
        "en": "NeIC"
      }
    },
    "license": {
      "localizations": {
        "en": {
          "title": "Creative Commons Attribution 4.0 International (CC BY 4.0)",
          "textcontent": "https://creativecommons.org/licenses/by/4.0/"
        },
        "fi": {
          "title": "Nimeä 4.0 Kansainvälinen (CC BY 4.0)",
          "textcontent": "https://creativecommons.org/licenses/by/4.0/deed.fi"
        }
      }
    },
    "form": {
      "title": "Base form",
      "fields": [
        {
          "field/id": "fld1",
          "field/type": "header",
          "field/title": {
            "en": "Base form for registering a resource",
            "fi": "Base form for registering a resource"
          },
          "field/optional": false
        },
        {
          "field/id": "fld2",
          "field/type": "description",
          "field/title": {
            "en": "Title of the application",
            "fi": "Title of the application"
          },
          "field/optional": true,
          "field/max-length": 100
        },
        {
          "field/id": "fld3",
          "field/type": "option",
          "field/title": {
            "en": "Select type of submission",
            "fi": "Select type of submission"
          },
          "field/options": [
            {
              "key": "fega",
              "label": {
                "en": "FEGA (Federated EGA)",
                "fi": "FEGA (Federated EGA)"
              }
            },
            {
              "key": "sda",
              "label": {
                "en": "SDA stand-alone",
                "fi": "SDA stand-alone"
              }
            }
          ],
          "field/optional": false
        },
        {
          "field/id": "fld4",
          "field/type": "texta",
          "field/title": {
            "en": "Stand-alone SDA archive use case description",
            "fi": "Stand-alone SDA archive use case description"
          },
          "field/privacy": "private",
          "field/optional": false,
          "field/max-length": 200,
          "field/visibility": {
            "visibility/type": "only-if",
            "visibility/field": {
              "field/id": "fld3"
            },
            "visibility/values": [
              "sda"
            ]
          }
        }
      ]
    },
    "workflow": {
      "title": "Base workflow"
    },
    "catalogueItem": {
      "baseResourceURL": "https://doi.org/"
    }
  },
  "datacite": {
    "creators": [
      {
        "name": "Nordic e-Infrastructure Collaboration",
        "nameType": "Organizational",
        "affiliation": [
          {
            "name": "Nordic e-Infrastructure Collaboration",
            "schemeUri": "https://ror.org",
            "affiliationIdentifier": "https://ror.org/04jcwf484",
            "affiliationIdentifierScheme": "ROR"
          }
        ]
      }
    ],
    "publisher": "Nordic e-Infrastructure Collaboration",
    "subjects": [
      {
        "subject": "FOS: Biological sciences",
        "subjectScheme": "Fields of Science and Technology (FOS)"
      }
    ],
    "resourceURL": "http://data-access.sd.neic.no",
    "titlePrefix": "Resource"
  }
}
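
The rems block above supplies the organization, license, form, and workflow that a REMS catalogue entry needs. A hedged sketch of how a handler might register a published DOI as a REMS resource; the endpoint path and headers follow the public REMS API, but the PR's actual rems_ops.py is not shown in this view, so the helper name and the license_id parameter are assumptions:

# Sketch only: registering a DOI as a REMS resource from the config above.
from os import environ

import httpx

from sda_orchestrator.config import CONFIG_INFO


async def register_resource(doi: str, license_id: int) -> None:
    headers = {
        "x-rems-api-key": environ["REMS_KEY"],   # REMS API-key auth
        "x-rems-user-id": environ["REMS_USER"],  # acting REMS user
    }
    payload = {
        "resid": doi,  # resolved via catalogueItem.baseResourceURL + DOI
        "organization": {"organization/id": CONFIG_INFO["rems"]["organization"]["id"]},
        "licenses": [license_id],  # assumed to reference a license created earlier
    }
    async with httpx.AsyncClient(base_url=environ["REMS_API"], headers=headers) as client:
        resp = await client.post("/api/resources/create", json=payload)
        resp.raise_for_status()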
20 changes: 10 additions & 10 deletions sda_orchestrator/inbox_consume.py
@@ -4,7 +4,7 @@
from amqpstorm import Message
from .utils.consumer import Consumer
from .utils.logger import LOG
-import os
+from os import environ
from pathlib import Path
from jsonschema.exceptions import ValidationError
from .schemas.validate import ValidateJSON, load_schema
@@ -21,8 +21,8 @@ def handle_message(self, message: Message) -> None:
            LOG.debug(f"MQ Message body: {message.body} .")
            LOG.debug(f"Inbox Consumer message received: {inbox_msg} .")
            LOG.info(
-               f"Received work (corr-id: {message.correlation_id} filepath: {inbox_msg['filepath']}, \
-               user: {inbox_msg['user']} with operation: {inbox_msg['operation']})",
+               f"Received work (corr-id: {message.correlation_id} filepath: {inbox_msg['filepath']}, "
+               f"user: {inbox_msg['user']} with operation: {inbox_msg['operation']})",
            )

            if inbox_msg["operation"] == "upload":
@@ -72,7 +72,7 @@ def _publish_ingest(self, message: Message, inbox_msg: Dict) -> None:

            ingest = Message.create(channel, ingest_msg, properties)

-           ingest.publish(os.environ.get("INGEST_QUEUE", "ingest"), exchange=os.environ.get("BROKER_EXCHANGE", "sda"))
+           ingest.publish(environ.get("INGEST_QUEUE", "ingest"), exchange=environ.get("BROKER_EXCHANGE", "sda"))
            channel.close()

            LOG.info(f'Sent the message to ingest queue to trigger ingestion for filepath: {inbox_msg["filepath"]}.')
@@ -85,12 +85,12 @@ def _publish_ingest(self, message: Message, inbox_msg: Dict) -> None:
def main() -> None:
    """Run the Inbox consumer."""
    CONSUMER = InboxConsumer(
-       hostname=str(os.environ.get("BROKER_HOST")),
-       port=int(os.environ.get("BROKER_PORT", 5670)),
-       username=os.environ.get("BROKER_USER", "sda"),
-       password=os.environ.get("BROKER_PASSWORD", ""),
-       queue=os.environ.get("INBOX_QUEUE", "inbox"),
-       vhost=os.environ.get("BROKER_VHOST", "sda"),
+       hostname=str(environ.get("BROKER_HOST")),
+       port=int(environ.get("BROKER_PORT", 5670)),
+       username=environ.get("BROKER_USER", "sda"),
+       password=environ.get("BROKER_PASSWORD", ""),
+       queue=environ.get("INBOX_QUEUE", "inbox"),
+       vhost=environ.get("BROKER_VHOST", "sda"),
    )
    CONSUMER.start()

12 changes: 6 additions & 6 deletions sda_orchestrator/utils/consumer.py
@@ -1,7 +1,7 @@
"""Message Broker Consumer class."""

import time
-import os
+from os import environ
import json
import ssl
from pathlib import Path
@@ -37,12 +37,12 @@ def __init__(
        self.vhost = vhost
        self.max_retries = max_retries
        self.connection = None
-       self.ssl = bool(strtobool(os.environ.get("BROKER_SSL", "True")))
+       self.ssl = bool(strtobool(environ.get("BROKER_SSL", "True")))
        context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS)
        context.check_hostname = False
-       cacertfile = Path(f"{os.environ.get('SSL_CACERT', '/tls/certs/ca.crt')}")
-       certfile = Path(f"{os.environ.get('SSL_CLIENTCERT', '/tls/certs/orch.crt')}")
-       keyfile = Path(f"{os.environ.get('SSL_CLIENTKEY', '/tls/certs/orch.key')}")
+       cacertfile = Path(environ.get("SSL_CACERT", "/tls/certs/ca.crt"))
+       certfile = Path(environ.get("SSL_CLIENTCERT", "/tls/certs/orch.crt"))
+       keyfile = Path(environ.get("SSL_CLIENTKEY", "/tls/certs/orch.key"))
        context.verify_mode = ssl.CERT_NONE
        # Require server verification
        if cacertfile.exists():
@@ -131,7 +131,7 @@ def _error_message(self, message: Message, reason: str) -> None:
            ValidateJSON(load_schema("ingestion-user-error")).validate(json.loads(error_msg))

            error = Message.create(channel, error_msg, properties)
-           error.publish(os.environ.get("ERROR_QUEUE", "error"), exchange=os.environ.get("BROKER_EXCHANGE", "sda"))
+           error.publish(environ.get("ERROR_QUEUE", "error"), exchange=environ.get("BROKER_EXCHANGE", "sda"))

            channel.close()

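The TLS setup above starts from CERT_NONE with hostname checking disabled and only tightens verification when certificate files are present; the rest of that branch is collapsed in this view. A generic sketch of the standard pattern the visible lines imply, not a reproduction of the hidden code:

# Standard conditional-TLS pattern consistent with the visible diff lines;
# the repository's actual collapsed code may differ.
import ssl
from pathlib import Path

context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE

cacertfile = Path("/tls/certs/ca.crt")
certfile = Path("/tls/certs/orch.crt")
keyfile = Path("/tls/certs/orch.key")

if cacertfile.exists():
    context.verify_mode = ssl.CERT_REQUIRED  # require a trusted server certificate
    context.load_verify_locations(cafile=str(cacertfile))
if certfile.exists() and keyfile.exists():
    context.load_cert_chain(str(certfile), keyfile=str(keyfile))  # client cert for mutual TLS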
[Diffs for the remaining 6 of the 14 changed files are not loaded in this view.]
