Skip to content

Commit

Permalink
Silence mypy warnings
Browse files Browse the repository at this point in the history
Signed-off-by: Marcel Coetzee <[email protected]>
  • Loading branch information
Pipboyguy committed Jun 6, 2024
1 parent 827ebd3 commit 71e3579
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 21 deletions.
4 changes: 2 additions & 2 deletions dlt/common/libs/deltalake.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dlt.common.storages import FilesystemConfiguration

try:
from deltalake import write_deltalake
from deltalake import write_deltalake # type: ignore[import-not-found]
except ModuleNotFoundError:
raise MissingDependencyException(
"dlt deltalake helpers",
Expand Down Expand Up @@ -61,7 +61,7 @@ def write_delta_table(
# throws warning for `s3` protocol: https://github.com/delta-io/delta-rs/issues/2460
# TODO: upgrade `deltalake` lib after https://github.com/delta-io/delta-rs/pull/2500
# is released
write_deltalake( # type: ignore[call-overload]
write_deltalake(
table_or_uri=path,
data=ensure_delta_compatible_arrow_table(table),
mode=get_delta_write_mode(write_disposition),
Expand Down
33 changes: 18 additions & 15 deletions docs/examples/postgres_to_postgres/postgres_to_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,17 @@ def pg_resource_chunked(
order_date: str,
load_type: str = "merge",
columns: str = "*",
credentials: ConnectionStringCredentials = dlt.secrets[
"sources.postgres.credentials"
],
credentials: ConnectionStringCredentials = dlt.secrets["sources.postgres.credentials"],
):
print(
f"dlt.resource write_disposition: `{load_type}` -- ",
f"connection string: postgresql://{credentials.username}:*****@{credentials.host}:{credentials.host}/{credentials.database}",
"connection string:"
f" postgresql://{credentials.username}:*****@{credentials.host}:{credentials.host}/{credentials.database}",
)

query = f"SELECT {columns} FROM {schema_name}.{table_name} ORDER BY {order_date}" # Needed to have an idempotent query
query = ( # Needed to have an idempotent query
f"SELECT {columns} FROM {schema_name}.{table_name} ORDER BY {order_date}"
)

source = dlt.resource( # type: ignore
name=table_name,
Expand Down Expand Up @@ -133,9 +134,7 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):

if __name__ == "__main__":
# Input Handling
parser = argparse.ArgumentParser(
description="Run specific functions in the script."
)
parser = argparse.ArgumentParser(description="Run specific functions in the script.")
parser.add_argument("--replace", action="store_true", help="Run initial load")
parser.add_argument("--merge", action="store_true", help="Run delta load")
args = parser.parse_args()
Expand Down Expand Up @@ -233,20 +232,26 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
).fetchone()[0]
print(f"timestamped_schema: {timestamped_schema}")

target_credentials = ConnectionStringCredentials(dlt.secrets["destination.postgres.credentials"])
target_credentials = ConnectionStringCredentials(
dlt.secrets["destination.postgres.credentials"]
)
# connect to destination (timestamped schema)
conn.sql(
f"ATTACH 'dbname={target_credentials.database} user={target_credentials.username} password={target_credentials.password} host={target_credentials.host} port={target_credentials.port}' AS pg_db (TYPE postgres);"
"ATTACH"
f" 'dbname={target_credentials.database} user={target_credentials.username} password={target_credentials.password} host={target_credentials.host} port={target_credentials.port}'"
" AS pg_db (TYPE postgres);"
)
conn.sql(f"CREATE SCHEMA IF NOT EXISTS pg_db.{timestamped_schema};")

for table in tables:
print(
f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['table_name']} TO Postgres {timestamped_schema}.{table['table_name']}"
f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['table_name']} TO"
f" Postgres {timestamped_schema}.{table['table_name']}"
)

conn.sql(
f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['table_name']} AS SELECT * FROM {timestamped_schema}.{table['table_name']};"
f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['table_name']} AS"
f" SELECT * FROM {timestamped_schema}.{table['table_name']};"
)
conn.sql(
f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['table_name']};"
Expand All @@ -262,9 +267,7 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
assert int(rows) == 9

# 5. Cleanup and rename Schema
print(
"##################################### RENAME Schema and CLEANUP ########"
)
print("##################################### RENAME Schema and CLEANUP ########")
try:
con_hd = psycopg2.connect(
dbname=target_credentials.database,
Expand Down
2 changes: 1 addition & 1 deletion tests/libs/test_deltalake.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Iterator, Tuple, cast

import pytest
from deltalake import DeltaTable
from deltalake import DeltaTable # type: ignore[import-not-found]

import dlt
from dlt.common.libs.pyarrow import pyarrow as pa
Expand Down
4 changes: 2 additions & 2 deletions tests/load/filesystem/test_object_store_rs_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from typing import Any, Dict, cast

import pytest
from deltalake import DeltaTable
from deltalake.exceptions import TableNotFoundError
from deltalake import DeltaTable # type: ignore[import-not-found]
from deltalake.exceptions import TableNotFoundError # type: ignore[import-not-found]

import dlt
from dlt.common.typing import TSecretStrValue
Expand Down
2 changes: 1 addition & 1 deletion tests/pipeline/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def _load_file(client: FSClientBase, filepath) -> List[Dict[str, Any]]:
# Load table dicts
#
def _get_delta_table(client: FilesystemClient, table_name: str) -> "DeltaTable": # type: ignore[name-defined] # noqa: F821
from deltalake import DeltaTable
from deltalake import DeltaTable # type: ignore[import-not-found]
from dlt.common.libs.deltalake import _deltalake_storage_options

table_dir = client.get_table_dir(table_name)
Expand Down

0 comments on commit 71e3579

Please sign in to comment.