From db9549004c67e34a873fe9280c879914b929e589 Mon Sep 17 00:00:00 2001 From: Alena Date: Mon, 3 Jun 2024 11:38:45 +0200 Subject: [PATCH] add tests --- .../postgres_to_postgres.py | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/docs/examples/postgres_to_postgres/postgres_to_postgres.py b/docs/examples/postgres_to_postgres/postgres_to_postgres.py index a89c7b5838..b18e6ec266 100644 --- a/docs/examples/postgres_to_postgres/postgres_to_postgres.py +++ b/docs/examples/postgres_to_postgres/postgres_to_postgres.py @@ -49,7 +49,7 @@ import argparse import os from datetime import datetime -from typing import Literal, List +from typing import List import connectorx as cx import duckdb @@ -85,7 +85,7 @@ def pg_resource_chunked( primary_key: List[str], schema_name: str, order_date: str, - load_type: Literal["skip", "append", "replace", "merge"] = "merge", + load_type: str = "merge", # type: ignore columns: str = "*", credentials: ConnectionStringCredentials = dlt.secrets[ "sources.postgres.credentials" @@ -145,12 +145,8 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"): table_desc("table_2", ["pk"], source_schema_name, "updated_at"), ] - if args.replace: - load_type = "replace" - else: - # default is delta load - load_type = "merge" - + # default is initial loading (replace) + load_type = "merge" if args.merge else "replace" print(f"LOAD-TYPE: {load_type}") resources = [] @@ -161,7 +157,7 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"): table["pk"], table["schema_name"], table["order_date"], - load_type=load_type, # ignore: type + load_type=load_type, columns=table["columns"], ) ) @@ -209,6 +205,14 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"): print(load_info) print(f"--Time elapsed: {datetime.now() - startTime}") + # check that stuff was loaded + row_counts = pipeline.last_trace.last_normalize_info.row_counts + assert row_counts["table_1"] == 9 + assert row_counts["table_2"] == 9 + + # make sure nothing failed + load_info.raise_on_failed_jobs() + if load_type == "replace": # 4. Load DuckDB local database into Postgres print("##################################### START DUCKDB LOAD ########") @@ -248,6 +252,12 @@ def table_desc(table_name, pk, schema_name, order_date, columns="*"): print(f"--Time elapsed: {datetime.now() - startTime}") print("##################################### FINISHED ########") + # check that stuff was loaded + row_counts = conn.sql( + f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['table_name']};" + ).fetchone()[0] + assert int(row_counts) == 9 + # 5. Cleanup and rename Schema print( "##################################### RENAME Schema and CLEANUP ########"