Skip to content

Commit

Permalink
fix: preserve physical row order during deduplication
Browse files (browse the repository at this point in the history)
  • Loading branch information
emilklindt committed Oct 1, 2024
1 parent 9d1d0b0 commit 226c6f2
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions target_bigquery/core.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -516,7 +516,7 @@ def worker_cls_factory(

def merge_table(self, bigquery_client:bigquery.Client) -> None:
target = self.merge_target.as_table()
date_columns = ["_sdc_extracted_at", "_sdc_received_at"]
ordering_columns = ["_sdc_extracted_at", "_sdc_received_at"]
tmp, ctas_tmp = None, "SELECT 1 AS _no_op"
if self._is_dedupe_before_upsert_candidate():
# We can't use MERGE with a non-unique key, so we need to dedupe the temp table into
Expand All @@ -525,7 +525,7 @@ def merge_table(self, bigquery_client:bigquery.Client) -> None:
dedupe_query = (
f"SELECT * FROM {self.table.get_escaped_name()} "
f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {', '.join(f'`{p}`' for p in self.key_properties)} "
f"ORDER BY COALESCE({', '.join(date_columns)}) DESC) = 1"
f"ORDER BY {', '.join(f'{c} DESC' for c in ordering_columns)}) = 1"
)
ctas_tmp = f"CREATE OR REPLACE TEMP TABLE `{tmp}` AS {dedupe_query}"
merge_clause = (
Expand Down

0 comments on commit 226c6f2

Please sign in to comment.