From dfafb38ee228c631daa4960948f90ddbaaea4f36 Mon Sep 17 00:00:00 2001
From: Marcel Coetzee
Date: Wed, 11 Sep 2024 19:54:33 +0200
Subject: [PATCH 1/6] Add autodetect schema with hints test for BigQuery table builder

Signed-off-by: Marcel Coetzee
---
 tests/destinations/conftest.py              |  2 +-
 .../bigquery/test_bigquery_table_builder.py | 75 ++++++++++++++++---
 2 files changed, 65 insertions(+), 12 deletions(-)

diff --git a/tests/destinations/conftest.py b/tests/destinations/conftest.py
index 286b665adc..dd5bf34f91 100644
--- a/tests/destinations/conftest.py
+++ b/tests/destinations/conftest.py
@@ -5,4 +5,4 @@
     wipe_pipeline,
     duckdb_pipeline_location,
 )
-from tests.common.configuration.utils import environment
\ No newline at end of file
+from tests.common.configuration.utils import environment
diff --git a/tests/load/bigquery/test_bigquery_table_builder.py b/tests/load/bigquery/test_bigquery_table_builder.py
index 0a6aa234ea..cf3bbf15a8 100644
--- a/tests/load/bigquery/test_bigquery_table_builder.py
+++ b/tests/load/bigquery/test_bigquery_table_builder.py
@@ -1,12 +1,6 @@
 import os
 from copy import deepcopy
 from typing import Iterator, Dict, Any, List
-from dlt.common.destination.exceptions import DestinationSchemaTampered
-from dlt.common.schema.exceptions import SchemaIdentifierNormalizationCollision
-from dlt.destinations.impl.bigquery.bigquery_adapter import (
-    PARTITION_HINT,
-    CLUSTER_HINT,
-)
 
 import google
 import pytest
@@ -19,19 +13,22 @@
     GcpServiceAccountCredentialsWithoutDefaults,
     GcpServiceAccountCredentials,
 )
+from dlt.common.destination.exceptions import DestinationSchemaTampered
 from dlt.common.pendulum import pendulum
 from dlt.common.schema import Schema, utils
+from dlt.common.schema.exceptions import SchemaIdentifierNormalizationCollision
 from dlt.common.utils import custom_environ
 from dlt.common.utils import uniq_id
-
-from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate
 from dlt.destinations import bigquery
-from dlt.destinations.impl.bigquery.bigquery import BigQueryClient
 from dlt.destinations.adapters import bigquery_adapter
+from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate
+from dlt.destinations.impl.bigquery.bigquery import BigQueryClient
+from dlt.destinations.impl.bigquery.bigquery_adapter import (
+    PARTITION_HINT,
+    CLUSTER_HINT,
+)
 from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration
-
 from dlt.extract import DltResource
-
 from tests.load.utils import (
     destinations_configs,
     DestinationTestConfiguration,
@@ -1137,3 +1134,59 @@ def hints() -> Iterator[Dict[str, Any]]:
         assert (
             table.range_partitioning.field == "col2"
         ), "`hints` table IS NOT clustered on column `col2`."
+
+
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
+    ids=lambda x: x.name,
+)
+def test_adapter_autodetect_schema_with_hints(
+    destination_config: DestinationTestConfiguration,
+) -> None:
+    @dlt.resource(
+        columns=[
+            {"name": "col1", "data_type": "text"},
+            {"name": "col2", "data_type": "bigint"},
+            {"name": "col3", "data_type": "double"},
+        ]
+    )
+    def hints() -> Iterator[Dict[str, Any]]:
+        yield from [{"col1": str(i), "col2": i, "col3": float(i)} for i in range(10)]
+
+    bigquery_adapter(
+        hints,
+        table_description="A small table somewhere in the cosmos...",
+        partition="col2",
+        table_expiration_datetime="2030-01-01",
+        cluster=["col1"],
+        autodetect_schema=True,
+    )
+
+    pipeline = destination_config.setup_pipeline(
+        f"bigquery_{uniq_id()}",
+        dev_mode=True,
+    )
+
+    pipeline.run(hints)
+
+    with pipeline.sql_client() as c:
+        nc: google.cloud.bigquery.client.Client = c.native_connection
+
+        table_fqtn = c.make_qualified_table_name("hints", escape=False)
+
+        table: Table = nc.get_table(table_fqtn)
+
+        table_cluster_fields = [] if table.clustering_fields is None else table.clustering_fields
+
+        # Test merging behaviour.
+        assert table.expires == pendulum.datetime(2030, 1, 1, 0)
+        assert ["col1"] == table_cluster_fields, "`hints` table IS NOT clustered by `col1`."
+        assert table.description == "A small table somewhere in the cosmos..."
+
+        if not table.range_partitioning:
+            raise ValueError("`hints` table IS NOT partitioned on a column.")
+        else:
+            assert (
+                table.range_partitioning.field == "col2"
+            ), "`hints` table IS NOT partitioned on column `col2`."

From 133c3004209d5088f2d3a2a3148b46b507b9df99 Mon Sep 17 00:00:00 2001
From: Marcel Coetzee
Date: Thu, 12 Sep 2024 22:02:14 +0200
Subject: [PATCH 2/6] Use SDK to set hints for autodetect_schema path

Signed-off-by: Marcel Coetzee
---
 dlt/destinations/impl/bigquery/bigquery.py  | 45 ++++++++++++++-----
 .../bigquery/test_bigquery_table_builder.py |  1 +
 2 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py
index 5bc7a64e7d..9a565df852 100644
--- a/dlt/destinations/impl/bigquery/bigquery.py
+++ b/dlt/destinations/impl/bigquery/bigquery.py
@@ -1,33 +1,29 @@
-import functools
 import os
 from pathlib import Path
-import time
 from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, cast
 
 import google.cloud.bigquery as bigquery  # noqa: I250
 from google.api_core import exceptions as api_core_exceptions
-from google.cloud import exceptions as gcp_exceptions
 from google.api_core import retry
+from google.cloud import exceptions as gcp_exceptions
 from google.cloud.bigquery.retry import _RETRYABLE_REASONS
 
 from dlt.common import logger
-from dlt.common.runtime.signals import sleep
-from dlt.common.json import json
 from dlt.common.destination import DestinationCapabilitiesContext, PreparedTableSchema
 from dlt.common.destination.reference import (
     HasFollowupJobs,
     FollowupJobRequest,
-    TLoadJobState,
     RunnableLoadJob,
     SupportsStagingDestination,
     LoadJob,
 )
+from dlt.common.json import json
+from dlt.common.runtime.signals import sleep
 from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns
 from dlt.common.schema.typing import TColumnType
-from dlt.common.schema.utils import get_inherited_table_hint
+from dlt.common.schema.utils import get_inherited_table_hint, get_columns_names_with_prop
 from dlt.common.storages.load_package import destination_state
 from dlt.common.typing import DictStrAny
-from dlt.destinations.job_impl import DestinationJsonlLoadJob, DestinationParquetLoadJob
 from dlt.destinations.exceptions import (
     DatabaseTransientException,
     DatabaseUndefinedRelation,
@@ -49,6 +45,7 @@
 from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration
 from dlt.destinations.impl.bigquery.sql_client import BigQuerySqlClient, BQ_TERMINAL_REASONS
 from dlt.destinations.job_client_impl import SqlJobClientWithStagingDataset
+from dlt.destinations.job_impl import DestinationJsonlLoadJob, DestinationParquetLoadJob
 from dlt.destinations.job_impl import ReferenceFollowupJobRequest
 from dlt.destinations.sql_jobs import SqlMergeFollowupJob
 
@@ -227,8 +224,8 @@ def create_load_job(
     def _get_table_update_sql(
         self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
     ) -> List[str]:
-        # return empty columns which will skip table CREATE or ALTER
-        # to let BigQuery autodetect table from data
+        # Return empty columns which will skip table CREATE or ALTER to let BigQuery
+        # auto-detect table from data.
         table = self.prepare_load_table(table_name)
         if should_autodetect_schema(table):
             return []
@@ -410,11 +407,35 @@ def _create_load_job(self, table: PreparedTableSchema, file_path: str) -> bigque
             max_bad_records=0,
         )
         if should_autodetect_schema(table):
-            # allow BigQuery to infer and evolve the schema, note that dlt is not
-            # creating such tables at all
+            # Allow BigQuery to infer and evolve the schema; note that dlt does not create such tables at all.
             job_config.autodetect = True
             job_config.schema_update_options = bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
             job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
+            if partition_column_ := get_columns_names_with_prop(table, PARTITION_HINT):
+                partition_column = partition_column_[0]
+                col_dtype = table["columns"][partition_column]["data_type"]
+                if col_dtype == "date":
+                    job_config.time_partitioning = bigquery.TimePartitioning(
+                        field=partition_column
+                    )
+                elif col_dtype == "timestamp":
+                    job_config.time_partitioning = bigquery.TimePartitioning(
+                        type_=bigquery.TimePartitioningType.DAY,
+                        field=partition_column
+                    )
+                elif col_dtype == "bigint":
+                    job_config.range_partitioning = bigquery.RangePartitioning(
+                        field=partition_column,
+                        range_=bigquery.PartitionRange(
+                            start=-172800000, end=691200000, interval=86400
+                        ),
+                    )
+
+            if clustering_columns := get_columns_names_with_prop(table, CLUSTER_HINT):
+                job_config.clustering_fields = clustering_columns
+
+            # TODO: Write tests to cover all partitioning types above as well as clustering
+            # TODO: Table description, expiration
 
         if bucket_path:
             return self.sql_client.native_connection.load_table_from_uri(
diff --git a/tests/load/bigquery/test_bigquery_table_builder.py b/tests/load/bigquery/test_bigquery_table_builder.py
index cf3bbf15a8..177c4795c6 100644
--- a/tests/load/bigquery/test_bigquery_table_builder.py
+++ b/tests/load/bigquery/test_bigquery_table_builder.py
@@ -1168,6 +1168,7 @@ def hints() -> Iterator[Dict[str, Any]]:
         dev_mode=True,
     )
 
+
     pipeline.run(hints)
 
     with pipeline.sql_client() as c:

From 7ce3de64667f9d5c248fdd3a5b9ef64204c855f5 Mon Sep 17 00:00:00 2001
From: Marcel Coetzee
Date: Fri, 13 Sep 2024 17:49:59 +0200
Subject: [PATCH 3/6] Support timestamp and date partition hints with schema autodetection

Signed-off-by: Marcel Coetzee
---
 dlt/destinations/impl/bigquery/bigquery.py  |  16 +--
 .../bigquery/test_bigquery_table_builder.py | 111 +++++++++++++++---
 2 files changed, 106 insertions(+), 21 deletions(-)

diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py
index 9a565df852..97f20b5bf1 100644
--- a/dlt/destinations/impl/bigquery/bigquery.py
+++ b/dlt/destinations/impl/bigquery/bigquery.py
@@ -415,13 +415,10 @@ def _create_load_job(self, table: PreparedTableSchema, file_path: str) -> bigque
                 partition_column = partition_column_[0]
                 col_dtype = table["columns"][partition_column]["data_type"]
                 if col_dtype == "date":
-                    job_config.time_partitioning = bigquery.TimePartitioning(
-                        field=partition_column
-                    )
+                    job_config.time_partitioning = bigquery.TimePartitioning(field=partition_column)
                 elif col_dtype == "timestamp":
                     job_config.time_partitioning = bigquery.TimePartitioning(
-                        type_=bigquery.TimePartitioningType.DAY,
-                        field=partition_column
+                        type_=bigquery.TimePartitioningType.DAY, field=partition_column
                     )
                 elif col_dtype == "bigint":
                     job_config.range_partitioning = bigquery.RangePartitioning(
@@ -434,8 +431,13 @@ def _create_load_job(self, table: PreparedTableSchema, file_path: str) -> bigque
             if clustering_columns := get_columns_names_with_prop(table, CLUSTER_HINT):
                 job_config.clustering_fields = clustering_columns
 
-            # TODO: Write tests to cover all partitioning types above as well as clustering
-            # TODO: Table description, expiration
+            if table_description := table.get(TABLE_DESCRIPTION_HINT, False):
+                job_config.destination_table_description = table_description
+            if table_expiration := table.get(TABLE_EXPIRATION_HINT, False):
+                raise ValueError(
+                    f"Table expiration time ({table_expiration}) can't be set with BigQuery type"
+                    " auto-detection enabled!"
+                )
 
         if bucket_path:
             return self.sql_client.native_connection.load_table_from_uri(
diff --git a/tests/load/bigquery/test_bigquery_table_builder.py b/tests/load/bigquery/test_bigquery_table_builder.py
index 177c4795c6..07fe0a4fd9 100644
--- a/tests/load/bigquery/test_bigquery_table_builder.py
+++ b/tests/load/bigquery/test_bigquery_table_builder.py
@@ -1151,14 +1151,35 @@ def test_adapter_autodetect_schema_with_hints(
             {"name": "col3", "data_type": "double"},
         ]
     )
-    def hints() -> Iterator[Dict[str, Any]]:
+    def general_types() -> Iterator[Dict[str, Any]]:
         yield from [{"col1": str(i), "col2": i, "col3": float(i)} for i in range(10)]
 
+    @dlt.resource(
+        columns=[
+            {"name": "my_time_column", "data_type": "timestamp"},
+        ]
+    )
+    def partition_time() -> Iterator[Dict[str, Any]]:
+        for i in range(10):
+            yield {
+                "my_time_column": pendulum.from_timestamp(1700784000 + i * 50_000),
+            }
+
+    @dlt.resource(
+        columns=[
+            {"name": "my_date_column", "data_type": "date"},
+        ]
+    )
+    def partition_date() -> Iterator[Dict[str, Any]]:
+        for i in range(10):
+            yield {
+                "my_date_column": pendulum.from_timestamp(1700784000 + i * 50_000).date(),
+            }
+
     bigquery_adapter(
-        hints,
+        general_types,
         table_description="A small table somewhere in the cosmos...",
         partition="col2",
-        table_expiration_datetime="2030-01-01",
         cluster=["col1"],
         autodetect_schema=True,
     )
@@ -1166,28 +1187,90 @@ def test_adapter_autodetect_schema_with_hints(
     pipeline = destination_config.setup_pipeline(
         f"bigquery_{uniq_id()}",
         dev_mode=True,
    )
 
+    pipeline.run(general_types)
 
-    pipeline.run(hints)
+    bigquery_adapter(
+        partition_time,
+        partition="my_time_column",
+        autodetect_schema=True,
+    )
+
+    pipeline_time = destination_config.setup_pipeline(
+        f"bigquery_{uniq_id()}",
+        dev_mode=True,
+    )
+
+    pipeline_time.run(partition_time)
+
+    bigquery_adapter(
+        partition_date,
+        partition="my_date_column",
+        autodetect_schema=True,
+    )
+
+    pipeline_date = destination_config.setup_pipeline(
+        f"bigquery_{uniq_id()}",
+        dev_mode=True,
+    )
+
+    pipeline_date.run(partition_date)
 
     with pipeline.sql_client() as c:
         nc: google.cloud.bigquery.client.Client = c.native_connection
 
-        table_fqtn = c.make_qualified_table_name("hints", escape=False)
+        table_fqtn = c.make_qualified_table_name("general_types", escape=False)
 
         table: Table = nc.get_table(table_fqtn)
 
         table_cluster_fields = [] if table.clustering_fields is None else table.clustering_fields
+        assert ["col1"] == table_cluster_fields, "NOT clustered by `col1`."
 
-        # Test merging behaviour.
-        assert table.expires == pendulum.datetime(2030, 1, 1, 0)
-        assert ["col1"] == table_cluster_fields, "`hints` table IS NOT clustered by `col1`."
         assert table.description == "A small table somewhere in the cosmos..."
+        assert table.range_partitioning.field == "col2", "NOT partitioned on column `col2`."
 
-        if not table.range_partitioning:
-            raise ValueError("`hints` table IS NOT partitioned on a column.")
-        else:
-            assert (
-                table.range_partitioning.field == "col2"
-            ), "`hints` table IS NOT partitioned on column `col2`."
+    with pipeline_time.sql_client() as c:
+        nc: google.cloud.bigquery.client.Client = c.native_connection  # type: ignore[no-redef]
+        table_fqtn = c.make_qualified_table_name("partition_time", escape=False)
+        table: Table = nc.get_table(table_fqtn)  # type: ignore[no-redef]
+        assert table.time_partitioning.field == "my_time_column"
+
+    with pipeline_date.sql_client() as c:
+        nc: google.cloud.bigquery.client.Client = c.native_connection  # type: ignore[no-redef]
+        table_fqtn = c.make_qualified_table_name("partition_date", escape=False)
+        table: Table = nc.get_table(table_fqtn)  # type: ignore[no-redef]
+        assert table.time_partitioning.field == "my_date_column"
+        assert table.time_partitioning.type_ == "DAY"
+
+
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
+    ids=lambda x: x.name,
+)
+def test_adapter_no_expiration_with_autodetection_allowed(
+    destination_config: DestinationTestConfiguration,
+) -> None:
+    @dlt.resource(
+        columns=[
+            {"name": "col1", "data_type": "text"},
+            {"name": "col2", "data_type": "bigint"},
+            {"name": "col3", "data_type": "double"},
+        ]
+    )
+    def data() -> Iterator[Dict[str, Any]]:
+        yield from [{"col1": str(i), "col2": i, "col3": float(i)} for i in range(10)]
+
+    bigquery_adapter(
+        data,
+        table_expiration_datetime="2030-01-01",
+        autodetect_schema=True,
+    )
+
+    pipeline = destination_config.setup_pipeline(
+        f"bigquery_{uniq_id()}",
+        dev_mode=True,
+    )
+
+    pipeline.run(data)

From e12675813c08ff0633824a2dd2fcb66c46abe744 Mon Sep 17 00:00:00 2001
From: Marcel Coetzee
Date: Fri, 13 Sep 2024 17:50:49 +0200
Subject: [PATCH 4/6] Remove redundant test

Signed-off-by: Marcel Coetzee
---
 .../bigquery/test_bigquery_table_builder.py | 32 -------------------
 1 file changed, 32 deletions(-)

diff --git a/tests/load/bigquery/test_bigquery_table_builder.py b/tests/load/bigquery/test_bigquery_table_builder.py
index 07fe0a4fd9..954933e191 100644
--- a/tests/load/bigquery/test_bigquery_table_builder.py
+++ b/tests/load/bigquery/test_bigquery_table_builder.py
@@ -1242,35 +1242,3 @@ def partition_date() -> Iterator[Dict[str, Any]]:
         table: Table = nc.get_table(table_fqtn)  # type: ignore[no-redef]
         assert table.time_partitioning.field == "my_date_column"
         assert table.time_partitioning.type_ == "DAY"
-
-
-@pytest.mark.parametrize(
-    "destination_config",
-    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
-    ids=lambda x: x.name,
-)
-def test_adapter_no_expiration_with_autodetection_allowed(
-    destination_config: DestinationTestConfiguration,
-) -> None:
-    @dlt.resource(
-        columns=[
-            {"name": "col1", "data_type": "text"},
-            {"name": "col2", "data_type": "bigint"},
-            {"name": "col3", "data_type": "double"},
-        ]
-    )
-    def data() -> Iterator[Dict[str, Any]]:
-        yield from [{"col1": str(i), "col2": i, "col3": float(i)} for i in range(10)]
-
-    bigquery_adapter(
-        data,
-        table_expiration_datetime="2030-01-01",
-        autodetect_schema=True,
-    )
-
-    pipeline = destination_config.setup_pipeline(
-        f"bigquery_{uniq_id()}",
-        dev_mode=True,
-    )
-
-    pipeline.run(data)

From 6910aa4b847b9d5565d20e65b2b93dce359fa8c0 Mon Sep 17 00:00:00 2001
From: Marcel Coetzee
Date: Fri, 13 Sep 2024 17:59:56 +0200
Subject: [PATCH 5/6] Extract BigQuery load job configuration into own method

Signed-off-by: Marcel Coetzee
---
 dlt/destinations/impl/bigquery/bigquery.py | 62 +++++++++++-----------
 1 file changed, 32 insertions(+), 30 deletions(-)

diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py
index 97f20b5bf1..b4b9e01dfa 100644
--- a/dlt/destinations/impl/bigquery/bigquery.py
+++ b/dlt/destinations/impl/bigquery/bigquery.py
@@ -408,36 +408,7 @@ def _create_load_job(self, table: PreparedTableSchema, file_path: str) -> bigque
         )
         if should_autodetect_schema(table):
             # Allow BigQuery to infer and evolve the schema; note that dlt does not create such tables at all.
-            job_config.autodetect = True
-            job_config.schema_update_options = bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
-            job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
-            if partition_column_ := get_columns_names_with_prop(table, PARTITION_HINT):
-                partition_column = partition_column_[0]
-                col_dtype = table["columns"][partition_column]["data_type"]
-                if col_dtype == "date":
-                    job_config.time_partitioning = bigquery.TimePartitioning(field=partition_column)
-                elif col_dtype == "timestamp":
-                    job_config.time_partitioning = bigquery.TimePartitioning(
-                        type_=bigquery.TimePartitioningType.DAY, field=partition_column
-                    )
-                elif col_dtype == "bigint":
-                    job_config.range_partitioning = bigquery.RangePartitioning(
-                        field=partition_column,
-                        range_=bigquery.PartitionRange(
-                            start=-172800000, end=691200000, interval=86400
-                        ),
-                    )
-
-            if clustering_columns := get_columns_names_with_prop(table, CLUSTER_HINT):
-                job_config.clustering_fields = clustering_columns
-
-            if table_description := table.get(TABLE_DESCRIPTION_HINT, False):
-                job_config.destination_table_description = table_description
-            if table_expiration := table.get(TABLE_EXPIRATION_HINT, False):
-                raise ValueError(
-                    f"Table expiration time ({table_expiration}) can't be set with BigQuery type"
-                    " auto-detection enabled!"
-                )
+            job_config = self._set_user_hints_with_schema_autodetection(table, job_config)
 
         if bucket_path:
             return self.sql_client.native_connection.load_table_from_uri(
@@ -457,6 +428,37 @@ def _create_load_job(self, table: PreparedTableSchema, file_path: str) -> bigque
                 timeout=self.config.file_upload_timeout,
             )
 
+    def _set_user_hints_with_schema_autodetection(
+        self, table: PreparedTableSchema, job_config: bigquery.LoadJobConfig
+    ) -> bigquery.LoadJobConfig:
+        job_config.autodetect = True
+        job_config.schema_update_options = bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
+        job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
+        if partition_column_ := get_columns_names_with_prop(table, PARTITION_HINT):
+            partition_column = partition_column_[0]
+            col_dtype = table["columns"][partition_column]["data_type"]
+            if col_dtype == "date":
+                job_config.time_partitioning = bigquery.TimePartitioning(field=partition_column)
+            elif col_dtype == "timestamp":
+                job_config.time_partitioning = bigquery.TimePartitioning(
+                    type_=bigquery.TimePartitioningType.DAY, field=partition_column
+                )
+            elif col_dtype == "bigint":
+                job_config.range_partitioning = bigquery.RangePartitioning(
+                    field=partition_column,
+                    range_=bigquery.PartitionRange(start=-172800000, end=691200000, interval=86400),
+                )
+        if clustering_columns := get_columns_names_with_prop(table, CLUSTER_HINT):
+            job_config.clustering_fields = clustering_columns
+        if table_description := table.get(TABLE_DESCRIPTION_HINT, False):
+            job_config.destination_table_description = table_description
+        if table_expiration := table.get(TABLE_EXPIRATION_HINT, False):
+            raise ValueError(
+                f"Table expiration time ({table_expiration}) can't be set with BigQuery type"
+                " auto-detection enabled!"
+            )
+        return job_config
+
     def _retrieve_load_job(self, file_path: str) -> bigquery.LoadJob:
         job_id = BigQueryLoadJob.get_job_id_from_file_path(file_path)
         return cast(bigquery.LoadJob, self.sql_client.native_connection.get_job(job_id))

From 32430257f1369677920d945c719308f81a7cb496 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Fri, 13 Sep 2024 21:53:44 +0200
Subject: [PATCH 6/6] Move BigQuery pipeline tests to tests/load/pipeline

---
 .../bigquery/test_bigquery_table_builder.py | 203 ----------------
 tests/load/pipeline/test_bigquery.py        | 216 +++++++++++++++++-
 2 files changed, 215 insertions(+), 204 deletions(-)

diff --git a/tests/load/bigquery/test_bigquery_table_builder.py b/tests/load/bigquery/test_bigquery_table_builder.py
index 954933e191..18059767cd 100644
--- a/tests/load/bigquery/test_bigquery_table_builder.py
+++ b/tests/load/bigquery/test_bigquery_table_builder.py
@@ -5,7 +5,6 @@
 import google
 import pytest
 import sqlfluff
-from google.cloud.bigquery import Table
 
 import dlt
 from dlt.common.configuration import resolve_configuration
@@ -1040,205 +1039,3 @@ def some_data() -> Iterator[Dict[str, str]]:
 
     bigquery_adapter(some_data, table_expiration_datetime="2030-01-01")
     assert some_data._hints["x-bigquery-table-expiration"] == pendulum.datetime(2030, 1, 1)  # type: ignore
-
-
-@pytest.mark.parametrize(
-    "destination_config",
-    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
-    ids=lambda x: x.name,
-)
-def test_adapter_additional_table_hints_table_expiration(
-    destination_config: DestinationTestConfiguration,
-) -> None:
-    @dlt.resource(columns=[{"name": "col1", "data_type": "text"}])
-    def no_hints() -> Iterator[Dict[str, str]]:
-        yield from [{"col1": str(i)} for i in range(10)]
-
-    hints = bigquery_adapter(
-        no_hints.with_name(new_name="hints"), table_expiration_datetime="2030-01-01"
-    )
-
-    @dlt.source(max_table_nesting=0)
-    def sources() -> List[DltResource]:
-        return [no_hints, hints]
-
-    pipeline = destination_config.setup_pipeline(
-        f"bigquery_{uniq_id()}",
-        dev_mode=True,
-    )
-
-    pipeline.run(sources())
-
-    with pipeline.sql_client() as c:
-        nc: google.cloud.bigquery.client.Client = c.native_connection
-
-        fqtn_no_hints = c.make_qualified_table_name("no_hints", escape=False)
-        fqtn_hints = c.make_qualified_table_name("hints", escape=False)
-
-        no_hints_table = nc.get_table(fqtn_no_hints)
-        hints_table = nc.get_table(fqtn_hints)
-
-        assert not no_hints_table.expires
-        assert hints_table.expires == pendulum.datetime(2030, 1, 1, 0)
-
-
-@pytest.mark.parametrize(
-    "destination_config",
-    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
-    ids=lambda x: x.name,
-)
-def test_adapter_merge_behaviour(
-    destination_config: DestinationTestConfiguration,
-) -> None:
-    @dlt.resource(
-        columns=[
-            {"name": "col1", "data_type": "text"},
-            {"name": "col2", "data_type": "bigint"},
-            {"name": "col3", "data_type": "double"},
-        ]
-    )
-    def hints() -> Iterator[Dict[str, Any]]:
-        yield from [{"col1": str(i), "col2": i, "col3": float(i)} for i in range(10)]
-
-    bigquery_adapter(hints, table_expiration_datetime="2030-01-01", cluster=["col1"])
-    bigquery_adapter(
-        hints,
-        table_description="A small table somewhere in the cosmos...",
-        partition="col2",
-    )
-
-    pipeline = destination_config.setup_pipeline(
-        f"bigquery_{uniq_id()}",
-        dev_mode=True,
-    )
-
-    pipeline.run(hints)
-
-    with pipeline.sql_client() as c:
-        nc: google.cloud.bigquery.client.Client = c.native_connection
-
-        table_fqtn = c.make_qualified_table_name("hints", escape=False)
-
-        table: Table = nc.get_table(table_fqtn)
-
-        table_cluster_fields = [] if table.clustering_fields is None else table.clustering_fields
-
-        # Test merging behaviour.
-        assert table.expires == pendulum.datetime(2030, 1, 1, 0)
-        assert ["col1"] == table_cluster_fields, "`hints` table IS NOT clustered by `col1`."
-        assert table.description == "A small table somewhere in the cosmos..."
-
-        if not table.range_partitioning:
-            raise ValueError("`hints` table IS NOT clustered on a column.")
-        else:
-            assert (
-                table.range_partitioning.field == "col2"
-            ), "`hints` table IS NOT clustered on column `col2`."
-
-
-@pytest.mark.parametrize(
-    "destination_config",
-    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
-    ids=lambda x: x.name,
-)
-def test_adapter_autodetect_schema_with_hints(
-    destination_config: DestinationTestConfiguration,
-) -> None:
-    @dlt.resource(
-        columns=[
-            {"name": "col1", "data_type": "text"},
-            {"name": "col2", "data_type": "bigint"},
-            {"name": "col3", "data_type": "double"},
-        ]
-    )
-    def general_types() -> Iterator[Dict[str, Any]]:
-        yield from [{"col1": str(i), "col2": i, "col3": float(i)} for i in range(10)]
-
-    @dlt.resource(
-        columns=[
-            {"name": "my_time_column", "data_type": "timestamp"},
-        ]
-    )
-    def partition_time() -> Iterator[Dict[str, Any]]:
-        for i in range(10):
-            yield {
-                "my_time_column": pendulum.from_timestamp(1700784000 + i * 50_000),
-            }
-
-    @dlt.resource(
-        columns=[
-            {"name": "my_date_column", "data_type": "date"},
-        ]
-    )
-    def partition_date() -> Iterator[Dict[str, Any]]:
-        for i in range(10):
-            yield {
-                "my_date_column": pendulum.from_timestamp(1700784000 + i * 50_000).date(),
-            }
-
-    bigquery_adapter(
-        general_types,
-        table_description="A small table somewhere in the cosmos...",
-        partition="col2",
-        cluster=["col1"],
-        autodetect_schema=True,
-    )
-
-    pipeline = destination_config.setup_pipeline(
-        f"bigquery_{uniq_id()}",
-        dev_mode=True,
-    )
-
-    pipeline.run(general_types)
-
-    bigquery_adapter(
-        partition_time,
-        partition="my_time_column",
-        autodetect_schema=True,
-    )
-
-    pipeline_time = destination_config.setup_pipeline(
-        f"bigquery_{uniq_id()}",
-        dev_mode=True,
-    )
-
-    pipeline_time.run(partition_time)
-
-    bigquery_adapter(
-        partition_date,
-        partition="my_date_column",
-        autodetect_schema=True,
-    )
-
-    pipeline_date = destination_config.setup_pipeline(
-        f"bigquery_{uniq_id()}",
-        dev_mode=True,
-    )
-
-    pipeline_date.run(partition_date)
-
-    with pipeline.sql_client() as c:
-        nc: google.cloud.bigquery.client.Client = c.native_connection
-
-        table_fqtn = c.make_qualified_table_name("general_types", escape=False)
-
-        table: Table = nc.get_table(table_fqtn)
-
-        table_cluster_fields = [] if table.clustering_fields is None else table.clustering_fields
-        assert ["col1"] == table_cluster_fields, "NOT clustered by `col1`."
-
-        assert table.description == "A small table somewhere in the cosmos..."
-        assert table.range_partitioning.field == "col2", "NOT partitioned on column `col2`."
-
-    with pipeline_time.sql_client() as c:
-        nc: google.cloud.bigquery.client.Client = c.native_connection  # type: ignore[no-redef]
-        table_fqtn = c.make_qualified_table_name("partition_time", escape=False)
-        table: Table = nc.get_table(table_fqtn)  # type: ignore[no-redef]
-        assert table.time_partitioning.field == "my_time_column"
-
-    with pipeline_date.sql_client() as c:
-        nc: google.cloud.bigquery.client.Client = c.native_connection  # type: ignore[no-redef]
-        table_fqtn = c.make_qualified_table_name("partition_date", escape=False)
-        table: Table = nc.get_table(table_fqtn)  # type: ignore[no-redef]
-        assert table.time_partitioning.field == "my_date_column"
-        assert table.time_partitioning.type_ == "DAY"
diff --git a/tests/load/pipeline/test_bigquery.py b/tests/load/pipeline/test_bigquery.py
index 809bd11bc0..9d2a4abf49 100644
--- a/tests/load/pipeline/test_bigquery.py
+++ b/tests/load/pipeline/test_bigquery.py
@@ -1,10 +1,14 @@
+from typing import Any, Dict, Iterator, List
 import pytest
 import io
 
 import dlt
-from dlt.common import Decimal, json
+from dlt.common import Decimal, json, pendulum
 from dlt.common.typing import TLoaderFileFormat
+from dlt.common.utils import uniq_id
 
+from dlt.destinations.adapters import bigquery_adapter
+from dlt.extract.resource import DltResource
 from tests.pipeline.utils import assert_load_info
 from tests.load.utils import destinations_configs, DestinationTestConfiguration
 
@@ -145,3 +149,213 @@ def load_cve(stage: int):
             field = field.fields[0]
     # it looks like BigQuery can evolve structs and the field is added
     nested_field = next(f for f in field.fields if f.name == "refsource")
+
+
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
+    ids=lambda x: x.name,
+)
+def test_adapter_additional_table_hints_table_expiration(
+    destination_config: DestinationTestConfiguration,
+) -> None:
+    import google
+
+    @dlt.resource(columns=[{"name": "col1", "data_type": "text"}])
+    def no_hints() -> Iterator[Dict[str, str]]:
+        yield from [{"col1": str(i)} for i in range(10)]
+
+    hints = bigquery_adapter(
+        no_hints.with_name(new_name="hints"), table_expiration_datetime="2030-01-01"
+    )
+
+    @dlt.source(max_table_nesting=0)
+    def sources() -> List[DltResource]:
+        return [no_hints, hints]
+
+    pipeline = destination_config.setup_pipeline(
+        f"bigquery_{uniq_id()}",
+        dev_mode=True,
+    )
+
+    pipeline.run(sources())
+
+    with pipeline.sql_client() as c:
+        nc: google.cloud.bigquery.client.Client = c.native_connection
+
+        fqtn_no_hints = c.make_qualified_table_name("no_hints", escape=False)
+        fqtn_hints = c.make_qualified_table_name("hints", escape=False)
+
+        no_hints_table = nc.get_table(fqtn_no_hints)
+        hints_table = nc.get_table(fqtn_hints)
+
+        assert not no_hints_table.expires
+        assert hints_table.expires == pendulum.datetime(2030, 1, 1, 0)
+
+
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
+    ids=lambda x: x.name,
+)
+def test_adapter_merge_behaviour(
+    destination_config: DestinationTestConfiguration,
+) -> None:
+    import google
+    from google.cloud.bigquery import Table
+
+    @dlt.resource(
+        columns=[
+            {"name": "col1", "data_type": "text"},
+            {"name": "col2", "data_type": "bigint"},
+            {"name": "col3", "data_type": "double"},
+        ]
+    )
+    def hints() -> Iterator[Dict[str, Any]]:
+        yield from [{"col1": str(i), "col2": i, "col3": float(i)} for i in range(10)]
+
+    bigquery_adapter(hints, table_expiration_datetime="2030-01-01", cluster=["col1"])
+    bigquery_adapter(
+        hints,
+        table_description="A small table somewhere in the cosmos...",
+        partition="col2",
+    )
+
+    pipeline = destination_config.setup_pipeline(
+        f"bigquery_{uniq_id()}",
+        dev_mode=True,
+    )
+
+    pipeline.run(hints)
+
+    with pipeline.sql_client() as c:
+        nc: google.cloud.bigquery.client.Client = c.native_connection
+
+        table_fqtn = c.make_qualified_table_name("hints", escape=False)
+
+        table: Table = nc.get_table(table_fqtn)
+
+        table_cluster_fields = [] if table.clustering_fields is None else table.clustering_fields
+
+        # Test merging behaviour.
+        assert table.expires == pendulum.datetime(2030, 1, 1, 0)
+        assert ["col1"] == table_cluster_fields, "`hints` table IS NOT clustered by `col1`."
+        assert table.description == "A small table somewhere in the cosmos..."
+
+        if not table.range_partitioning:
+            raise ValueError("`hints` table IS NOT partitioned on a column.")
+        else:
+            assert (
+                table.range_partitioning.field == "col2"
+            ), "`hints` table IS NOT partitioned on column `col2`."
+
+
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
+    ids=lambda x: x.name,
+)
+def test_adapter_autodetect_schema_with_hints(
+    destination_config: DestinationTestConfiguration,
+) -> None:
+    import google
+    from google.cloud.bigquery import Table
+
+    @dlt.resource(
+        columns=[
+            {"name": "col1", "data_type": "text"},
+            {"name": "col2", "data_type": "bigint"},
+            {"name": "col3", "data_type": "double"},
+        ]
+    )
+    def general_types() -> Iterator[Dict[str, Any]]:
+        yield from [{"col1": str(i), "col2": i, "col3": float(i)} for i in range(10)]
+
+    @dlt.resource(
+        columns=[
+            {"name": "my_time_column", "data_type": "timestamp"},
+        ]
+    )
+    def partition_time() -> Iterator[Dict[str, Any]]:
+        for i in range(10):
+            yield {
+                "my_time_column": pendulum.from_timestamp(1700784000 + i * 50_000),
+            }
+
+    @dlt.resource(
+        columns=[
+            {"name": "my_date_column", "data_type": "date"},
+        ]
+    )
+    def partition_date() -> Iterator[Dict[str, Any]]:
+        for i in range(10):
+            yield {
+                "my_date_column": pendulum.from_timestamp(1700784000 + i * 50_000).date(),
+            }
+
+    bigquery_adapter(
+        general_types,
+        table_description="A small table somewhere in the cosmos...",
+        partition="col2",
+        cluster=["col1"],
+        autodetect_schema=True,
+    )
+
+    pipeline = destination_config.setup_pipeline(
+        f"bigquery_{uniq_id()}",
+        dev_mode=True,
+    )
+
+    pipeline.run(general_types)
+
+    bigquery_adapter(
+        partition_time,
+        partition="my_time_column",
+        autodetect_schema=True,
+    )
+
+    pipeline_time = destination_config.setup_pipeline(
+        f"bigquery_{uniq_id()}",
+        dev_mode=True,
+    )
+
+    pipeline_time.run(partition_time)
+
+    bigquery_adapter(
+        partition_date,
+        partition="my_date_column",
+        autodetect_schema=True,
+    )
+
+    pipeline_date = destination_config.setup_pipeline(
+        f"bigquery_{uniq_id()}",
+        dev_mode=True,
+    )
+
+    pipeline_date.run(partition_date)
+
+    with pipeline.sql_client() as c:
+        nc: google.cloud.bigquery.client.Client = c.native_connection
+
+        table_fqtn = c.make_qualified_table_name("general_types", escape=False)
+
+        table: Table = nc.get_table(table_fqtn)
+
+        table_cluster_fields = [] if table.clustering_fields is None else table.clustering_fields
+        assert ["col1"] == table_cluster_fields, "NOT clustered by `col1`."
+
+        assert table.description == "A small table somewhere in the cosmos..."
+        assert table.range_partitioning.field == "col2", "NOT partitioned on column `col2`."
+
+    with pipeline_time.sql_client() as c:
+        nc: google.cloud.bigquery.client.Client = c.native_connection  # type: ignore[no-redef]
+        table_fqtn = c.make_qualified_table_name("partition_time", escape=False)
+        table: Table = nc.get_table(table_fqtn)  # type: ignore[no-redef]
+        assert table.time_partitioning.field == "my_time_column"
+
+    with pipeline_date.sql_client() as c:
+        nc: google.cloud.bigquery.client.Client = c.native_connection  # type: ignore[no-redef]
+        table_fqtn = c.make_qualified_table_name("partition_date", escape=False)
+        table: Table = nc.get_table(table_fqtn)  # type: ignore[no-redef]
+        assert table.time_partitioning.field == "my_date_column"
+        assert table.time_partitioning.type_ == "DAY"
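
Usage sketch (not part of the patches above): end to end, the series lets a resource opt into BigQuery schema autodetection while keeping partition and cluster hints. A minimal sketch, assuming configured BigQuery credentials and using only the `bigquery_adapter` options exercised in the tests above; the resource and pipeline names here are illustrative, not taken from the patches:

    import dlt
    from dlt.common import pendulum
    from dlt.destinations.adapters import bigquery_adapter

    @dlt.resource(columns=[{"name": "event_ts", "data_type": "timestamp"}])
    def events():
        # Rows whose full schema BigQuery will autodetect at load time;
        # only the partition column's type is hinted explicitly.
        for i in range(10):
            yield {"event_ts": pendulum.from_timestamp(1700784000 + i * 50_000), "payload": str(i)}

    # With autodetect_schema=True, dlt skips its own CREATE TABLE path and the
    # partition hint is applied through the load job configuration instead
    # (see _set_user_hints_with_schema_autodetection in patch 5). A "timestamp"
    # partition column yields DAY time partitioning.
    bigquery_adapter(events, partition="event_ts", autodetect_schema=True)

    pipeline = dlt.pipeline(pipeline_name="events_pipeline", destination="bigquery")
    pipeline.run(events)

Note that combining table_expiration_datetime with autodetect_schema raises a ValueError as of patch 3.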