Skip to content

Commit

Permalink
add better support for passing down args
Browse files Browse the repository at this point in the history
  • Loading branch information
jlaneve committed Jul 24, 2023
1 parent 981ae8f commit 45799d1
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
26 changes: 19 additions & 7 deletions cosmos/core/airflow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import importlib
import logging
from typing import Optional
import inspect

from airflow.models import BaseOperator
from airflow.models.dag import DAG
Expand All @@ -24,14 +25,25 @@ def get_airflow_task(task: Task, dag: DAG, task_group: Optional[TaskGroup] = Non
# fully qualified name defined in the task
module_name, class_name = task.operator_class.rsplit(".", 1)
module = importlib.import_module(module_name)
Operator = getattr(module, class_name)

airflow_task = Operator(
task_id=task.id,
dag=dag,
task_group=task_group,
operator = getattr(module, class_name)

# ensure we only pass the arguments that the operator expects
supported_args = set()
for inherited_class in operator.mro():
for arg in inspect.signature(inherited_class.__init__).parameters:
supported_args.add(arg)

potential_operator_args = {
"task_id": task.id,
"dag": dag,
"task_group": task_group,
**task.arguments,
)
}
operator_args = {
arg_key: arg_value for arg_key, arg_value in potential_operator_args.items() if arg_key in supported_args
}

airflow_task = operator(**operator_args)

if not isinstance(airflow_task, BaseOperator):
raise TypeError(f"Operator class {task.operator_class} is not a subclass of BaseOperator")
Expand Down
3 changes: 3 additions & 0 deletions dev/dags/basic_cosmos_task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ def basic_cosmos_task_group() -> None:
render_config=RenderConfig(
test_behavior="after_all",
),
operator_args={
"on_warning_callback": print,
},
)

post_dbt = EmptyOperator(task_id="post_dbt")
Expand Down

0 comments on commit 45799d1

Please sign in to comment.