Skip to content

Commit

Permalink
Merge pull request #189 from NASA-IMPACT/support-collection-multiple-…
Browse files Browse the repository at this point in the history
…config

Scheduled discovery/ingest can now have files that contain multiple discovery configs.
  • Loading branch information
slesaad authored Jul 15, 2024
2 parents 2dc719c + e19da96 commit 7d163ab
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions dags/generate_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@

from airflow.models.variable import Variable


from veda_data_pipeline.veda_discover_pipeline import get_discover_dag


def generate_dags():
import boto3
import json

from pathlib import Path


mwaa_stac_conf = Variable.get("MWAA_STACK_CONF", deserialize_json=True)
bucket = mwaa_stac_conf["EVENT_BUCKET"]

Expand All @@ -23,12 +25,25 @@ def generate_dags():
key = file_["Key"]
if key.endswith("/"):
continue
file_name = Path(key).stem
result = client.get_object(Bucket=bucket, Key=key)
collection = result["Body"].read().decode()
collection = json.loads(collection)
if collection.get("schedule"):
discovery_configs = result["Body"].read().decode()
discovery_configs = json.loads(discovery_configs)

# Allow the file content to be either one config or a list of configs
if type(discovery_configs) is dict:
discovery_configs = [discovery_configs]
scheduled_discovery_configs = [
discovery_config
for discovery_config in discovery_configs
if discovery_config.get("schedule")
]
for idx, discovery_config in enumerate(scheduled_discovery_configs):
id = f"discover-{file_name}"
if idx > 0:
id = f"{id}-{idx}"
get_discover_dag(
id=f"discover-{collection['collection']}", event=collection
id=id, event=discovery_config
)


Expand Down

0 comments on commit 7d163ab

Please sign in to comment.