Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add task_kwargs param to dbt_flow #47

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions prefect_dbt_flow/dbt/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ def generate_tasks_dag(
profile: Optional[DbtProfile],
dag_options: Optional[DbtDagOptions],
dbt_graph: List[DbtNode],
task_kwargs: Optional[Dict] = None,
run_test_after_model: bool = False,
) -> None:
"""
Expand All @@ -224,6 +225,7 @@ def generate_tasks_dag(
profile: A class that represents a dbt profile configuration.
dag_options: A class to add dbt DAG configurations.
dbt_graph: A list of dbt nodes (models) to include in the DAG.
task_kwargs: Additional task configuration for running each model.
run_test_after_model: If True, run tests after running each model.

Returns:
Expand All @@ -237,6 +239,7 @@ def generate_tasks_dag(
profile=profile,
dag_options=dag_options,
dbt_node=dbt_node,
task_kwargs=task_kwargs,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I've applied the same task_kwargs to all dbt tasks corresponding to resource types, but not those corresponding to dbt tests (below). However, I'm wondering if a more specific method of supplying task_kwargs is preferred, for example to specify different values for tests or even for different resource types.

)
for dbt_node in dbt_graph
}
Expand Down
3 changes: 3 additions & 0 deletions prefect_dbt_flow/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def dbt_flow(
profile: Optional[DbtProfile] = None,
dag_options: Optional[DbtDagOptions] = None,
flow_kwargs: Optional[dict] = None,
task_kwargs: Optional[dict] = None,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that in tasks.py, Dict is imported from typing (along with List), while flow.py just uses dict. If this is an inconsistency, I'm happy to clean it up in this PR.

) -> Flow:
"""
Create a PrefectFlow for executing a dbt project.
Expand All @@ -20,6 +21,7 @@ def dbt_flow(
profile: A Class that represents a dbt profile configuration.
dag_options: A Class to add dbt DAG configurations.
flow_kwargs: A dict of prefect @flow arguments
task_kwargs: A dict of Prefect @task arguments

Returns:
dbt_flow: A Prefec Flow.
Expand All @@ -44,6 +46,7 @@ def dbt_flow():
profile,
dag_options,
dbt_graph,
task_kwargs,
dag_options.run_test_after_model if dag_options else False,
)

Expand Down