From 96645002e46f1c24f2bd17a316827904dfc4efaf Mon Sep 17 00:00:00 2001
From: j23414
Date: Thu, 17 Nov 2022 08:42:39 -0800
Subject: [PATCH] Remove bin/scripts duplication

If a script does not need to be modified for a pathogen ingest, pull the
script at runtime instead of maintaining a potentially diverging copy.
Use a permalink for each script so that we can version the software we
use in this workflow without being affected by upstream changes until we
want to bump the version. This design adds more maintenance to this
workflow, but it also protects users against unexpected issues that are
outside of their control.

Discussed in https://github.com/nextstrain/ebola/pull/6#discussion_r1048835183

Note: this commit is later reverted based on further discussion.
---
 ingest/bin/apply-geolocation-rules | 234 ------------------
 ingest/bin/cloudfront-invalidate | 42 ----
 ingest/bin/csv-to-ndjson | 17 --
 ingest/bin/download-from-s3 | 34 ---
 ingest/bin/fasta-to-ndjson | 86 ------
 ingest/bin/join-metadata-and-clades.py | 77 ------
 ingest/bin/merge-user-metadata | 55 ----
 ingest/bin/ndjson-to-tsv-and-fasta | 66 -----
 ingest/bin/notify-on-diff | 35 ---
 ingest/bin/notify-on-job-fail | 21 --
 ingest/bin/notify-on-job-start | 24 --
 ingest/bin/notify-on-record-change | 54 ----
 ingest/bin/notify-slack | 58 -----
 ingest/bin/reverse_reversed_sequences.py | 29 ---
 ingest/bin/s3-object-exists | 9 -
 ingest/bin/sha256sum | 16 --
 ingest/bin/transform-authors | 66 -----
 ingest/bin/transform-date-fields | 154 ------------
 ingest/bin/transform-field-names | 48 ----
 ingest/bin/transform-genbank-location | 43 ----
 ingest/bin/transform-strain-names | 50 ----
 ingest/bin/transform-string-fields | 84 -------
 ingest/bin/trigger | 56 -----
 ingest/bin/trigger-on-new-data | 30 ---
 ingest/bin/upload-to-s3 | 76 ------
 .../snakemake_rules/fetch_sequences.smk | 21 +-
 .../snakemake_rules/slack_notifications.smk | 34 +++
 ingest/workflow/snakemake_rules/transform.smk | 46 +++-
 .../snakemake_rules/trigger_rebuild.smk | 18 ++
 ingest/workflow/snakemake_rules/upload.smk | 21 ++
 30 files changed, 138 insertions(+), 1466 deletions(-)
 delete mode 100755 ingest/bin/apply-geolocation-rules
 delete mode 100755 ingest/bin/cloudfront-invalidate
 delete mode 100755 ingest/bin/csv-to-ndjson
 delete mode 100755 ingest/bin/download-from-s3
 delete mode 100755 ingest/bin/fasta-to-ndjson
 delete mode 100644 ingest/bin/join-metadata-and-clades.py
 delete mode 100755 ingest/bin/merge-user-metadata
 delete mode 100755 ingest/bin/ndjson-to-tsv-and-fasta
 delete mode 100755 ingest/bin/notify-on-diff
 delete mode 100755 ingest/bin/notify-on-job-fail
 delete mode 100755 ingest/bin/notify-on-job-start
 delete mode 100755 ingest/bin/notify-on-record-change
 delete mode 100755 ingest/bin/notify-slack
 delete mode 100644 ingest/bin/reverse_reversed_sequences.py
 delete mode 100755 ingest/bin/s3-object-exists
 delete mode 100755 ingest/bin/sha256sum
 delete mode 100755 ingest/bin/transform-authors
 delete mode 100755 ingest/bin/transform-date-fields
 delete mode 100755 ingest/bin/transform-field-names
 delete mode 100755 ingest/bin/transform-genbank-location
 delete mode 100755 ingest/bin/transform-strain-names
 delete mode 100755 ingest/bin/transform-string-fields
 delete mode 100755 ingest/bin/trigger
 delete mode 100755 ingest/bin/trigger-on-new-data
 delete mode 100755 ingest/bin/upload-to-s3

diff --git a/ingest/bin/apply-geolocation-rules b/ingest/bin/apply-geolocation-rules
deleted file mode 100755
index 79f34f5b..00000000
--- a/ingest/bin/apply-geolocation-rules
+++ /dev/null
@@ 
-1,234 +0,0 @@ -#!/usr/bin/env python3 -""" -Applies user curated geolocation rules to the geolocation fields in the NDJSON -records from stdin. The modified records are output to stdout. This does not do -any additional transformations on top of the user curations. -""" -import argparse -import json -from collections import defaultdict -from sys import exit, stderr, stdin, stdout - - -class CyclicGeolocationRulesError(Exception): - pass - - -def load_geolocation_rules(geolocation_rules_file): - """ - Loads the geolocation rules from the provided *geolocation_rules_file*. - Returns the rules as a dict: - { - regions: { - countries: { - divisions: { - locations: corrected_geolocations_tuple - } - } - } - } - """ - geolocation_rules = defaultdict(lambda: defaultdict(lambda: defaultdict(dict))) - with open(geolocation_rules_file, 'r') as rules_fh: - for line in rules_fh: - # ignore comments - if line.lstrip()[0] == '#': - continue - - row = line.strip('\n').split('\t') - # Skip lines that cannot be split into raw and annotated geolocations - if len(row) != 2: - print( - f"WARNING: Could not decode geolocation rule {line!r}.", - "Please make sure rules are formatted as", - "'region/country/division/locationregion/country/division/location'.", - file=stderr) - continue - - # remove trailing comments - row[-1] = row[-1].partition('#')[0].rstrip() - raw , annot = tuple( row[0].split('/') ) , tuple( row[1].split('/') ) - - # Skip lines where raw or annotated geolocations cannot be split into 4 fields - if len(raw) != 4: - print( - f"WARNING: Could not decode the raw geolocation {row[0]!r}.", - "Please make sure it is formatted as 'region/country/division/location'.", - file=stderr - ) - continue - - if len(annot) != 4: - print( - f"WARNING: Could not decode the annotated geolocation {row[1]!r}.", - "Please make sure it is formatted as 'region/country/division/location'.", - file=stderr - ) - continue - - - geolocation_rules[raw[0]][raw[1]][raw[2]][raw[3]] = annot - - return geolocation_rules - - -def get_annotated_geolocation(geolocation_rules, raw_geolocation, rule_traversal = None): - """ - Gets the annotated geolocation for the *raw_geolocation* in the provided - *geolocation_rules*. - - Recursively traverses the *geolocation_rules* until we get the annotated - geolocation, which must be a Tuple. Returns `None` if there are no - applicable rules for the provided *raw_geolocation*. - - Rules are applied in the order of region, country, division, location. - First checks the provided raw values for geolocation fields, then if there - are not matches, tries to use general rules marked with '*'. - """ - # Always instantiate the rule traversal as an empty list if not provided, - # e.g. 
the first call of this recursive function - if rule_traversal is None: - rule_traversal = [] - - current_rules = geolocation_rules - # Traverse the geolocation rules based using the rule_traversal values - for field_value in rule_traversal: - current_rules = current_rules.get(field_value) - # If we hit `None`, then we know there are no matching rules, so stop the rule traversal - if current_rules is None: - break - - # We've found the tuple of the annotated geolocation - if isinstance(current_rules, tuple): - return current_rules - - # We've reach the next level of geolocation rules, - # so try to traverse the rules with the next target in raw_geolocation - if isinstance(current_rules, dict): - next_traversal_target = raw_geolocation[len(rule_traversal)] - rule_traversal.append(next_traversal_target) - return get_annotated_geolocation(geolocation_rules, raw_geolocation, rule_traversal) - - # We did not find any matching rule for the last traversal target - if current_rules is None: - # If we've used all general rules and we still haven't found a match, - # then there are no applicable rules for this geolocation - if all(value == '*' for value in rule_traversal): - return None - - # If we failed to find matching rule with a general rule as the last - # traversal target, then delete all trailing '*'s to reset rule_traversal - # to end with the last index that is currently NOT a '*' - # [A, *, B, *] => [A, *, B] - # [A, B, *, *] => [A, B] - # [A, *, *, *] => [A] - if rule_traversal[-1] == '*': - # Find the index of the first of the consecutive '*' from the - # end of the rule_traversal - # [A, *, B, *] => first_consecutive_general_rule_index = 3 - # [A, B, *, *] => first_consecutive_general_rule_index = 2 - # [A, *, *, *] => first_consecutive_general_rule_index = 1 - for index, field_value in reversed(list(enumerate(rule_traversal))): - if field_value == '*': - first_consecutive_general_rule_index = index - else: - break - - rule_traversal = rule_traversal[:first_consecutive_general_rule_index] - - # Set the final value to '*' in hopes that by moving to a general rule, - # we can find a matching rule. - rule_traversal[-1] = '*' - - return get_annotated_geolocation(geolocation_rules, raw_geolocation, rule_traversal) - - -def transform_geolocations(geolocation_rules, geolocation): - """ - Transform the provided *geolocation* by looking it up in the provided - *geolocation_rules*. - - This will use all rules that apply to the geolocation and rules will - be applied in the order of region, country, division, location. - - Returns the original geolocation if no geolocation rules apply. - - Raises a `CyclicGeolocationRulesError` if more than 1000 rules have - been applied to the raw geolocation. - """ - transformed_values = geolocation - rules_applied = 0 - continue_to_apply = True - - while continue_to_apply: - annotated_values = get_annotated_geolocation(geolocation_rules, transformed_values) - - # Stop applying rules if no annotated values were found - if annotated_values is None: - continue_to_apply = False - else: - rules_applied += 1 - - if rules_applied > 1000: - raise CyclicGeolocationRulesError( - "ERROR: More than 1000 geolocation rules applied on the same entry {geolocation!r}." 
- ) - - # Create a new list of values for comparison to previous values - new_values = list(transformed_values) - for index, value in enumerate(annotated_values): - # Keep original value if annotated value is '*' - if value != '*': - new_values[index] = value - - # Stop applying rules if this rule did not change the values, - # since this means we've reach rules with '*' that no longer change values - if new_values == transformed_values: - continue_to_apply = False - - transformed_values = new_values - - return transformed_values - - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument("--region-field", default="region", - help="Field that contains regions in NDJSON records.") - parser.add_argument("--country-field", default="country", - help="Field that contains countries in NDJSON records.") - parser.add_argument("--division-field", default="division", - help="Field that contains divisions in NDJSON records.") - parser.add_argument("--location-field", default="location", - help="Field that contains location in NDJSON records.") - parser.add_argument("--geolocation-rules", metavar="TSV", required=True, - help="TSV file of geolocation rules with the format: " + - "'<raw_geolocation><tab><annotated_geolocation>' where the raw and annotated geolocations " + - "are formatted as '<region>/<country>/<division>/<location>'. " + - "If creating a general rule, then the raw field value can be substituted with '*'." + - "Lines starting with '#' will be ignored as comments." + - "Trailing '#' will be ignored as comments.") - - args = parser.parse_args() - - location_fields = [args.region_field, args.country_field, args.division_field, args.location_field] - - geolocation_rules = load_geolocation_rules(args.geolocation_rules) - - for record in stdin: - record = json.loads(record) - - try: - annotated_values = transform_geolocations(geolocation_rules, [record[field] for field in location_fields]) - except CyclicGeolocationRulesError as e: - print(e, file=stderr) - exit(1) - - for index, field in enumerate(location_fields): - record[field] = annotated_values[index] - - json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') - print() diff --git a/ingest/bin/cloudfront-invalidate b/ingest/bin/cloudfront-invalidate deleted file mode 100755 index dec48529..00000000 --- a/ingest/bin/cloudfront-invalidate +++ /dev/null @@ -1,42 +0,0 @@ -#!/bin/bash -# Originally from @tsibley's gist: https://gist.github.com/tsibley/a66262d341dedbea39b02f27e2837ea8 -set -euo pipefail - -main() { - local domain="$1" - shift - local paths=("$@") - local distribution invalidation - - echo "-> Finding CloudFront distribution" - distribution=$( - aws cloudfront list-distributions \ - --query "DistributionList.Items[?contains(Aliases.Items, \`$domain\`)] | [0].Id" \ - --output text - ) - - if [[ -z $distribution || $distribution == None ]]; then - exec >&2 - echo "Unable to find CloudFront distribution id for $domain" - echo - echo "Are your AWS CLI credentials for the right account?" - exit 1 - fi - - echo "-> Creating CloudFront invalidation for distribution $distribution" - invalidation=$( - aws cloudfront create-invalidation \ - --distribution-id "$distribution" \ - --paths "${paths[@]}" \ - --query Invalidation.Id \ - --output text - ) - - echo "-> Waiting for CloudFront invalidation $invalidation to complete" - echo " Ctrl-C to stop waiting." 
- aws cloudfront wait invalidation-completed \ - --distribution-id "$distribution" \ - --id "$invalidation" -} - -main "$@" diff --git a/ingest/bin/csv-to-ndjson b/ingest/bin/csv-to-ndjson deleted file mode 100755 index 86e84127..00000000 --- a/ingest/bin/csv-to-ndjson +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python3 -""" -Copied from "bin/csv-to-ndjson" in nextstrain/ncov-ingest: -https://github.com/nextstrain/ncov-ingest/blob/2a5f255329ee5bdf0cabc8b8827a700c92becbe4/bin/csv-to-ndjson - -Convert CSV on stdin to NDJSON on stdout. -""" -import csv -import json -from sys import stdin, stdout - -# 200 MiB; default is 128 KiB -csv.field_size_limit(200 * 1024 * 1024) - -for row in csv.DictReader(stdin): - json.dump(row, stdout, allow_nan = False, indent = None, separators = ',:') - print() diff --git a/ingest/bin/download-from-s3 b/ingest/bin/download-from-s3 deleted file mode 100755 index c9dbab52..00000000 --- a/ingest/bin/download-from-s3 +++ /dev/null @@ -1,34 +0,0 @@ -#!/bin/bash -# Originally copied from nextstrain/ncov-ingest repo -set -euo pipefail - -bin="$(dirname "$0")" - -main() { - local src="${1:?A source s3:// URL is required as the first argument.}" - local dst="${2:?A destination file path is required as the second argument.}" - - local s3path="${src#s3://}" - local bucket="${s3path%%/*}" - local key="${s3path#*/}" - - local src_hash dst_hash no_hash=0000000000000000000000000000000000000000000000000000000000000000 - dst_hash="$("$bin/sha256sum" < "$dst" || true)" - src_hash="$(aws s3api head-object --bucket "$bucket" --key "$key" --query Metadata.sha256sum --output text 2>/dev/null || echo "$no_hash")" - - echo "[ INFO] Downloading $src → $dst" - if [[ $src_hash != "$dst_hash" ]]; then - aws s3 cp --no-progress "$src" - | - if [[ "$src" == *.gz ]]; then - gunzip -cfq - elif [[ "$src" == *.xz ]]; then - xz -T0 -dcq - else - cat - fi > "$dst" - else - echo "[ INFO] Files are identical, skipping download" - fi -} - -main "$@" diff --git a/ingest/bin/fasta-to-ndjson b/ingest/bin/fasta-to-ndjson deleted file mode 100755 index 1ee9f8f6..00000000 --- a/ingest/bin/fasta-to-ndjson +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/env python3 -""" -Parse delimited fields from FASTA header into NDJSON format to stdout. -The output NDJSON records are guaranteed to have at least two fields: - 1. strain - 2. sequence - -Uses the `augur.io.read_sequences` function to read the FASTA file, -so `augur` must be installed in the environment running the script. -""" - -import argparse -import json -import sys - -from augur.io import read_sequences - - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument("--fasta", required=True, - help="FASTA file to be transformed into NDJSON format") - parser.add_argument("--fields", nargs="+", - help="Fields in the FASTA header, listed in the same order as the header. " + - "These will be used as the keys in the final NDJSON output. " + - "One of the fields must be 'strain'. " + - "These cannot include the field 'sequence' as this field is reserved for the genomic sequence.") - parser.add_argument("--separator", default='|', - help="Field separator in the FASTA header") - parser.add_argument("--exclude", nargs="*", - help="List of fields to exclude from final NDJSON record. 
" - "These cannot include 'strain' or 'sequence'.") - - args = parser.parse_args() - - fasta_fields = [field.lower() for field in args.fields] - - exclude_fields = [] - if args.exclude: - exclude_fields = [field.lower() for field in args.exclude] - - passed_checks = True - - if 'strain' not in fasta_fields: - print("ERROR: FASTA fields must include a 'strain' field.", file=sys.stderr) - passed_checks = False - - if 'sequence' in fasta_fields: - print("ERROR: FASTA fields cannot include a 'sequence' field.", file=sys.stderr) - passed_checks = False - - if 'strain' in exclude_fields: - print("ERROR: The field 'strain' cannot be excluded from the output.", file=sys.stderr) - passed_checks = False - - if 'sequence' in exclude_fields: - print("ERROR: The field 'sequence' cannot be excluded from the output.", file=sys.stderr) - passed_checks = False - - missing_fields = [field for field in exclude_fields if field not in fasta_fields] - if missing_fields: - print(f"ERROR: The following exclude fields do not match any FASTA fields: {missing_fields}", file=sys.stderr) - passed_checks = False - - if not passed_checks: - print("ERROR: Failed to parse FASTA file into NDJSON records.","See detailed errors above.", file=sys.stderr) - sys.exit(1) - - sequences = read_sequences(args.fasta) - - for sequence in sequences: - field_values = [ - value.strip() - for value in sequence.description.split(args.separator) - ] - record = dict(zip(fasta_fields, field_values)) - record['sequence'] = str(sequence.seq).upper() - - for field in exclude_fields: - del record[field] - - json.dump(record, sys.stdout, allow_nan=False, indent=None, separators=',:') - print() diff --git a/ingest/bin/join-metadata-and-clades.py b/ingest/bin/join-metadata-and-clades.py deleted file mode 100644 index 99ed7328..00000000 --- a/ingest/bin/join-metadata-and-clades.py +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/env python3 -import argparse -import re -import sys -import pandas as pd - -NEXTCLADE_JOIN_COLUMN_NAME = 'seqName' -VALUE_MISSING_DATA = '?' 
- -column_map = { - "clade": "clade", - "outbreak": "outbreak", - "lineage": "lineage", - "coverage": "coverage", - "totalMissing": "missing_data", - "totalSubstitutions": "divergence", - "totalNonACGTNs": "nonACGTN", - "qc.missingData.status": "QC_missing_data", - "qc.mixedSites.status": "QC_mixed_sites", - "qc.privateMutations.status": "QC_rare_mutations", - "qc.frameShifts.status": "QC_frame_shifts", - "qc.stopCodons.status": "QC_stop_codons", - "frameShifts": "frame_shifts", - "isReverseComplement": "is_reverse_complement", -# "deletions": "deletions", -# "insertions": "insertions" -# "substitutions": "substitutions", -# "aaSubstitutions": "aaSubstitutions" -} - - -def parse_args(): - parser = argparse.ArgumentParser( - description="Joins metadata file with Nextclade clade output", - ) - parser.add_argument("--metadata") - parser.add_argument("--nextclade") - parser.add_argument("--id-field") - parser.add_argument("-o", default=sys.stdout) - return parser.parse_args() - -def main(): - args = parse_args() - - metadata = pd.read_csv(args.metadata, index_col=args.id_field, - sep='\t', low_memory=False, na_filter = False) - - # Read and rename clade column to be more descriptive - clades = pd.read_csv(args.nextclade, index_col=NEXTCLADE_JOIN_COLUMN_NAME, - sep='\t', low_memory=False, na_filter = False) \ - .rename(columns=column_map) - - clades.index = clades.index.map(lambda x: re.sub(" \|.*", "", x)) - - # Select columns in column map - clades = clades[list(column_map.values())] - - # Separate long from short columns - short_metadata = metadata.iloc[:,:-2].copy() - long_metadata = metadata.iloc[:,-2:].copy() - - # Concatenate on columns - result = pd.merge( - short_metadata, clades, - left_index=True, - right_index=True, - how='left' - ) - - # Add long columns to back - result = pd.concat([result, long_metadata], axis=1) - - result.to_csv(args.o, index_label=args.id_field, sep='\t') - - -if __name__ == '__main__': - main() diff --git a/ingest/bin/merge-user-metadata b/ingest/bin/merge-user-metadata deleted file mode 100755 index 341c2dfa..00000000 --- a/ingest/bin/merge-user-metadata +++ /dev/null @@ -1,55 +0,0 @@ -#!/usr/bin/env python3 -""" -Merges user curated annotations with the NDJSON records from stdin, with the user -curations overwriting the existing fields. The modified records are output -to stdout. This does not do any additional transformations on top of the user -curations. -""" -import argparse -import csv -import json -from collections import defaultdict -from sys import exit, stdin, stderr, stdout - - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument("--annotations", metavar="TSV", required=True, - help="Manually curated annotations TSV file. " + - "The TSV should not have a header and should have exactly three columns: " + - "id to match existing metadata, field name, and field value. " + - "If there are multiple annotations for the same id and field, then the last value is used. " + - "Lines starting with '#' are treated as comments. 
" + - "Any '#' after the field value are treated as comments.") - parser.add_argument("--id-field", default="accession", - help="The ID field in the metadata to use to merge with the annotations.") - - args = parser.parse_args() - - annotations = defaultdict(dict) - with open(args.annotations, 'r') as annotations_fh: - csv_reader = csv.reader(annotations_fh, delimiter='\t') - for row in csv_reader: - if not row or row[0].lstrip()[0] == '#': - continue - elif len(row) != 3: - print("WARNING: Could not decode annotation line " + "\t".join(row), file=stderr) - continue - id, field, value = row - annotations[id][field] = value.partition('#')[0].rstrip() - - for record in stdin: - record = json.loads(record) - - record_id = record.get(args.id_field) - if record_id is None: - print(f"ERROR: ID field {args.id_field!r} does not exist in record", file=stderr) - exit(1) - - record.update(annotations.get(record_id, {})) - - json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') - print() diff --git a/ingest/bin/ndjson-to-tsv-and-fasta b/ingest/bin/ndjson-to-tsv-and-fasta deleted file mode 100755 index 017bcc00..00000000 --- a/ingest/bin/ndjson-to-tsv-and-fasta +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python3 -""" -Parses NDJSON records from stdin to two different files: a metadata TSV and a -sequences FASTA. - -Records that do not have an ID or sequence will be excluded from the output files. -""" -import argparse -import csv -import json -from sys import stderr, stdin - - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument("--metadata", metavar="TSV", default="data/metadata.tsv", - help="The output metadata TSV file") - parser.add_argument("--fasta", metavar="FASTA", default="data/sequences.fasta", - help="The output sequences FASTA file") - parser.add_argument("--metadata-columns", nargs="+", - help="List of fields from the NDJSON records to include as columns in the metadata TSV. 
" + - "Metadata TSV columns will be in the order of the columns provided.") - parser.add_argument("--id-field", default='strain', - help="Field from the records to use as the sequence ID in the FASTA file.") - parser.add_argument("--sequence-field", default='sequence', - help="Field from the record that holds the genomic sequence for the FASTA file.") - - args = parser.parse_args() - - with open(args.metadata, 'wt') as metadata_output: - with open(args.fasta, 'wt') as fasta_output: - metadata_csv = csv.DictWriter( - metadata_output, - args.metadata_columns, - restval="", - extrasaction='ignore', - delimiter='\t' - ) - metadata_csv.writeheader() - - for index, record in enumerate(stdin): - record = json.loads(record) - - sequence_id = str(record.get(args.id_field, '')) - sequence = str(record.get(args.sequence_field, '')) - - if not sequence_id: - print( - f"WARNING: Record number {index} does not have a sequence ID.", - "This record will be excluded from the output files.", - file=stderr - ) - elif not sequence: - print( - f"WARNING: Record number {index} does not have a sequence.", - "This record will be excluded from the output files.", - file=stderr - ) - else: - metadata_csv.writerow(record) - - print(f">{sequence_id}", file=fasta_output) - print(f"{sequence}" , file= fasta_output) diff --git a/ingest/bin/notify-on-diff b/ingest/bin/notify-on-diff deleted file mode 100755 index c304d6b5..00000000 --- a/ingest/bin/notify-on-diff +++ /dev/null @@ -1,35 +0,0 @@ -#!/bin/bash - -set -euo pipefail - -: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" -: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" - -bin="$(dirname "$0")" - -src="${1:?A source file is required as the first argument.}" -dst="${2:?A destination s3:// URL is required as the second argument.}" - -dst_local="$(mktemp -t s3-file-XXXXXX)" -diff="$(mktemp -t diff-XXXXXX)" - -trap "rm -f '$dst_local' '$diff'" EXIT - -# if the file is not already present, just exit -"$bin"/s3-object-exists "$dst" || exit 0 - -"$bin"/download-from-s3 "$dst" "$dst_local" - -# diff's exit code is 0 for no differences, 1 for differences found, and >1 for errors -diff_exit_code=0 -diff "$dst_local" "$src" > "$diff" || diff_exit_code=$? - -if [[ "$diff_exit_code" -eq 1 ]]; then - echo "Notifying Slack about diff." - "$bin"/notify-slack --upload "$src.diff" < "$diff" -elif [[ "$diff_exit_code" -gt 1 ]]; then - echo "Notifying Slack about diff failure" - "$bin"/notify-slack "Diff failed for $src" -else - echo "No change in $src." -fi diff --git a/ingest/bin/notify-on-job-fail b/ingest/bin/notify-on-job-fail deleted file mode 100755 index 23d3a926..00000000 --- a/ingest/bin/notify-on-job-fail +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/bash -set -euo pipefail - -: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" -: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" - -: "${AWS_BATCH_JOB_ID:=}" -: "${GITHUB_RUN_ID:=}" - -bin="$(dirname "$0")" - -echo "Notifying Slack about failed ingest job." -message="❌ Ingest job has FAILED 😞 " - -if [ -n "${AWS_BATCH_JOB_ID}" ]; then - message+="See AWS Batch job \`${AWS_BATCH_JOB_ID}\` () for error details. " -elif [ -n "${GITHUB_RUN_ID}" ]; then - message+="See GitHub Action for error details. 
" -fi - -"$bin"/notify-slack "$message" diff --git a/ingest/bin/notify-on-job-start b/ingest/bin/notify-on-job-start deleted file mode 100755 index 9410fa38..00000000 --- a/ingest/bin/notify-on-job-start +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/bash -set -euo pipefail - -: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" -: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" - -: "${AWS_BATCH_JOB_ID:=}" -: "${GITHUB_RUN_ID:=}" - -bin="$(dirname "$0")" - -echo "Notifying Slack about started ingest job." -message="🐵 Monkeypox ingest job has started." - -if [[ -n "${GITHUB_RUN_ID}" ]]; then - message+=" The job was submitted by GitHub Action ." -fi - -if [[ -n "${AWS_BATCH_JOB_ID}" ]]; then - message+=" The job was launched as AWS Batch job \`${AWS_BATCH_JOB_ID}\` ()." - message+=" Follow along in your local \`monkeypox\` repo with: "'```'"nextstrain build --aws-batch --no-download --attach ${AWS_BATCH_JOB_ID} ingest/"'```' -fi - -"$bin"/notify-slack "$message" diff --git a/ingest/bin/notify-on-record-change b/ingest/bin/notify-on-record-change deleted file mode 100755 index 595835b5..00000000 --- a/ingest/bin/notify-on-record-change +++ /dev/null @@ -1,54 +0,0 @@ -#!/bin/bash -# Originally copied from nextstrain/ncov-ingest -set -euo pipefail - -: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" -: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" - -bin="$(dirname "$0")" - -src="${1:?A source ndjson file is required as the first argument.}" -dst="${2:?A destination ndjson s3:// URL is required as the second argument.}" -source_name=${3:?A record source name is required as the third argument.} - -# if the file is not already present, just exit -"$bin"/s3-object-exists "$dst" || exit 0 - -s3path="${dst#s3://}" -bucket="${s3path%%/*}" -key="${s3path#*/}" - -src_record_count="$(wc -l < "$src")" - -# Try getting record count from S3 object metadata -dst_record_count="$(aws s3api head-object --bucket "$bucket" --key "$key" --query "Metadata.recordcount || ''" --output text 2>/dev/null || true)" -if [[ -z "$dst_record_count" ]]; then - # This object doesn't have the record count stored as metadata - # We have to download it and count the lines locally - dst_record_count="$(wc -l < <(aws s3 cp --no-progress "$dst" - | xz -T0 -dcfq))" -fi - -added_records="$(( src_record_count - dst_record_count ))" - -printf "%'4d %s\n" "$src_record_count" "$src" -printf "%'4d %s\n" "$dst_record_count" "$dst" -printf "%'4d added records\n" "$added_records" - -slack_message="" - -if [[ $added_records -gt 0 ]]; then - echo "Notifying Slack about added records (n=$added_records)" - slack_message="📈 New monkeypox records (n=$added_records) found on $source_name." - -elif [[ $added_records -lt 0 ]]; then - echo "Notifying Slack about fewer records (n=$added_records)" - slack_message="📉 Fewer monkeypox records (n=$added_records) found on $source_name." - -else - echo "Notifying Slack about same number of records" - slack_message="⛔ No new monkeypox records found on $source_name." 
-fi - -slack_message+=" (Total record count: $src_record_count)" - -"$bin"/notify-slack "$slack_message" diff --git a/ingest/bin/notify-slack b/ingest/bin/notify-slack deleted file mode 100755 index 6ca20dec..00000000 --- a/ingest/bin/notify-slack +++ /dev/null @@ -1,58 +0,0 @@ -#!/bin/bash -# Originally copied from nextstrain/ncov-ingest repo -set -euo pipefail - -: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" -: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" - -upload=0 -output=/dev/null -thread_ts="" -broadcast=0 -args=() - -for arg; do - case "$arg" in - --upload) - upload=1;; - --output=*) - output="${arg#*=}";; - --thread-ts=*) - thread_ts="${arg#*=}";; - --broadcast) - broadcast=1;; - *) - args+=("$arg");; - esac -done - -set -- "${args[@]}" - -text="${1:?Some message text is required.}" - -if [[ "$upload" == 1 ]]; then - echo "Uploading data to Slack with the message: $text" - curl https://slack.com/api/files.upload \ - --header "Authorization: Bearer $SLACK_TOKEN" \ - --form-string channels="$SLACK_CHANNELS" \ - --form-string title="$text" \ - --form-string filename="$text" \ - --form-string thread_ts="$thread_ts" \ - --form-string reply_broadcast="$broadcast" \ - --form file=@/dev/stdin \ - --form filetype=text \ - --fail --silent --show-error \ - --http1.1 \ - --output "$output" -else - echo "Posting Slack message: $text" - curl https://slack.com/api/chat.postMessage \ - --header "Authorization: Bearer $SLACK_TOKEN" \ - --form-string channel="$SLACK_CHANNELS" \ - --form-string text="$text" \ - --form-string thread_ts="$thread_ts" \ - --form-string reply_broadcast="$broadcast" \ - --fail --silent --show-error \ - --http1.1 \ - --output "$output" -fi diff --git a/ingest/bin/reverse_reversed_sequences.py b/ingest/bin/reverse_reversed_sequences.py deleted file mode 100644 index 1ee9be22..00000000 --- a/ingest/bin/reverse_reversed_sequences.py +++ /dev/null @@ -1,29 +0,0 @@ -import pandas as pd -import argparse -from Bio import SeqIO - -if __name__=="__main__": - parser = argparse.ArgumentParser( - description="Reverse-complement reverse-complemented sequence", - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - - parser.add_argument('--metadata', type=str, required=True, help="input metadata") - parser.add_argument('--sequences', type=str, required=True, help="input sequences") - parser.add_argument('--output', type=str, required=True, help="output sequences") - args = parser.parse_args() - - metadata = pd.read_csv(args.metadata, sep='\t') - - # Read in fasta file - with open(args.sequences, 'r') as f_in: - with open(args.output, 'w') as f_out: - for seq in SeqIO.parse(f_in, 'fasta'): - # Check if metadata['reverse'] is True - if metadata.loc[metadata['accession'] == seq.id, 'reverse'].values[0] == True: - # Reverse-complement sequence - seq.seq = seq.seq.reverse_complement() - print("Reverse-complementing sequence:", seq.id) - - # Write sequences to file - SeqIO.write(seq, f_out, 'fasta') diff --git a/ingest/bin/s3-object-exists b/ingest/bin/s3-object-exists deleted file mode 100755 index d586d0b8..00000000 --- a/ingest/bin/s3-object-exists +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash -# Originally copied from nextstrain/ncov-ingest -set -euo pipefail - -url="${1#s3://}" -bucket="${url%%/*}" -key="${url#*/}" - -aws s3api head-object --bucket "$bucket" --key "$key" &>/dev/null diff --git a/ingest/bin/sha256sum b/ingest/bin/sha256sum deleted file mode 100755 index aa05af00..00000000 --- a/ingest/bin/sha256sum +++ /dev/null @@ 
-1,16 +0,0 @@ -#!/usr/bin/env python3 -# Originally copied from nextstrain/ncov-ingest repo -""" -Portable sha256sum utility. -""" -from hashlib import sha256 -from sys import stdin - -chunk_size = 5 * 1024**2 # 5 MiB - -h = sha256() - -for chunk in iter(lambda: stdin.buffer.read(chunk_size), b""): - h.update(chunk) - -print(h.hexdigest()) diff --git a/ingest/bin/transform-authors b/ingest/bin/transform-authors deleted file mode 100755 index 0bade20e..00000000 --- a/ingest/bin/transform-authors +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python3 -""" -Abbreviates a full list of authors to be ' et al.' of the NDJSON -record from stdin and outputs modified records to stdout. - -Note: This is a "best effort" approach and can potentially mangle the author name. -""" -import argparse -import json -import re -from sys import stderr, stdin, stdout - - -def parse_authors(record: dict, authors_field: str, default_value: str, - index: int, abbr_authors_field: str = None) -> dict: - # Strip and normalize whitespace - new_authors = re.sub(r'\s+', ' ', record[authors_field]) - - if new_authors == "": - new_authors = default_value - else: - # Split authors list on comma/semicolon - # OR "and"/"&" with at least one space before and after - new_authors = re.split(r'(?:\s*[,,;;]\s*|\s+(?:and|&)\s+)', new_authors)[0] - - # if it does not already end with " et al.", add it - if not new_authors.strip('. ').endswith(" et al"): - new_authors += ' et al' - - if abbr_authors_field: - if record.get(abbr_authors_field): - print( - f"WARNING: the {abbr_authors_field!r} field already exists", - f"in record {index} and will be overwritten!", - file=stderr - ) - - record[abbr_authors_field] = new_authors - else: - record[authors_field] = new_authors - - return record - - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument("--authors-field", default="authors", - help="The field containing list of authors.") - parser.add_argument("--default-value", default="?", - help="Default value to use if authors list is empty.") - parser.add_argument("--abbr-authors-field", - help="The field for the generated abbreviated authors. " + - "If not provided, the original authors field will be modified.") - - args = parser.parse_args() - - for index, record in enumerate(stdin): - record = json.loads(record) - - parse_authors(record, args.authors_field, args.default_value, index, args.abbr_authors_field) - - json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') - print() diff --git a/ingest/bin/transform-date-fields b/ingest/bin/transform-date-fields deleted file mode 100755 index 4ff2a698..00000000 --- a/ingest/bin/transform-date-fields +++ /dev/null @@ -1,154 +0,0 @@ -#!/usr/bin/env python3 -""" -Standardizes format of date fields of the NDJSON record from stdin to -ISO 8601 date (YYYY-MM-DD) and outputs modified records to stdout. -""" -import argparse -import json -from datetime import datetime -from sys import stderr, stdin, stdout - - -def format_date(date_string: str, expected_formats: list) -> str: - """ - Originally from nextstrain/ncov-ingest - - Format *date_string* to ISO 8601 date (YYYY-MM-DD). - If *date_string* does not match *expected_formats*, return *date_string*. - If *date_string* is missing the year, return masked date 'XXXX-XX-XX'. - If *date_string* is an incomplete date (i.e. missing month or day), then - missing values are masked with 'XX'. 
- - >>> expected_formats = ['%Y-%m-%d', '%Y-%m-%dT%H:%M:%SZ', '%m-%d'] - - >>> format_date("01-01", expected_formats) - 'XXXX-XX-XX' - - >>> format_date("2020", expected_formats) - '2020-XX-XX' - - >>> format_date("2020-01", expected_formats) - '2020-01-XX' - - >>> format_date("2020-1-15", expected_formats) - '2020-01-15' - - >>> format_date("2020-1-1", expected_formats) - '2020-01-01' - - >>> format_date("2020-01-15", expected_formats) - '2020-01-15' - - >>> format_date("2020-01-15T00:00:00Z", expected_formats) - '2020-01-15' - """ - # Potential directives that datetime accepts that can return the correct year, month, day fields - # see https://docs.python.org/3.9/library/datetime.html#strftime-and-strptime-format-codes - # - # Allows us to check if year/month/day are included in the date format so we - # know when to mask incomplete dates with 'XX' - all_field_directives = {'%c', '%x', - ('%G', '%V', '%A'), ('%G', '%V', '%a'), ('%G', '%V', '%w'), ('%G', '%V', '%u') - } - month_and_day_directives = {'%j', - ('%U', '%A'), ('%U', '%a'), ('%U', '%w'), ('%U', '%u'), - ('%W', '%A'), ('%W', '%a'), ('%W', '%w'), ('%W', '%u') - } - year_directives = {'%y', '%Y'} - month_directives = {'%b', '%B', '%m'} - day_directives = {'%d'} - - def directive_is_included(potential_directives: set, date_format: str) -> bool: - """ - Checks if any of the directives in *potential_directives* is included - in *date_format* string. - - If an element within *potential_directives* is a tuple, then all directives - within the tuple must be included in *date_format*. - """ - return any( - ( - (isinstance(directive, str) and directive in date_format) or - (isinstance(directive, tuple) and all(sub_directive in date_format for sub_directive in directive)) - ) - for directive in potential_directives - ) - - for date_format in expected_formats: - try: - parsed_date = datetime.strptime(date_string, date_format) - except ValueError: - continue - - # Default to date masked as 'XXXX-XX-XX' so we don't return incorrect dates - year_string = 'XXXX' - month_string = day_string = 'XX' - - parsed_year_string = str(parsed_date.year) - parsed_month_string = str(parsed_date.month).zfill(2) - parsed_day_string = str(parsed_date.day).zfill(2) - - # If a directive for ALL fields is included in date format, - # then use all of the parsed field strings - if (directive_is_included(all_field_directives, date_format)): - year_string = parsed_year_string - month_string = parsed_month_string - day_string = parsed_day_string - - # If not all fields directives are included, then check year - # directive was included in date format - elif(directive_is_included(year_directives, date_format)): - year_string = parsed_year_string - - # Only check for month and day directives if year is included - # Check if directive for BOTH month and year is included in date format - if (directive_is_included(month_and_day_directives, date_format)): - month_string = parsed_month_string - day_string = parsed_day_string - - # If not directives for BOTH month and day are included, then check - # month directive was included in date format - elif(directive_is_included(month_directives, date_format)): - month_string = parsed_month_string - - # Only check for day directives if month is included - if(directive_is_included(day_directives, date_format)): - day_string = parsed_day_string - - return f"{year_string}-{month_string}-{day_string}" - - if date_string: - print( - f"WARNING: Unable to transform date string {date_string!r} because it does not match", - f"any of the 
expected formats {expected_formats}.", - file=stderr - ) - - return date_string - - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument("--date-fields", nargs="+", - help="List of date field names in the NDJSON record that need to be standardized.") - parser.add_argument("--expected-date-formats", nargs="+", - help="Expected date formats that are currently in the provided date fields." + - "If a date string matches multiple formats, it will be parsed as the first format in the list.") - - args = parser.parse_args() - - expected_formats = args.expected_date_formats - - for record in stdin: - record = json.loads(record) - - for field in args.date_fields: - date_string = record.get(field) - if date_string: - record[field] = format_date(date_string, expected_formats) - - json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') - print() diff --git a/ingest/bin/transform-field-names b/ingest/bin/transform-field-names deleted file mode 100755 index fde223fc..00000000 --- a/ingest/bin/transform-field-names +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env python3 -""" -Renames fields of the NDJSON record from stdin and outputs modified records -to stdout. -""" -import argparse -import json -from sys import stderr, stdin, stdout - - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument("--field-map", nargs="+", - help="Fields names in the NDJSON record mapped to new field names, " + - "formatted as '{old_field_name}={new_field_name}'. " + - "If the old field does not exist in record, the new field will be added with an empty string value." + - "If the new field already exists in record, then the renaming of the old field will be skipped.") - parser.add_argument("--force", action="store_true", - help="Force renaming of old field even if the new field already exists. " + - "Please keep in mind this will overwrite the value of the new field.") - - args = parser.parse_args() - - field_map = {} - for field in args.field_map: - old_name, new_name = field.split('=') - field_map[old_name] = new_name - - for record in stdin: - record = json.loads(record) - - for old_field, new_field in field_map.items(): - - if record.get(new_field) and not args.force: - print( - f"WARNING: skipping rename of {old_field} because record", - f"already has a field named {new_field}.", - file=stderr - ) - continue - - record[new_field] = record.pop(old_field, '') - - json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') - print() diff --git a/ingest/bin/transform-genbank-location b/ingest/bin/transform-genbank-location deleted file mode 100755 index 70ba56fb..00000000 --- a/ingest/bin/transform-genbank-location +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python3 -""" -Parses GenBank's 'location' field of the NDJSON record from stdin to 3 separate -fields: 'country', 'division', and 'location'. Checks that a record is from -GenBank by verifying that the 'database' field has a value of "GenBank" or "RefSeq". - -Outputs the modified record to stdout. 
-""" -import json -from sys import stdin, stdout - - -def parse_location(record: dict) -> dict: - # Expected pattern for the location field is "[:][, ]" - # See GenBank docs for their "country" field: - # https://www.ncbi.nlm.nih.gov/genbank/collab/country/ - geographic_data = record['location'].split(':') - - country = geographic_data[0] - division = '' - location = '' - - if len(geographic_data) == 2: - division , _ , location = geographic_data[1].partition(',') - - record['country'] = country.strip() - record['division'] = division.strip() - record['location'] = location.strip() - - return record - - -if __name__ == '__main__': - - for record in stdin: - record = json.loads(record) - - database = record.get('database', '') - if database in {'GenBank', 'RefSeq'}: - parse_location(record) - - json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') - print() diff --git a/ingest/bin/transform-strain-names b/ingest/bin/transform-strain-names deleted file mode 100755 index d86c0e40..00000000 --- a/ingest/bin/transform-strain-names +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env python3 -""" -Verifies strain name pattern in the 'strain' field of the NDJSON record from -stdin. Adds a 'strain' field to the record if it does not already exist. - -Outputs the modified records to stdout. -""" -import argparse -import json -import re -from sys import stderr, stdin, stdout - - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument("--strain-regex", default="^.+$", - help="Regex pattern for strain names. " + - "Strain names that do not match the pattern will be dropped.") - parser.add_argument("--backup-fields", nargs="*", - help="List of backup fields to use as strain name if the value in 'strain' " + - "does not match the strain regex pattern. " + - "If multiple fields are provided, will use the first field that has a non-empty string.") - - args = parser.parse_args() - - strain_name_pattern = re.compile(args.strain_regex) - - for index, record in enumerate(stdin): - record = json.loads(record) - - # Verify strain name matches the strain regex pattern - if strain_name_pattern.match(record.get('strain', '')) is None: - # Default to empty string if not matching pattern - record['strain'] = '' - # Use non-empty value of backup fields if provided - if args.backup_fields: - for field in args.backup_fields: - if record.get(field): - record['strain'] = str(record[field]) - break - - if record['strain'] == '': - print(f"WARNING: Record number {index} has an empty string as the strain name.", file=stderr) - - - json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') - print() diff --git a/ingest/bin/transform-string-fields b/ingest/bin/transform-string-fields deleted file mode 100755 index e0749e68..00000000 --- a/ingest/bin/transform-string-fields +++ /dev/null @@ -1,84 +0,0 @@ -#!/usr/bin/env python3 -""" -Standardizes string fields of the NDJSON record from stdin and outputs the -modified record to stdout. -""" -import argparse -import json -import re -from sys import stdin, stdout -from typing import Optional, Set, Union - - -def titlecase(text: Union[str, None], articles: Set[str] = {}, abbreviations: Set[str] = {}) -> Optional[str]: - """ - Originally from nextstrain/ncov-ingest - - Returns a title cased location name from the given location name - *tokens*. Ensures that no tokens contained in the *whitelist_tokens* are - converted to title case. 
- - >>> articles = {'a', 'and', 'of', 'the', 'le'} - >>> abbreviations = {'USA', 'DC'} - - >>> titlecase("the night OF THE LIVING DEAD", articles) - 'The Night of the Living Dead' - - >>> titlecase("BRAINE-LE-COMTE, FRANCE", articles) - 'Braine-le-Comte, France' - - >>> titlecase("auvergne-RHÔNE-alpes", articles) - 'Auvergne-Rhône-Alpes' - - >>> titlecase("washington DC, usa", articles, abbreviations) - 'Washington DC, USA' - """ - if not isinstance(text, str): - return - - words = enumerate(re.split(r'\b', text)) - - def changecase(index, word): - casefold = word.casefold() - upper = word.upper() - - if upper in abbreviations: - return upper - elif casefold in articles and index != 1: - return word.lower() - else: - return word.title() - - return ''.join(changecase(i, w) for i, w in words) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument("--titlecase-fields", nargs="*", - help="List of fields to convert to titlecase.") - parser.add_argument("--articles", nargs="*", - help="List of articles that should not be cast to titlecase.") - parser.add_argument("--abbreviations", nargs="*", - help="List of abbreviations that should not be cast to titlecase, keeps uppercase.") - - - args = parser.parse_args() - - if args.articles: - articles = set(args.articles) - - if args.abbreviations: - abbreviations = set(args.abbreviations) - - for record in stdin: - record = json.loads(record) - - if args.titlecase_fields: - for field in args.titlecase_fields: - record[field] = titlecase(record.get(field, ""), articles, abbreviations) - - json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') - print() diff --git a/ingest/bin/trigger b/ingest/bin/trigger deleted file mode 100755 index d40553b6..00000000 --- a/ingest/bin/trigger +++ /dev/null @@ -1,56 +0,0 @@ -#!/bin/bash -set -euo pipefail - -: "${PAT_GITHUB_DISPATCH:=}" - -repo="${1:?A repository name is required as the first argument.}" -event_type="${2:?An event type is required as the second argument.}" -shift 2 - -if [[ $# -eq 0 && -z $PAT_GITHUB_DISPATCH ]]; then - cat >&2 <<. -You must specify options to curl for your GitHub credentials. For example, you -can specify your GitHub username, and will be prompted for your password: - - $0 $repo $event_type --user - -Be sure to enter a personal access token¹ as your password since GitHub has -discontinued password authentication to the API starting on November 13, 2020². - -You can also store your credentials or a personal access token in a netrc -file³: - - machine api.github.com - login - password - -and then tell curl to use it: - - $0 $repo $event_type --netrc - -which will then not require you to type your password every time. - -¹ https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line -² https://docs.github.com/en/rest/overview/other-authentication-methods#via-username-and-password -³ https://ec.haxx.se/usingcurl/usingcurl-netrc -. 
- exit 1 -fi - -auth=':' -if [[ -n $PAT_GITHUB_DISPATCH ]]; then - auth="Authorization: Bearer ${PAT_GITHUB_DISPATCH}" -fi - -if curl -fsS "https://api.github.com/repos/nextstrain/${repo}/dispatches" \ - -H 'Accept: application/vnd.github.v3+json' \ - -H 'Content-Type: application/json' \ - -H "$auth" \ - -d '{"event_type":"'"$event_type"'"}' \ - "$@" -then - echo "Successfully triggered $event_type" -else - echo "Request failed" >&2 - exit 1 -fi diff --git a/ingest/bin/trigger-on-new-data b/ingest/bin/trigger-on-new-data deleted file mode 100755 index 760a0187..00000000 --- a/ingest/bin/trigger-on-new-data +++ /dev/null @@ -1,30 +0,0 @@ -#!/bin/bash -set -euo pipefail - -: "${PAT_GITHUB_DISPATCH:?The PAT_GITHUB_DISPATCH environment variable is required.}" - -bin="$(dirname "$0")" - -metadata="${1:?A metadata upload output file is required as the first argument.}" -sequences="${2:?An sequence FASTA upload output file is required as the second argument.}" -identical_file_message="${3:-files are identical}" - -new_metadata=$(grep "$identical_file_message" "$metadata" >/dev/null; echo $?) -new_sequences=$(grep "$identical_file_message" "$sequences" >/dev/null; echo $?) - -slack_message="" - -# grep exit status 0 for found match, 1 for no match, 2 if an error occurred -if [[ $new_metadata -eq 1 || $new_sequences -eq 1 ]]; then - slack_message="Triggering new builds due to updated metadata and/or sequences" - "$bin"/trigger "monkeypox" "rebuild" -elif [[ $new_metadata -eq 0 && $new_sequences -eq 0 ]]; then - slack_message="Skipping trigger of rebuild: Both metadata TSV and sequences FASTA are identical to S3 files." -else - slack_message="Skipping trigger of rebuild: Unable to determine if data has been updated." -fi - - -if ! "$bin"/notify-slack "$slack_message"; then - echo "Notifying Slack failed, but exiting with success anyway." -fi diff --git a/ingest/bin/upload-to-s3 b/ingest/bin/upload-to-s3 deleted file mode 100755 index b993c3dd..00000000 --- a/ingest/bin/upload-to-s3 +++ /dev/null @@ -1,76 +0,0 @@ -#!/bin/bash -# Originally copied from nextstrain/ncov-ingest repo -set -euo pipefail - -bin="$(dirname "$0")" - -main() { - local quiet=0 - - for arg; do - case "$arg" in - --quiet) - quiet=1 - shift;; - *) - break;; - esac - done - - local src="${1:?A source file is required as the first argument.}" - local dst="${2:?A destination s3:// URL is required as the second argument.}" - local cloudfront_domain="${3:-}" - - local s3path="${dst#s3://}" - local bucket="${s3path%%/*}" - local key="${s3path#*/}" - - local src_hash dst_hash no_hash=0000000000000000000000000000000000000000000000000000000000000000 - src_hash="$("$bin/sha256sum" < "$src")" - dst_hash="$(aws s3api head-object --bucket "$bucket" --key "$key" --query Metadata.sha256sum --output text 2>/dev/null || echo "$no_hash")" - - if [[ $src_hash != "$dst_hash" ]]; then - # The record count may have changed - src_record_count="$(wc -l < "$src")" - - echo "Uploading $src → $dst" - if [[ "$dst" == *.gz ]]; then - gzip -c "$src" - elif [[ "$dst" == *.xz ]]; then - xz -2 -T0 -c "$src" - else - cat "$src" - fi | aws s3 cp --no-progress - "$dst" --metadata sha256sum="$src_hash",recordcount="$src_record_count" "$(content-type "$dst")" - - if [[ -n $cloudfront_domain ]]; then - echo "Creating CloudFront invalidation for $cloudfront_domain/$key" - if ! "$bin"/cloudfront-invalidate "$cloudfront_domain" "/$key"; then - echo "CloudFront invalidation failed, but exiting with success anyway." 
- fi - fi - - if [[ $quiet == 1 ]]; then - echo "Quiet mode. No Slack notification sent." - exit 0 - fi - - if ! "$bin"/notify-slack "Updated $dst available."; then - echo "Notifying Slack failed, but exiting with success anyway." - fi - else - echo "Uploading $src → $dst: files are identical, skipping upload" - fi -} - -content-type() { - case "$1" in - *.tsv) echo --content-type=text/tab-separated-values;; - *.csv) echo --content-type=text/comma-separated-values;; - *.ndjson) echo --content-type=application/x-ndjson;; - *.gz) echo --content-type=application/gzip;; - *.xz) echo --content-type=application/x-xz;; - *) echo --content-type=text/plain;; - esac -} - -main "$@" diff --git a/ingest/workflow/snakemake_rules/fetch_sequences.smk b/ingest/workflow/snakemake_rules/fetch_sequences.smk index 9f330062..67e13cd4 100644 --- a/ingest/workflow/snakemake_rules/fetch_sequences.smk +++ b/ingest/workflow/snakemake_rules/fetch_sequences.smk @@ -17,9 +17,28 @@ Produces final output as rule fetch_from_genbank: output: genbank_ndjson="data/genbank.ndjson", + params: + serotype_tax_id=download_serotype, + csv_to_ndjson_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/csv-to-ndjson", shell: """ - ./bin/fetch-from-genbank 10244 > {output.genbank_ndjson} + # (1) Pick curl or wget based on availability + if which curl > /dev/null; then + download_cmd="curl -fsSL --output" + elif which wget > /dev/null; then + download_cmd="wget -O" + else + echo "ERROR: Neither curl nor wget found. Please install one of them." + exit 1 + fi + + # (2) Download the required scripts if not already present + [[ -d bin ]] || mkdir bin + [[ -f bin/csv-to-ndjson ]] || $download_cmd bin/csv-to-ndjson {params.csv_to_ndjson_url} + chmod +x bin/* + + # (3) Fetch sequences from GenBank + ./bin/fetch-from-genbank {params.serotype_tax_id} > {output.genbank_ndjson} """ diff --git a/ingest/workflow/snakemake_rules/slack_notifications.smk b/ingest/workflow/snakemake_rules/slack_notifications.smk index b4a54753..8a4c2bf9 100644 --- a/ingest/workflow/snakemake_rules/slack_notifications.smk +++ b/ingest/workflow/snakemake_rules/slack_notifications.smk @@ -28,8 +28,25 @@ rule notify_on_genbank_record_change: touch("data/notify/genbank-record-change.done"), params: s3_src=S3_SRC, + notify_on_record_change_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/notify-on-record-change", shell: """ + # (1) Pick curl or wget based on availability + if which curl > /dev/null; then + download_cmd="curl -fsSL --output" + elif which wget > /dev/null; then + download_cmd="wget -O" + else + echo "ERROR: Neither curl nor wget found. Please install one of them." 
+ exit 1 + fi + + # (2) Download the required scripts if not already present + [[ -d bin ]] || mkdir bin + [[ -f bin/notify-on-record-change ]] || $download_cmd bin/notify-on-record-change {params.notify_on_record_change_url} + chmod +x bin/* + + # (3) Run the script ./bin/notify-on-record-change {input.genbank_ndjson} {params.s3_src:q}/genbank.ndjson.xz Genbank """ @@ -41,8 +58,25 @@ rule notify_on_metadata_diff: touch("data/notify/metadata-diff.done"), params: s3_src=S3_SRC, + notify_on_diff_url = "https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/notify-on-diff", shell: """ + # (1) Pick curl or wget based on availability + if which curl > /dev/null; then + download_cmd="curl -fsSL --output" + elif which wget > /dev/null; then + download_cmd="wget -O" + else + echo "ERROR: Neither curl nor wget found. Please install one of them." + exit 1 + fi + + # (2) Download the required scripts if not already present + [[ -d bin ]] || mkdir bin + [[ -f bin/notify-on-diff ]] || $download_cmd bin/notify-on-diff {params.notify_on_diff_url} + chmod +x bin/* + + # (3) Run the script ./bin/notify-on-diff {input.metadata} {params.s3_src:q}/metadata.tsv.gz """ diff --git a/ingest/workflow/snakemake_rules/transform.smk b/ingest/workflow/snakemake_rules/transform.smk index 1adc9bc1..c19eade6 100644 --- a/ingest/workflow/snakemake_rules/transform.smk +++ b/ingest/workflow/snakemake_rules/transform.smk @@ -20,7 +20,18 @@ rule fetch_general_geolocation_rules: geolocation_rules_url=config["transform"]["geolocation_rules_url"], shell: """ - curl {params.geolocation_rules_url} > {output.general_geolocation_rules} + # (1) Pick curl or wget based on availability + if which curl > /dev/null; then + download_cmd="curl -fsSL --output" + elif which wget > /dev/null; then + download_cmd="wget -O" + else + echo "ERROR: Neither curl nor wget found. Please install one of them." 
+ exit 1 + fi + + # (2) Fetch general geolocation rules + $download_cmd {output.general_geolocation_rules} {params.geolocation_rules_url} """ @@ -62,8 +73,41 @@ rule transform: metadata_columns=config["transform"]["metadata_columns"], id_field=config["transform"]["id_field"], sequence_field=config["transform"]["sequence_field"], + transform_field_names_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/transform-field-names", + transform_string_fields_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/transform-string-fields", + transform_strain_names_url="https://raw.githubusercontent.com/nextstrain/monkeypox/b54768ec17872eb0d898e29527785642f6b98c0d/ingest/bin/transform-strain-names", + transform_date_fields_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/transform-date-fields", + transform_genbank_location_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/transform-genbank-location", + transform_authors_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/transform-authors", + apply_geolocation_rules_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/apply-geolocation-rules", + merge_user_metadata_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/merge-user-metadata", + ndjson_to_tsv_and_fasta_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/ndjson-to-tsv-and-fasta", shell: """ + # (1) Pick curl or wget based on availability + if which curl > /dev/null; then + download_cmd="curl -fsSL --output" + elif which wget > /dev/null; then + download_cmd="wget -O" + else + echo "ERROR: Neither curl nor wget found. Please install one of them." 
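
Aside: one benefit of switching the geolocation-rules fetch above from a bare curl redirect to the shared $download_cmd is the -f flag. Plain curl exits 0 on an HTTP 404 and writes the error page into the output file, so the rule would appear to succeed with a bogus file; -f makes curl return a non-zero exit code instead. A minimal illustration (the URL is a deliberately non-existent example):

# Plain `curl URL > file` exits 0 on an HTTP 404 and writes the error page
# into the file; with -f curl exits non-zero, so the failure surfaces here
# instead of being passed downstream as a malformed geolocation-rules file.
url="https://raw.githubusercontent.com/nextstrain/monkeypox/main/ingest/bin/does-not-exist"
if ! curl -fsSL --output /tmp/geolocation-rules-example "$url"; then
    echo "Download failed cleanly; the rule can stop here." >&2
fi
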
+ exit 1 + fi + + # (2) Download the required scripts if not already present + [[ -d bin ]] || mkdir bin + [[ -f bin/transform-field-names ]] || $download_cmd bin/transform-field-names {params.transform_field_names_url} + [[ -f bin/transform-string-fields ]] || $download_cmd bin/transform-string-fields {params.transform_string_fields_url} + [[ -f bin/transform-strain-names ]] || $download_cmd bin/transform-strain-names {params.transform_strain_names_url} + [[ -f bin/transform-date-fields ]] || $download_cmd bin/transform-date-fields {params.transform_date_fields_url} + [[ -f bin/transform-genbank-location ]] || $download_cmd bin/transform-genbank-location {params.transform_genbank_location_url} + [[ -f bin/transform-authors ]] || $download_cmd bin/transform-authors {params.transform_authors_url} + [[ -f bin/apply-geolocation-rules ]] || $download_cmd bin/apply-geolocation-rules {params.apply_geolocation_rules_url} + [[ -f bin/merge-user-metadata ]] || $download_cmd bin/merge-user-metadata {params.merge_user_metadata_url} + [[ -f bin/ndjson-to-tsv-and-fasta ]] || $download_cmd bin/ndjson-to-tsv-and-fasta {params.ndjson_to_tsv_and_fasta_url} + chmod +x bin/* + + # (3) Transform the sequences (cat {input.sequences_ndjson} \ | ./bin/transform-field-names \ --field-map {params.field_map} \ diff --git a/ingest/workflow/snakemake_rules/trigger_rebuild.smk b/ingest/workflow/snakemake_rules/trigger_rebuild.smk index 6d8aedeb..47f43621 100644 --- a/ingest/workflow/snakemake_rules/trigger_rebuild.smk +++ b/ingest/workflow/snakemake_rules/trigger_rebuild.smk @@ -13,7 +13,25 @@ rule trigger_build: fasta_upload = "data/upload/s3/sequences.fasta-to-sequences.fasta.xz.done" output: touch("data/trigger/rebuild.done") + params: + trigger_on_new_data_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/trigger-on-new-data" shell: """ + # (1) Pick curl or wget based on availability + if which curl > /dev/null; then + download_cmd="curl -fsSL --output" + elif which wget > /dev/null; then + download_cmd="wget -O" + else + echo "ERROR: Neither curl nor wget found. Please install one of them." 
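
Aside: step (2) of the transform rule downloads nine scripts one by one, each repeating the same pinned commit in its URL. A sketch of how the list could be driven from a single pin, assuming the hypothetical bin/fetch-script helper from earlier; none of this is in the patch. In this patch transform-strain-names is pinned to a different upstream commit, so it is handled separately below.

# Hypothetical consolidation, not in this patch: derive the permalinks from a
# single pinned commit so a version bump is a one-line change.
pinned_commit="644d07ebe3fa5ded64d27d0964064fb722797c5d"
base_url="https://raw.githubusercontent.com/nextstrain/monkeypox/${pinned_commit}/ingest/bin"

for script in transform-field-names transform-string-fields transform-date-fields \
              transform-genbank-location transform-authors apply-geolocation-rules \
              merge-user-metadata ndjson-to-tsv-and-fasta; do
    ./bin/fetch-script "bin/${script}" "${base_url}/${script}"
done

# transform-strain-names is pinned to a different commit in this patch,
# so it keeps its own permalink.
./bin/fetch-script bin/transform-strain-names \
    "https://raw.githubusercontent.com/nextstrain/monkeypox/b54768ec17872eb0d898e29527785642f6b98c0d/ingest/bin/transform-strain-names"
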
+ exit 1 + fi + + # (2) Download the required scripts if not already present + [[ -d bin ]] || mkdir bin + [[ -f bin/trigger-on-new-data ]] || $download_cmd bin/trigger-on-new-data {params.trigger_on_new_data_url} + chmod +x bin/* + + # (3) Trigger the build ./bin/trigger-on-new-data {input.metadata_upload} {input.fasta_upload} """ diff --git a/ingest/workflow/snakemake_rules/upload.smk b/ingest/workflow/snakemake_rules/upload.smk index 23f892bd..67b99d43 100644 --- a/ingest/workflow/snakemake_rules/upload.smk +++ b/ingest/workflow/snakemake_rules/upload.smk @@ -54,8 +54,29 @@ rule upload_to_s3: quiet="" if send_notifications else "--quiet", s3_dst=config["upload"].get("s3", {}).get("dst", ""), cloudfront_domain=config["upload"].get("s3", {}).get("cloudfront_domain", ""), + upload_to_s3_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/upload-to-s3", + sha256sum_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/sha256sum", + cloudfront_invalidate_url="https://raw.githubusercontent.com/nextstrain/monkeypox/644d07ebe3fa5ded64d27d0964064fb722797c5d/ingest/bin/cloudfront-invalidate" shell: """ + # (1) Pick curl or wget based on availability + if which curl > /dev/null; then + download_cmd="curl -fsSL --output" + elif which wget > /dev/null; then + download_cmd="wget -O" + else + echo "ERROR: Neither curl nor wget found. Please install one of them." + exit 1 + fi + + # (2) Download the required scripts if not already present + [[ -d bin ]] || mkdir bin + [[ -f bin/upload-to-s3 ]] || $download_cmd bin/upload-to-s3 {params.upload_to_s3_url} + [[ -f bin/sha256sum ]] || $download_cmd bin/sha256sum {params.sha256sum_url} + [[ -f bin/cloudfront-invalidate ]] || $download_cmd bin/cloudfront-invalidate {params.cloudfront_invalidate_url} + chmod +x bin/* + + # (3) Run the upload script ./bin/upload-to-s3 \ {params.quiet} \ {input.file_to_upload:q} \
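
Aside: because every script URL pins a nextstrain/monkeypox commit, moving to a newer upstream version is an explicit, reviewable edit rather than an automatic change. A rough sketch of what such a bump could look like, assuming GNU sed and that all permalinks follow the raw.githubusercontent.com form used above; note it would also rewrite the separately pinned transform-strain-names URL.

# Hypothetical version bump, not part of this patch: look up the current
# upstream commit and rewrite the pinned hashes in the workflow rules.
new_commit="$(git ls-remote https://github.com/nextstrain/monkeypox.git HEAD | cut -f1)"

# GNU sed assumed; review the resulting diff before committing the bump.
sed -i "s|nextstrain/monkeypox/[0-9a-f]\{40\}/|nextstrain/monkeypox/${new_commit}/|g" \
    ingest/workflow/snakemake_rules/*.smk
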