Skip to content

Commit

Permalink
Merge pull request wintersrd#73 from mjsqu/fix/column_ordering
Browse files Browse the repository at this point in the history
fix: Schema is not retaining the correct column order
  • Loading branch information
mjsqu authored Jul 22, 2024
2 parents 5e65bd9 + f46f76a commit e4f90bd
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 2.3.0
current_version = 2.3.1
parse = (?P<major>\d+)
\.(?P<minor>\d+)
\.(?P<patch>\d+)
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# tap-mssql 2.3.1 2024-07-22

* Bug Fix. Issue #62 - change to column selection using lists instead of sets to preserve column ordering

# tap-mssql 2.3.0 2024-04-18

* Bug Fix. Change pendulum DateTime type to datetime.datetime as pymssql 2.3.0 is no longer compatible with query parameters as pendulum DateTime (https://github.com/pymssql/pymssql/issues/889)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "tap-mssql"
version = "2.3.0"
version = "2.3.1"
description = "A pipelinewise compatible tap for connecting Microsoft SQL Server"
authors = ["Rob Winters <[email protected]>"]
license = "GNU Affero"
Expand Down
63 changes: 39 additions & 24 deletions tap_mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,48 +314,63 @@ def do_discover(mssql_conn, config):
discover_catalog(mssql_conn, config).dump()


def desired_columns(selected, table_schema):
def desired_columns(selected : list, table_schema):
"""Return the set of column names we need to include in the SELECT.
selected - set of column names marked as selected in the input catalog
table_schema - the most recently discovered Schema for the table
"""
all_columns = set()
available = set()
automatic = set()
unsupported = set()

for column, column_schema in table_schema.properties.items():
all_columns.add(column)
inclusion = column_schema.inclusion
if inclusion == "automatic":
automatic.add(column)
elif inclusion == "available":
available.add(column)
elif inclusion == "unsupported":
unsupported.add(column)
else:
raise Exception("Unknown inclusion " + inclusion)
all_columns = [column for column in table_schema.properties.keys()]

available = [
column for column, column_schema
in table_schema.properties.items()
if column_schema.inclusion == 'available'
]

automatic = [
column for column, column_schema
in table_schema.properties.items()
if column_schema.inclusion == 'automatic'
]

unsupported = [
column for column, column_schema
in table_schema.properties.items()
if column_schema.inclusion == 'unsupported'
]

unknown = [
(column,column_schema.inclusion)
for column, column_schema
in table_schema.properties.items()
if column_schema.inclusion not in ['available', 'automatic', 'unsupported']
]

if unknown:
raise Exception(f"Unknown inclusions: {unknown}")

selected_but_unsupported = selected.intersection(unsupported)
selected_but_unsupported = [c for c in selected if c in unsupported]
if selected_but_unsupported:
LOGGER.warning(
"Columns %s were selected but are not supported. Skipping them.",
selected_but_unsupported,
)

selected_but_nonexistent = selected.difference(all_columns)
selected_but_nonexistent = [c for c in selected if c not in all_columns]
if selected_but_nonexistent:
LOGGER.warning("Columns %s were selected but do not exist.", selected_but_nonexistent)

not_selected_but_automatic = automatic.difference(selected)
not_selected_but_automatic = [c for c in automatic if c not in selected]
if not_selected_but_automatic:
LOGGER.warning(
"Columns %s are primary keys but were not selected. Adding them.",
not_selected_but_automatic,
)

return selected.intersection(available).union(automatic)
desired = [c for c in all_columns if (c in available and c in selected) or c in automatic]

return list(dict.fromkeys(desired))


def is_valid_currently_syncing_stream(selected_stream, state):
Expand Down Expand Up @@ -405,11 +420,11 @@ def resolve_catalog(discovered_catalog, streams_to_sync):
)
continue

selected = {
selected = [
k
for k, v in discovered_table.schema.properties.items()
for k in discovered_table.schema.properties.keys()
if common.property_is_selected(catalog_entry, k) or k == replication_key
}
]

# These are the columns we need to select
columns = desired_columns(selected, discovered_table.schema)
Expand Down
19 changes: 17 additions & 2 deletions tests/test_tap_mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,14 @@ def test_pk(self):

class TestSelectsAppropriateColumns(unittest.TestCase):
def runTest(self):
selected_cols = set(["a", "b", "d"])
selected_cols = ["a", "a", "a1", "a2", "b", "d"]
table_schema = Schema(
type="object",
properties={
"a": Schema(None, inclusion="available"),
"a1": Schema(None, inclusion="available"),
"a2": Schema(None, inclusion="available"),
"a3": Schema(None, inclusion="available"),
"b": Schema(None, inclusion="unsupported"),
"c": Schema(None, inclusion="automatic"),
},
Expand All @@ -277,9 +280,21 @@ def runTest(self):
got_cols = tap_mssql.desired_columns(selected_cols, table_schema)

self.assertEqual(
got_cols, set(["a", "c"]), "Keep automatic as well as selected, available columns."
got_cols, ["a", "a1", "a2", "c"], "Keep automatic as well as selected, available columns. Ordered correctly."
)

class TestInvalidInclusion(unittest.TestCase):
def runTest(self):
selected_cols = ["a", "e"]
table_schema = Schema(
type="object",
properties={
"a": Schema(None, inclusion="available"),
"e": Schema(None, inclusion="invalid"),
},
)

self.assertRaises(Exception, tap_mssql.desired_columns, selected_cols, table_schema)

class TestSchemaMessages(unittest.TestCase):
def runTest(self):
Expand Down

0 comments on commit e4f90bd

Please sign in to comment.