
When using BigQuery as a destination, adapter settings partition and autodetect_schema are exclusive #1746

Closed
neuromantik33 opened this issue Aug 26, 2024 · 3 comments · Fixed by #1806
Labels: community (This issue came from slack community workspace), enhancement (New feature or request)


@neuromantik33 (Contributor)

dlt version

0.5.3

Describe the problem

I would like dlt to be able to create partitioned BigQuery tables with partition columns, and also to auto-detect any schema changes, as described here. When I run this code, everything works great:

@dlt.source
def intra_db(conn_uri: str) -> DltSource:
    return [  # type: ignore[return-value]
        bigquery_adapter(
            sql_table(
                credentials=conn_uri,
                table=table,
                incremental=dlt.sources.incremental("updated_at"),
                detect_precision_hints=True,
                table_adapter_callback=table_adapters.get(table),  # type: ignore[arg-type]
            ),
            partition="updated_at",
        )
        for table in tables
    ]

and I do indeed get time-partitioned tables.

However, when working with complex types in Parquet, I've only had success after adding the autodetect_schema=True hint to the bigquery_adapter call. Once that hint is set, any partition info is lost upon table creation.

Expected behavior

The expected behavior is to be able to infer the schema using BigQuery's autodetect mechanism while still allowing manual column hints like clustering or partitioning.

Steps to reproduce

  • Create a table table_a in a SQL database test with an updated_at timestamp column (a sketch for this step is shown after the pipeline code below)
  • Create a pipeline from sql_database -> bigquery with this code:
from typing import Callable

import dlt
from dlt.destinations.adapters import bigquery_adapter
from dlt.extract import DltSource
from sqlalchemy import Table

from intra_dlt.helpers.sql_database import sql_table

table_adapters: dict[str, Callable[[Table], None]] = {}
tables = [
    "table_a",
]


@dlt.source
def test_db(conn_uri: str) -> DltSource:
    return [  # type: ignore[return-value]
        bigquery_adapter(
            sql_table(
                credentials=conn_uri,
                table=table,
                incremental=dlt.sources.incremental("updated_at"),
                detect_precision_hints=True,
                table_adapter_callback=table_adapters.get(table),  # type: ignore[arg-type]
            ),
            partition="updated_at",
            autodetect_schema=True, ## <-- FIXME Bug is here
        )
        for table in tables
    ]


if __name__ == "__main__":
    from dlt import Pipeline

    pipeline: Pipeline = dlt.pipeline(
        pipeline_name="intra_db",
        destination="bigquery",
        staging="filesystem",
        dataset_name="test",
    )

    pg_dsn = "postgresql+psycopg2://test:test@localhost:5432/test"
    load_info = pipeline.run(test_db(pg_dsn), loader_file_format="parquet")

    load_info.raise_on_failed_jobs()
    print(load_info)
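
For step 1 above, the source table can be created with something like the following (just a sketch, assuming Postgres and SQLAlchemy; the columns other than updated_at are hypothetical):

import sqlalchemy as sa

# Create table_a with an updated_at timestamp column in the `test` Postgres
# database that the pipeline above reads from.
engine = sa.create_engine("postgresql+psycopg2://test:test@localhost:5432/test")
metadata = sa.MetaData()

table_a = sa.Table(
    "table_a",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("payload", sa.JSON),  # a nested/complex column to exercise autodetection
    sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False),
)

metadata.create_all(engine)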

Operating system

Linux

Runtime environment

Local

Python version

3.10

dlt data source

sql_database

dlt destination

Google BigQuery

Other deployment details

The issue is caused by this piece of code: https://github.com/dlt-hub/dlt/blob/devel/dlt/destinations/impl/bigquery/bigquery.py#L281-L287

def _get_table_update_sql(
    self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
) -> List[str]:
    # return empty columns which will skip table CREATE or ALTER
    # to let BigQuery autodetect table from data
    if self._should_autodetect_schema(table_name):
        return []
    ...

which essentially discards all other hints whenever schema auto-detection is enabled.

Additional information

No response

@rudolfix (Collaborator)

@neuromantik33 is there a way to implement it? When we use schema autodetection, I think there's no way to ask BigQuery to create partitions etc. If you know of any workarounds, let us know - this will speed up the implementation significantly.

@neuromantik33 (Contributor, Author)

@rudolfix Well, I don't know what the latest Python API provides, but here is an example using a simple shell script:

#!/bin/bash

set -ex

DATASET=${DATASET:-drnick}
TABLE=${TABLE:-test_autodetect}
TABLE_QNAME="${DATASET}.${TABLE}"

BUCKET=${BUCKET:-dev-42network}
GS_PATH="gs://$BUCKET/test.json"

# Drop the target table if it already exists.
bq rm -f "$TABLE_QNAME"
bq ls -q "$TABLE_QNAME" || true  # errors on a table reference; kept non-fatal

# Show the sample data, then (re)upload it to GCS.
gsutil cat "$GS_PATH" | jq

gsutil cp test.json "$GS_PATH"
gsutil ls -lh "$GS_PATH"

# Load with schema autodetection *and* explicit clustering + daily time partitioning.
bq load --source_format NEWLINE_DELIMITED_JSON \
  --autodetect \
  --clustering_fields string \
  --time_partitioning_field timestamp \
  --time_partitioning_type DAY \
  "$TABLE_QNAME" \
  "$GS_PATH"

# Verify the table definition and its contents.
bq show "$TABLE_QNAME"
bq query --nouse_legacy_sql "select * from $TABLE_QNAME"

and the corresponding output:

~/tmp $ ./load.sh 
+ DATASET=drnick
+ TABLE=test_autodetect
+ TABLE_QNAME=drnick.test_autodetect
+ BUCKET=dev-42network
+ GS_PATH=gs://dev-42network/test.json
+ bq rm -f drnick.test_autodetect
+ bq ls -q drnick.test_autodetect
Invalid identifier "drnick.test_autodetect" for ls, cannot call list on object of type TableReference
+ true
+ gsutil cat gs://dev-42network/test.json
+ jq
{
  "string": "test",
  "timestamp": "2024-10-03T09:59:03.423570Z"
}
{
  "string": "test2",
  "timestamp": "2024-10-04T09:59:03.423570Z"
}
+ gsutil cp test.json gs://dev-42network/test.json
Copying file://test.json [Content-Type=application/json]...
/ [1 files][  125.0 B/  125.0 B]                                                
Operation completed over 1 objects/125.0 B.                                      
+ gsutil ls -lh gs://dev-42network/test.json
     125 B  2024-08-27T13:47:53Z  gs://dev-42network/test.json
TOTAL: 1 objects, 125 bytes (125 B)
+ bq load --source_format NEWLINE_DELIMITED_JSON --autodetect --clustering_fields string --time_partitioning_field timestamp --time_partitioning_type DAY drnick.test_autodetect gs://dev-42network/test.json
Waiting on bqjob_r6e9c2292cde7c606_000001919417d6a4_1 ... (1s) Current status: DONE   
+ bq show drnick.test_autodetect
Table dev-42network:drnick.test_autodetect

   Last modified            Schema            Total Rows   Total Bytes   Expiration     Time Partitioning      Clustered Fields   Total Logical Bytes   Total Physical Bytes   Labels  
 ----------------- ------------------------- ------------ ------------- ------------ ------------------------ ------------------ --------------------- ---------------------- -------- 
  27 Aug 15:47:58   |- timestamp: timestamp   2            29                         DAY (field: timestamp)   string             29                                                   
                    |- string: string                                                                                                                                                  

+ bq query --nouse_legacy_sql 'select * from drnick.test_autodetect'
+---------------------+--------+
|      timestamp      | string |
+---------------------+--------+
| 2024-10-04 09:59:03 | test2  |
| 2024-10-03 09:59:03 | test   |
+---------------------+--------+

@rudolfix (Collaborator)

@neuromantik33 OK, so this is doable. The Python API typically has parity with the CLI...
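
For reference, a rough Python equivalent of the bq load call above, using the google-cloud-bigquery client (a sketch; the dataset, table, and bucket names are the placeholders from the shell script):

from google.cloud import bigquery

client = bigquery.Client()

# Same settings as the `bq load` call above: schema autodetection plus
# explicit clustering and daily time partitioning.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,
    clustering_fields=["string"],
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="timestamp",
    ),
)

load_job = client.load_table_from_uri(
    "gs://dev-42network/test.json",
    "drnick.test_autodetect",
    job_config=job_config,
)
load_job.result()  # wait for the load job to finish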
