diff --git a/cosmos/__init__.py b/cosmos/__init__.py
index b2583f994..2113b4522 100644
--- a/cosmos/__init__.py
+++ b/cosmos/__init__.py
@@ -12,6 +12,9 @@
 from cosmos.constants import LoadMode, TestBehavior, ExecutionMode
 from cosmos.dataset import get_dbt_dataset
 
+# re-export the dag and task group
+from cosmos.airflow.dag import DbtDag
+from cosmos.airflow.task_group import DbtTaskGroup
 from cosmos.operators.lazy_load import MissingPackage
 from cosmos.operators.local import (
diff --git a/cosmos/dataset.py b/cosmos/dataset.py
index ce8d26879..f6f029bd3 100644
--- a/cosmos/dataset.py
+++ b/cosmos/dataset.py
@@ -1,9 +1,9 @@
-from typing import Any
+from typing import Any, Tuple
 
 try:
     from airflow.datasets import Dataset
-except ImportError:
+except (ImportError, ModuleNotFoundError):
     from logging import getLogger
 
     logger = getLogger(__name__)
@@ -11,11 +11,11 @@ class Dataset:  # type: ignore[no-redef]
         cosmos_override = True
 
-        def __init__(self, id: str, *args: tuple[Any], **kwargs: str):
+        def __init__(self, id: str, *args: Tuple[Any], **kwargs: str):
             self.id = id
             logger.warning("Datasets are not supported in Airflow < 2.5.0")
 
-        def __eq__(self, other: Dataset) -> bool:
+        def __eq__(self, other: "Dataset") -> bool:
             return self.id == other.id  # type: ignore[no-any-return]
diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py
index 5681aa53b..ec1cd6ae5 100644
--- a/cosmos/dbt/selector.py
+++ b/cosmos/dbt/selector.py
@@ -2,7 +2,10 @@
 import logging
 from pathlib import Path
 
-from cosmos.dbt.graph import DbtNode
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from cosmos.dbt.graph import DbtNode
 
 SUPPORTED_CONFIG = ["materialized", "schema", "tags"]
diff --git a/cosmos/operators/docker.py b/cosmos/operators/docker.py
index 4c55c1a6c..c8af03e96 100644
--- a/cosmos/operators/docker.py
+++ b/cosmos/operators/docker.py
@@ -153,7 +153,7 @@ class DbtRunOperationDockerOperator(DbtDockerBaseOperator):
     """
 
     ui_color = "#8194E0"
-    template_fields: Sequence[str] = "args"
+    template_fields: Sequence[str] = ("args",)
 
     def __init__(self, macro_name: str, args: Optional[dict[str, Any]] = None, **kwargs: str) -> None:
         self.macro_name = macro_name
diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py
index 6ccea7280..f3ff6275c 100644
--- a/cosmos/operators/kubernetes.py
+++ b/cosmos/operators/kubernetes.py
@@ -161,7 +161,7 @@ class DbtRunOperationKubernetesOperator(DbtKubernetesBaseOperator):
     """
 
     ui_color = "#8194E0"
-    template_fields: Sequence[str] = "args"
+    template_fields: Sequence[str] = ("args",)
 
     def __init__(self, macro_name: str, args: Optional[dict[str, Any]] = None, **kwargs: str) -> None:
         self.macro_name = macro_name
diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py
index e01409d62..69986aaf8 100644
--- a/cosmos/operators/local.py
+++ b/cosmos/operators/local.py
@@ -6,7 +6,7 @@
 import signal
 import tempfile
 from pathlib import Path
-from typing import Any, Callable, Optional, Sequence
+from typing import Any, Callable, Optional, Sequence, Tuple
 
 import yaml
 from airflow.compat.functools import cached_property
@@ -126,7 +126,7 @@ def store_compiled_sql(self, tmp_project_dir: str, context: Context, session: Se
         ).delete()
         session.add(rtif)
 
-    def run_subprocess(self, *args: tuple[Any], **kwargs: Any) -> FullOutputSubprocessResult:
+    def run_subprocess(self, *args: Tuple[Any], **kwargs: Any) -> FullOutputSubprocessResult:
         return self.subprocess_hook.run_command(*args, **kwargs)  # type: ignore[no-any-return]
 
     def get_profile_name(self, project_dir: str) -> str:
@@ -404,7 +404,7 @@ class DbtRunOperationLocalOperator(DbtLocalBaseOperator):
     """
 
    ui_color = "#8194E0"
-    template_fields: Sequence[str] = "args"
+    template_fields: Sequence[str] = ("args",)
 
     def __init__(self, macro_name: str, args: Optional[dict[str, Any]] = None, **kwargs: Any) -> None:
         self.macro_name = macro_name
diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py
index 082d18f02..22507c472 100644
--- a/cosmos/operators/virtualenv.py
+++ b/cosmos/operators/virtualenv.py
@@ -3,7 +3,7 @@
 import logging
 from pathlib import Path
 from tempfile import TemporaryDirectory
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, Tuple
 
 from airflow.compat.functools import cached_property
 
@@ -85,9 +85,9 @@ def venv_dbt_path(
         self.log.info("Using dbt version %s available at %s", dbt_version, dbt_binary)
         return str(dbt_binary)
 
-    def run_subprocess(self, *args: tuple[Any], **kwargs: Any) -> FullOutputSubprocessResult:
-        command = kwargs["command"]
-
+    def run_subprocess(  # type: ignore[override]
+        self, *args: Tuple[Any], command: list[str], **kwargs: Any
+    ) -> FullOutputSubprocessResult:
         if self.py_requirements:
             command[0] = self.venv_dbt_path
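With the re-exports added to cosmos/__init__.py in the first hunk, DbtDag and DbtTaskGroup should be importable straight from the package root. A minimal sketch of what that enables, assuming only the two imports added above and nothing else about the package layout:

    # illustrative only: import from the package root instead of the submodules
    from cosmos import DbtDag, DbtTaskGroup

    # equivalent to the pre-existing import paths re-exported by the patch:
    # from cosmos.airflow.dag import DbtDag
    # from cosmos.airflow.task_group import DbtTaskGroup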