From 682c34f8d28f8b4201ac7786fa042500fc0b5175 Mon Sep 17 00:00:00 2001 From: JoeSham Date: Wed, 28 Jun 2023 14:14:01 +0200 Subject: [PATCH] Add support for BQ SA keyfile_dict Related issue: https://github.com/astronomer/astronomer-cosmos/issues/350 --- cosmos/profiles/__init__.py | 2 + cosmos/profiles/base.py | 19 +++----- cosmos/profiles/bigquery/__init__.py | 6 ++- .../bigquery/service_account_keyfile_dict.py | 43 +++++++++++++++++++ docs/dbt/connections-profiles.rst | 8 ++++ 5 files changed, 65 insertions(+), 13 deletions(-) create mode 100644 cosmos/profiles/bigquery/service_account_keyfile_dict.py diff --git a/cosmos/profiles/__init__.py b/cosmos/profiles/__init__.py index 1399c0a1d..e006f85ee 100644 --- a/cosmos/profiles/__init__.py +++ b/cosmos/profiles/__init__.py @@ -8,6 +8,7 @@ from .base import BaseProfileMapping from .bigquery.service_account_file import GoogleCloudServiceAccountFileProfileMapping +from .bigquery.service_account_keyfile_dict import GoogleCloudServiceAccountDictProfileMapping from .databricks.token import DatabricksTokenProfileMapping from .exasol.user_pass import ExasolUserPasswordProfileMapping from .postgres.user_pass import PostgresUserPasswordProfileMapping @@ -20,6 +21,7 @@ profile_mappings: list[Type[BaseProfileMapping]] = [ GoogleCloudServiceAccountFileProfileMapping, + GoogleCloudServiceAccountDictProfileMapping, DatabricksTokenProfileMapping, PostgresUserPasswordProfileMapping, RedshiftUserPasswordProfileMapping, diff --git a/cosmos/profiles/base.py b/cosmos/profiles/base.py index 081b9fb00..9e97f1a2a 100644 --- a/cosmos/profiles/base.py +++ b/cosmos/profiles/base.py @@ -27,7 +27,7 @@ class BaseProfileMapping(ABC): airflow_connection_type: str = "generic" is_community: bool = False - required_fields: list[str] = [] + required_fields: list[str | list[str]] = [] secret_fields: list[str] = [] airflow_param_mapping: dict[str, str | list[str]] = {} @@ -42,20 +42,15 @@ def can_claim_connection(self) -> bool: if self.conn.conn_type != self.airflow_connection_type: return False - for field in self.required_fields: - try: - if not getattr(self, field): - logger.info( - "Not using mapping %s because %s is not set", - self.__class__.__name__, - field, - ) - return False - except AttributeError: + # Check if all required fields exist. If a sublist is provided, then it means 1 of the fields has to exist + for field_list in self.required_fields: + if isinstance(field_list, str): + field_list = [field_list] + if not any([getattr(self, field, False) for field in field_list]): logger.info( "Not using mapping %s because %s is not set", self.__class__.__name__, - field, + field_list, ) return False diff --git a/cosmos/profiles/bigquery/__init__.py b/cosmos/profiles/bigquery/__init__.py index 3416eb300..e322c3af5 100644 --- a/cosmos/profiles/bigquery/__init__.py +++ b/cosmos/profiles/bigquery/__init__.py @@ -1,5 +1,9 @@ "BigQuery Airflow connection -> dbt profile mappings" from .service_account_file import GoogleCloudServiceAccountFileProfileMapping +from .service_account_keyfile_dict import GoogleCloudServiceAccountDictProfileMapping -__all__ = ["GoogleCloudServiceAccountFileProfileMapping"] +__all__ = [ + "GoogleCloudServiceAccountFileProfileMapping", + "GoogleCloudServiceAccountDictProfileMapping", +] diff --git a/cosmos/profiles/bigquery/service_account_keyfile_dict.py b/cosmos/profiles/bigquery/service_account_keyfile_dict.py new file mode 100644 index 000000000..e94c9dab6 --- /dev/null +++ b/cosmos/profiles/bigquery/service_account_keyfile_dict.py @@ -0,0 +1,43 @@ +"Maps Airflow GCP connections to dbt BigQuery profiles if they use a service account keyfile dict/json." +from __future__ import annotations + +from typing import Any + +from cosmos.profiles.base import BaseProfileMapping + + +class GoogleCloudServiceAccountDictProfileMapping(BaseProfileMapping): + """ + Maps Airflow GCP connections to dbt BigQuery profiles if they use a service account keyfile dict/json. + + https://docs.getdbt.com/reference/warehouse-setups/bigquery-setup#service-account-file + https://airflow.apache.org/docs/apache-airflow-providers-google/stable/connections/gcp.html + """ + + airflow_connection_type: str = "google_cloud_platform" + + required_fields = [ + "project", + "dataset", + "keyfile_dict", + ] + + airflow_param_mapping = { + "project": "extra.project", + "dataset": "dataset", + # multiple options for keyfile_dict param name because of older Airflow versions + "keyfile_dict": ["extra.keyfile_dict", "keyfile_dict", "extra__google_cloud_platform__keyfile_dict"], + } + + @property + def profile(self) -> dict[str, Any | None]: + """Generates profile. Defaults `threads` to 1.""" + return { + "type": "bigquery", + "method": "service-account-json", + "project": self.project, + "dataset": self.dataset, + "threads": self.profile_args.get("threads") or 1, + "keyfile_json": self.keyfile_dict, + **self.profile_args, + } diff --git a/docs/dbt/connections-profiles.rst b/docs/dbt/connections-profiles.rst index 59187b9d6..643625100 100644 --- a/docs/dbt/connections-profiles.rst +++ b/docs/dbt/connections-profiles.rst @@ -89,6 +89,14 @@ Service Account File :members: +Service Account Dict +~~~~~~~~~~~~~~~~~~~~ + +.. autoclass:: cosmos.profiles.bigquery.GoogleCloudServiceAccountDictProfileMapping + :undoc-members: + :members: + + Databricks ----------