From 71e3579942d2c4b98478ff534e704825ab4a2c18 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 6 Jun 2024 15:39:14 +0200 Subject: [PATCH] Silence mypy warnings Signed-off-by: Marcel Coetzee --- dlt/common/libs/deltalake.py | 4 +-- .../postgres_to_postgres.py | 33 ++++++++++--------- tests/libs/test_deltalake.py | 2 +- .../test_object_store_rs_credentials.py | 4 +-- tests/pipeline/utils.py | 2 +- 5 files changed, 24 insertions(+), 21 deletions(-) diff --git a/dlt/common/libs/deltalake.py b/dlt/common/libs/deltalake.py index 32847303f8..4e80ed4cad 100644 --- a/dlt/common/libs/deltalake.py +++ b/dlt/common/libs/deltalake.py @@ -9,7 +9,7 @@ from dlt.common.storages import FilesystemConfiguration try: - from deltalake import write_deltalake + from deltalake import write_deltalake # type: ignore[import-not-found] except ModuleNotFoundError: raise MissingDependencyException( "dlt deltalake helpers", @@ -61,7 +61,7 @@ def write_delta_table( # throws warning for `s3` protocol: https://github.com/delta-io/delta-rs/issues/2460 # TODO: upgrade `deltalake` lib after https://github.com/delta-io/delta-rs/pull/2500 # is released - write_deltalake( # type: ignore[call-overload] + write_deltalake( table_or_uri=path, data=ensure_delta_compatible_arrow_table(table), mode=get_delta_write_mode(write_disposition), diff --git a/docs/examples/postgres_to_postgres/postgres_to_postgres.py b/docs/examples/postgres_to_postgres/postgres_to_postgres.py index 85b8aed045..f5327ee236 100644 --- a/docs/examples/postgres_to_postgres/postgres_to_postgres.py +++ b/docs/examples/postgres_to_postgres/postgres_to_postgres.py @@ -91,16 +91,17 @@ def pg_resource_chunked( order_date: str, load_type: str = "merge", columns: str = "*", - credentials: ConnectionStringCredentials = dlt.secrets[ - "sources.postgres.credentials" - ], + credentials: ConnectionStringCredentials = dlt.secrets["sources.postgres.credentials"], ): print( f"dlt.resource write_disposition: `{load_type}` -- ", - f"connection 
string: postgresql://{credentials.username}:*****@{credentials.host}:{credentials.host}/{credentials.database}", + "connection string:" + f" postgresql://{credentials.username}:*****@{credentials.host}:{credentials.port}/{credentials.database}", ) - query = f"SELECT {columns} FROM {schema_name}.{table_name} ORDER BY {order_date}" # Needed to have an idempotent query + query = ( # Needed to have an idempotent query + f"SELECT {columns} FROM {schema_name}.{table_name} ORDER BY {order_date}" + ) source = dlt.resource( # type: ignore name=table_name, @@ -133,9 +134,7 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"): if __name__ == "__main__": # Input Handling - parser = argparse.ArgumentParser( - description="Run specific functions in the script." - ) + parser = argparse.ArgumentParser(description="Run specific functions in the script.") parser.add_argument("--replace", action="store_true", help="Run initial load") parser.add_argument("--merge", action="store_true", help="Run delta load") args = parser.parse_args() @@ -233,20 +232,26 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"): ).fetchone()[0] print(f"timestamped_schema: {timestamped_schema}") - target_credentials = ConnectionStringCredentials(dlt.secrets["destination.postgres.credentials"]) + target_credentials = ConnectionStringCredentials( + dlt.secrets["destination.postgres.credentials"] + ) # connect to destination (timestamped schema) conn.sql( - f"ATTACH 'dbname={target_credentials.database} user={target_credentials.username} password={target_credentials.password} host={target_credentials.host} port={target_credentials.port}' AS pg_db (TYPE postgres);" + "ATTACH" + f" 'dbname={target_credentials.database} user={target_credentials.username} password={target_credentials.password} host={target_credentials.host} port={target_credentials.port}'" + " AS pg_db (TYPE postgres);" ) conn.sql(f"CREATE SCHEMA IF NOT EXISTS pg_db.{timestamped_schema};") for table in tables: 
print( - f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['table_name']} TO Postgres {timestamped_schema}.{table['table_name']}" + f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['table_name']} TO" + f" Postgres {timestamped_schema}.{table['table_name']}" ) conn.sql( - f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['table_name']} AS SELECT * FROM {timestamped_schema}.{table['table_name']};" + f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['table_name']} AS" + f" SELECT * FROM {timestamped_schema}.{table['table_name']};" ) conn.sql( f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['table_name']};" @@ -262,9 +267,7 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"): assert int(rows) == 9 # 5. Cleanup and rename Schema - print( - "##################################### RENAME Schema and CLEANUP ########" - ) + print("##################################### RENAME Schema and CLEANUP ########") try: con_hd = psycopg2.connect( dbname=target_credentials.database, diff --git a/tests/libs/test_deltalake.py b/tests/libs/test_deltalake.py index d55f788fbe..f364eda0dc 100644 --- a/tests/libs/test_deltalake.py +++ b/tests/libs/test_deltalake.py @@ -2,7 +2,7 @@ from typing import Iterator, Tuple, cast import pytest -from deltalake import DeltaTable +from deltalake import DeltaTable # type: ignore[import-not-found] import dlt from dlt.common.libs.pyarrow import pyarrow as pa diff --git a/tests/load/filesystem/test_object_store_rs_credentials.py b/tests/load/filesystem/test_object_store_rs_credentials.py index 4e43b7c5d8..7e8e353afe 100644 --- a/tests/load/filesystem/test_object_store_rs_credentials.py +++ b/tests/load/filesystem/test_object_store_rs_credentials.py @@ -3,8 +3,8 @@ from typing import Any, Dict, cast import pytest -from deltalake import DeltaTable -from deltalake.exceptions import TableNotFoundError +from deltalake import DeltaTable # type: ignore[import-not-found] +from 
deltalake.exceptions import TableNotFoundError # type: ignore[import-not-found] import dlt from dlt.common.typing import TSecretStrValue diff --git a/tests/pipeline/utils.py b/tests/pipeline/utils.py index 7affcc5a81..0e5e271265 100644 --- a/tests/pipeline/utils.py +++ b/tests/pipeline/utils.py @@ -155,7 +155,7 @@ def _load_file(client: FSClientBase, filepath) -> List[Dict[str, Any]]: # Load table dicts # def _get_delta_table(client: FilesystemClient, table_name: str) -> "DeltaTable": # type: ignore[name-defined] # noqa: F821 - from deltalake import DeltaTable + from deltalake import DeltaTable # type: ignore[import-not-found] from dlt.common.libs.deltalake import _deltalake_storage_options table_dir = client.get_table_dir(table_name)