Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AstrakhantsevaAA committed Jun 3, 2024
1 parent 4d216d4 commit db95490
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions docs/examples/postgres_to_postgres/postgres_to_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import argparse
import os
from datetime import datetime
from typing import Literal, List
from typing import List

import connectorx as cx
import duckdb
Expand Down Expand Up @@ -85,7 +85,7 @@ def pg_resource_chunked(
primary_key: List[str],
schema_name: str,
order_date: str,
load_type: Literal["skip", "append", "replace", "merge"] = "merge",
load_type: str = "merge", # type: ignore
columns: str = "*",
credentials: ConnectionStringCredentials = dlt.secrets[
"sources.postgres.credentials"
Expand Down Expand Up @@ -145,12 +145,8 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
table_desc("table_2", ["pk"], source_schema_name, "updated_at"),
]

if args.replace:
load_type = "replace"
else:
# default is delta load
load_type = "merge"

# default is initial loading (replace)
load_type = "merge" if args.merge else "replace"
print(f"LOAD-TYPE: {load_type}")

resources = []
Expand All @@ -161,7 +157,7 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
table["pk"],
table["schema_name"],
table["order_date"],
load_type=load_type, # ignore: type
load_type=load_type,
columns=table["columns"],
)
)
Expand Down Expand Up @@ -209,6 +205,14 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
print(load_info)
print(f"--Time elapsed: {datetime.now() - startTime}")

# check that stuff was loaded
row_counts = pipeline.last_trace.last_normalize_info.row_counts
assert row_counts["table_1"] == 9
assert row_counts["table_2"] == 9

# make sure nothing failed
load_info.raise_on_failed_jobs()

if load_type == "replace":
# 4. Load DuckDB local database into Postgres
print("##################################### START DUCKDB LOAD ########")
Expand Down Expand Up @@ -248,6 +252,12 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"):
print(f"--Time elapsed: {datetime.now() - startTime}")
print("##################################### FINISHED ########")

# check that stuff was loaded
row_counts = conn.sql(
f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['table_name']};"
).fetchone()[0]
assert int(row_counts) == 9

# 5. Cleanup and rename Schema
print(
"##################################### RENAME Schema and CLEANUP ########"
Expand Down

0 comments on commit db95490

Please sign in to comment.