From cfb2123342bc9ef0c7b688d4ad6db7cd0f0d1008 Mon Sep 17 00:00:00 2001 From: Abhishek Mohan Date: Mon, 24 Jul 2023 14:43:33 -0400 Subject: [PATCH] Rebase changes. --- cosmos/__init__.py | 4 ---- cosmos/airflow/graph.py | 4 ++-- cosmos/converter.py | 5 ++++- cosmos/dbt/graph.py | 10 ++++++++-- cosmos/dbt/parser/project.py | 15 ++++++++------- dev/dags/dbt/jaffle_shop_python/.user.yml | 1 + 6 files changed, 23 insertions(+), 16 deletions(-) create mode 100644 dev/dags/dbt/jaffle_shop_python/.user.yml diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 2113b4522..82ed9ef3f 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -11,10 +11,6 @@ from cosmos.airflow.task_group import DbtTaskGroup from cosmos.constants import LoadMode, TestBehavior, ExecutionMode from cosmos.dataset import get_dbt_dataset - -# re-export the dag and task group -from cosmos.airflow.dag import DbtDag -from cosmos.airflow.task_group import DbtTaskGroup from cosmos.operators.lazy_load import MissingPackage from cosmos.operators.local import ( diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 5fb24b5fe..d6a67c663 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -51,7 +51,7 @@ def calculate_leaves(tasks_ids: list[str], nodes: dict[str, DbtNode]) -> list[st return leaves -def create_task_metadata(node: DbtNode, execution_mode: ExecutionMode, args: dict) -> TaskMetadata: +def create_task_metadata(node: DbtNode, execution_mode: ExecutionMode, args: dict[str, Any]) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. @@ -88,7 +88,7 @@ def create_test_task_metadata( test_task_name: str, execution_mode: ExecutionMode, task_args: dict[str, Any], - on_warning_callback: callable, + on_warning_callback: Callable[..., Any] | None = None, model_name: str | None = None, ) -> TaskMetadata: """ diff --git a/cosmos/converter.py b/cosmos/converter.py index 7ea065780..f89fe05c1 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -1,8 +1,11 @@ +# mypy: ignore-errors +# ignoring enum Mypy errors + from __future__ import annotations +from enum import Enum import inspect import logging -import sys from typing import Any, Callable from airflow.exceptions import AirflowException diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 527723b3a..7ba4a52e3 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -4,6 +4,7 @@ import logging import os from dataclasses import dataclass, field +from pathlib import Path from subprocess import Popen, PIPE from typing import Any @@ -34,7 +35,7 @@ class DbtNode: name: str unique_id: str - resource_type: str + resource_type: DbtResourceType depends_on: list[str] file_path: Path tags: list[str] = field(default_factory=lambda: []) @@ -130,7 +131,12 @@ def load_via_dbt_ls(self) -> None: logger.info(f"Running command: {command}") try: process = Popen( - command, stdout=PIPE, stderr=PIPE, cwd=self.project.dir, universal_newlines=True, env=os.environ # type: ignore[arg-type] + command, # type: ignore[arg-type] + stdout=PIPE, + stderr=PIPE, + cwd=self.project.dir, + universal_newlines=True, + env=os.environ, ) except FileNotFoundError as exception: raise CosmosLoadDbtException(f"Unable to run the command due to the error:\n{exception}") diff --git a/cosmos/dbt/parser/project.py b/cosmos/dbt/parser/project.py index ec8966903..3eff9935c 100644 --- a/cosmos/dbt/parser/project.py +++ b/cosmos/dbt/parser/project.py @@ -103,17 +103,18 @@ def extract_python_file_upstream_requirements(code: str) -> list[str]: source_code = ast.parse(code) upstream_entities = [] - model_function = "" + model_function = None for node in source_code.body: if isinstance(node, ast.FunctionDef) and node.name == DBT_PY_MODEL_METHOD_NAME: model_function = node break - for item in ast.walk(model_function): - if isinstance(item, ast.Call) and item.func.attr == DBT_PY_DEP_METHOD_NAME: - upstream_entity_id = hasattr(item.args[-1], "value") and item.args[-1].value - if upstream_entity_id: - upstream_entities.append(upstream_entity_id) + if model_function: + for item in ast.walk(model_function): + if isinstance(item, ast.Call) and item.func.attr == DBT_PY_DEP_METHOD_NAME: # type: ignore[attr-defined] + upstream_entity_id = hasattr(item.args[-1], "value") and item.args[-1].value + if upstream_entity_id: + upstream_entities.append(upstream_entity_id) return upstream_entities @@ -153,7 +154,7 @@ def __post_init__(self) -> None: code = code.split("{%")[0] elif self.type == DbtModelType.DBT_SEED: - code = None + code = "" if self.path.suffix == PYTHON_FILE_SUFFIX: config.upstream_models = config.upstream_models.union(set(extract_python_file_upstream_requirements(code))) diff --git a/dev/dags/dbt/jaffle_shop_python/.user.yml b/dev/dags/dbt/jaffle_shop_python/.user.yml new file mode 100644 index 000000000..99944580e --- /dev/null +++ b/dev/dags/dbt/jaffle_shop_python/.user.yml @@ -0,0 +1 @@ +id: 215ac1e7-601b-45dd-9867-587a3e282bce