Enable BigQuery schema auto-detection with partitioning and clustering hints #1806

Merged
55 changes: 40 additions & 15 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -1,33 +1,29 @@
import functools
import os
from pathlib import Path
import time
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, cast

import google.cloud.bigquery as bigquery # noqa: I250
from google.api_core import exceptions as api_core_exceptions
from google.cloud import exceptions as gcp_exceptions
from google.api_core import retry
from google.cloud import exceptions as gcp_exceptions
from google.cloud.bigquery.retry import _RETRYABLE_REASONS

from dlt.common import logger
from dlt.common.runtime.signals import sleep
from dlt.common.json import json
from dlt.common.destination import DestinationCapabilitiesContext, PreparedTableSchema
from dlt.common.destination.reference import (
    HasFollowupJobs,
    FollowupJobRequest,
    TLoadJobState,
    RunnableLoadJob,
    SupportsStagingDestination,
    LoadJob,
)
from dlt.common.json import json
from dlt.common.runtime.signals import sleep
from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns
from dlt.common.schema.typing import TColumnType
from dlt.common.schema.utils import get_inherited_table_hint
from dlt.common.schema.utils import get_inherited_table_hint, get_columns_names_with_prop
from dlt.common.storages.load_package import destination_state
from dlt.common.typing import DictStrAny
from dlt.destinations.job_impl import DestinationJsonlLoadJob, DestinationParquetLoadJob
from dlt.destinations.exceptions import (
    DatabaseTransientException,
    DatabaseUndefinedRelation,
@@ -49,6 +45,7 @@
from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration
from dlt.destinations.impl.bigquery.sql_client import BigQuerySqlClient, BQ_TERMINAL_REASONS
from dlt.destinations.job_client_impl import SqlJobClientWithStagingDataset
from dlt.destinations.job_impl import DestinationJsonlLoadJob, DestinationParquetLoadJob
from dlt.destinations.job_impl import ReferenceFollowupJobRequest
from dlt.destinations.sql_jobs import SqlMergeFollowupJob

@@ -227,8 +224,8 @@ def create_load_job(
    def _get_table_update_sql(
        self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
    ) -> List[str]:
        # return empty columns which will skip table CREATE or ALTER
        # to let BigQuery autodetect table from data
        # Return empty columns which will skip table CREATE or ALTER to let BigQuery
        # auto-detect table from data.
        table = self.prepare_load_table(table_name)
        if should_autodetect_schema(table):
            return []
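In effect, the auto-detect hint short-circuits dlt's DDL generation entirely. A minimal sketch of that gating, under the assumption that the hint is stored as an `x-bigquery-autodetect-schema` key on the table schema (the helper and table dict here are illustrative, not dlt's exact internals):

```python
from typing import Any, Dict, List

def table_update_sql(table: Dict[str, Any]) -> List[str]:
    # With auto-detection enabled, emit no CREATE/ALTER statements at all;
    # BigQuery infers and evolves the schema during the load job instead.
    if table.get("x-bigquery-autodetect-schema"):  # assumed hint key
        return []
    return ["CREATE TABLE ..."]  # normal DDL path (elided)
```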
@@ -410,11 +407,8 @@ def _create_load_job(self, table: PreparedTableSchema, file_path: str) -> bigque
            max_bad_records=0,
        )
        if should_autodetect_schema(table):
            # allow BigQuery to infer and evolve the schema, note that dlt is not
            # creating such tables at all
            job_config.autodetect = True
            job_config.schema_update_options = bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
            job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
            # Allow BigQuery to infer and evolve the schema; note that dlt does not create such tables at all.
            job_config = self._set_user_hints_with_schema_autodetection(table, job_config)

        if bucket_path:
            return self.sql_client.native_connection.load_table_from_uri(
@@ -434,6 +428,37 @@ def _create_load_job(self, table: PreparedTableSchema, file_path: str) -> bigque
                timeout=self.config.file_upload_timeout,
            )

    def _set_user_hints_with_schema_autodetection(
        self, table: PreparedTableSchema, job_config: bigquery.LoadJobConfig
    ) -> bigquery.LoadJobConfig:
        job_config.autodetect = True
        job_config.schema_update_options = bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
        job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
        if partition_column_ := get_columns_names_with_prop(table, PARTITION_HINT):
            partition_column = partition_column_[0]
            col_dtype = table["columns"][partition_column]["data_type"]
            if col_dtype == "date":
                job_config.time_partitioning = bigquery.TimePartitioning(field=partition_column)
            elif col_dtype == "timestamp":
                job_config.time_partitioning = bigquery.TimePartitioning(
                    type_=bigquery.TimePartitioningType.DAY, field=partition_column
                )
            elif col_dtype == "bigint":
                # Default integer partition range; no explicit range is configurable here.
                job_config.range_partitioning = bigquery.RangePartitioning(
                    field=partition_column,
                    range_=bigquery.PartitionRange(start=-172800000, end=691200000, interval=86400),
                )
        if clustering_columns := get_columns_names_with_prop(table, CLUSTER_HINT):
            job_config.clustering_fields = clustering_columns
        if table_description := table.get(TABLE_DESCRIPTION_HINT, False):
            job_config.destination_table_description = table_description
        if table_expiration := table.get(TABLE_EXPIRATION_HINT, False):
            raise ValueError(
                f"Table expiration time ({table_expiration}) can't be set with BigQuery schema"
                " auto-detection enabled!"
            )
        return job_config

    def _retrieve_load_job(self, file_path: str) -> bigquery.LoadJob:
        job_id = BigQueryLoadJob.get_job_id_from_file_path(file_path)
        return cast(bigquery.LoadJob, self.sql_client.native_connection.get_job(job_id))
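End to end, the change means partition, cluster, and description hints set through `bigquery_adapter` are now carried into the `LoadJobConfig` when schema auto-detection is on, instead of being dropped along with the skipped DDL. A hedged sketch of the user-facing flow (resource, column, and pipeline names are illustrative; assumes the adapter's `autodetect_schema` option, which pairs with the `should_autodetect_schema` check above):

```python
import dlt
from dlt.destinations.adapters import bigquery_adapter

@dlt.resource(columns=[{"name": "event_ts", "data_type": "timestamp"}])
def events():
    # Only event_ts is declared; BigQuery infers the remaining columns.
    yield from ({"event_ts": f"2024-01-0{i}T00:00:00Z", "value": i} for i in range(1, 4))

# With auto-detection on, the partition/cluster hints below are applied via
# the load job config (this PR) rather than via CREATE TABLE DDL.
bigquery_adapter(events, partition="event_ts", cluster=["value"], autodetect_schema=True)

pipeline = dlt.pipeline("events_demo", destination="bigquery")
pipeline.run(events)
```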
114 changes: 8 additions & 106 deletions tests/load/bigquery/test_bigquery_table_builder.py
@@ -1,37 +1,33 @@
import os
from copy import deepcopy
from typing import Iterator, Dict, Any, List
from dlt.common.destination.exceptions import DestinationSchemaTampered
from dlt.common.schema.exceptions import SchemaIdentifierNormalizationCollision
from dlt.destinations.impl.bigquery.bigquery_adapter import (
    PARTITION_HINT,
    CLUSTER_HINT,
)

import google
import pytest
import sqlfluff
from google.cloud.bigquery import Table

import dlt
from dlt.common.configuration import resolve_configuration
from dlt.common.configuration.specs import (
    GcpServiceAccountCredentialsWithoutDefaults,
    GcpServiceAccountCredentials,
)
from dlt.common.destination.exceptions import DestinationSchemaTampered
from dlt.common.pendulum import pendulum
from dlt.common.schema import Schema, utils
from dlt.common.schema.exceptions import SchemaIdentifierNormalizationCollision
from dlt.common.utils import custom_environ
from dlt.common.utils import uniq_id

from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate
from dlt.destinations import bigquery
from dlt.destinations.impl.bigquery.bigquery import BigQueryClient
from dlt.destinations.adapters import bigquery_adapter
from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate
from dlt.destinations.impl.bigquery.bigquery import BigQueryClient
from dlt.destinations.impl.bigquery.bigquery_adapter import (
    PARTITION_HINT,
    CLUSTER_HINT,
)
from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration

from dlt.extract import DltResource

from tests.load.utils import (
    destinations_configs,
    DestinationTestConfiguration,
@@ -1043,97 +1039,3 @@ def some_data() -> Iterator[Dict[str, str]]:
    bigquery_adapter(some_data, table_expiration_datetime="2030-01-01")

    assert some_data._hints["x-bigquery-table-expiration"] == pendulum.datetime(2030, 1, 1)  # type: ignore


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
    ids=lambda x: x.name,
)
def test_adapter_additional_table_hints_table_expiration(
    destination_config: DestinationTestConfiguration,
) -> None:
    @dlt.resource(columns=[{"name": "col1", "data_type": "text"}])
    def no_hints() -> Iterator[Dict[str, str]]:
        yield from [{"col1": str(i)} for i in range(10)]

    hints = bigquery_adapter(
        no_hints.with_name(new_name="hints"), table_expiration_datetime="2030-01-01"
    )

    @dlt.source(max_table_nesting=0)
    def sources() -> List[DltResource]:
        return [no_hints, hints]

    pipeline = destination_config.setup_pipeline(
        f"bigquery_{uniq_id()}",
        dev_mode=True,
    )

    pipeline.run(sources())

    with pipeline.sql_client() as c:
        nc: google.cloud.bigquery.client.Client = c.native_connection

        fqtn_no_hints = c.make_qualified_table_name("no_hints", escape=False)
        fqtn_hints = c.make_qualified_table_name("hints", escape=False)

        no_hints_table = nc.get_table(fqtn_no_hints)
        hints_table = nc.get_table(fqtn_hints)

        assert not no_hints_table.expires
        assert hints_table.expires == pendulum.datetime(2030, 1, 1, 0)


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
    ids=lambda x: x.name,
)
def test_adapter_merge_behaviour(
    destination_config: DestinationTestConfiguration,
) -> None:
    @dlt.resource(
        columns=[
            {"name": "col1", "data_type": "text"},
            {"name": "col2", "data_type": "bigint"},
            {"name": "col3", "data_type": "double"},
        ]
    )
    def hints() -> Iterator[Dict[str, Any]]:
        yield from [{"col1": str(i), "col2": i, "col3": float(i)} for i in range(10)]

    bigquery_adapter(hints, table_expiration_datetime="2030-01-01", cluster=["col1"])
    bigquery_adapter(
        hints,
        table_description="A small table somewhere in the cosmos...",
        partition="col2",
    )

    pipeline = destination_config.setup_pipeline(
        f"bigquery_{uniq_id()}",
        dev_mode=True,
    )

    pipeline.run(hints)

    with pipeline.sql_client() as c:
        nc: google.cloud.bigquery.client.Client = c.native_connection

        table_fqtn = c.make_qualified_table_name("hints", escape=False)

        table: Table = nc.get_table(table_fqtn)

        table_cluster_fields = [] if table.clustering_fields is None else table.clustering_fields

        # Test merging behaviour.
        assert table.expires == pendulum.datetime(2030, 1, 1, 0)
        assert ["col1"] == table_cluster_fields, "`hints` table IS NOT clustered by `col1`."
        assert table.description == "A small table somewhere in the cosmos..."

        if not table.range_partitioning:
            raise ValueError("`hints` table IS NOT partitioned on a column.")
        else:
            assert (
                table.range_partitioning.field == "col2"
            ), "`hints` table IS NOT partitioned on column `col2`."