diff --git a/tap_mssql/__init__.py b/tap_mssql/__init__.py index c972c43..1ad3c0e 100644 --- a/tap_mssql/__init__.py +++ b/tap_mssql/__init__.py @@ -66,7 +66,7 @@ DECIMAL_TYPES = set(["decimal", "number", "money", "smallmoney", "numeric"]) -DATETIME_TYPES = set(["datetime2", "datetime", "datetimeoffset", "timestamp", "smalldatetime"]) +DATETIME_TYPES = set(["datetime2", "datetime", "datetimeoffset", "smalldatetime"]) DATE_TYPES = set(["date"]) @@ -101,6 +101,10 @@ def schema_for_column(c, config): if data_type == "bit": result.type = ["null", "boolean"] + + elif data_type in ["timestamp", "rowversion"]: + result.type = ["null", "string"] + result.format = "rowversion" elif data_type in BYTES_FOR_INTEGER_TYPE: result.type = ["null", "integer"] diff --git a/tap_mssql/sync_strategies/incremental.py b/tap_mssql/sync_strategies/incremental.py index e1ed41d..8cedf4b 100755 --- a/tap_mssql/sync_strategies/incremental.py +++ b/tap_mssql/sync_strategies/incremental.py @@ -57,11 +57,19 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns): replication_key_value = datetime.fromtimestamp( pendulum.parse(replication_key_value).timestamp() ) + # Handle timestamp incremental (timestamp) + if catalog_entry.schema.properties[replication_key_metadata].format == 'rowversion': + select_sql += """ WHERE CAST("{}" AS BIGINT) >= + convert(bigint, convert (varbinary(8), '0x{}', 1)) + ORDER BY "{}" ASC""".format( + replication_key_metadata, replication_key_value, replication_key_metadata + ) + else: + select_sql += ' WHERE "{}" >= %(replication_key_value)s ORDER BY "{}" ASC'.format( + replication_key_metadata, replication_key_metadata + ) - select_sql += ' WHERE "{}" >= %(replication_key_value)s ORDER BY "{}" ASC'.format( - replication_key_metadata, replication_key_metadata - ) params["replication_key_value"] = replication_key_value elif replication_key_metadata is not None: diff --git a/tests/test_tap_mssql.py b/tests/test_tap_mssql.py index e8d1154..ffbcf86 100755 --- a/tests/test_tap_mssql.py +++ b/tests/test_tap_mssql.py @@ -561,15 +561,15 @@ def test_with_no_state(self): (message_types, versions) = message_types_and_versions(SINGER_MESSAGES) - self.assertEqual( - [ - "ActivateVersionMessage", - "RecordMessage", - ], - sorted(list(set(message_types))), - ) + self.assertTrue(isinstance(versions[0], int)) self.assertEqual(versions[0], versions[1]) + record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)] + incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-incremental'] + integer_incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-integer_incremental'] + + self.assertEqual(len(incremental_record_messages),3) + self.assertEqual(len(integer_incremental_record_messages),3) def test_with_state(self): state = { @@ -602,7 +602,14 @@ def test_with_state(self): ) self.assertTrue(isinstance(versions[0], int)) self.assertEqual(versions[0], versions[1]) - self.assertEqual(versions[1], 12345) + + # Based on state values provided check the number of record messages emitted + record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)] + incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-incremental'] + integer_incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-integer_incremental'] + + self.assertEqual(len(incremental_record_messages),2) + self.assertEqual(len(integer_incremental_record_messages),1) class TestViews(unittest.TestCase): @@ -650,6 +657,76 @@ def test_do_not_discover_key_properties_for_view(self): self.assertEqual(primary_keys, {"a_table": ["id"], "a_view": []}) +class TestTimestampIncrementalReplication(unittest.TestCase): + def setUp(self): + self.conn = test_utils.get_test_connection() + + with connect_with_backoff(self.conn) as open_conn: + with open_conn.cursor() as cursor: + try: + cursor.execute("drop table incremental") + except: + pass + cursor.execute("CREATE TABLE incremental (val int, updated timestamp)") + cursor.execute("INSERT INTO incremental (val) VALUES (1)") #00000000000007d1 + cursor.execute("INSERT INTO incremental (val) VALUES (2)") #00000000000007d2 + cursor.execute("INSERT INTO incremental (val) VALUES (3)") #00000000000007d3 + + self.catalog = test_utils.discover_catalog(self.conn, {}) + + for stream in self.catalog.streams: + stream.metadata = [ + { + "breadcrumb": (), + "metadata": { + "selected": True, + "table-key-properties": [], + "database-name": "dbo", + }, + }, + {"breadcrumb": ("properties", "val"), "metadata": {"selected": True}}, + ] + + stream.stream = stream.table + test_utils.set_replication_method_and_key(stream, "INCREMENTAL", "updated") + + def test_with_no_state(self): + state = {} + + global SINGER_MESSAGES + SINGER_MESSAGES.clear() + + tap_mssql.do_sync(self.conn, test_utils.get_db_config(), self.catalog, state) + + (message_types, versions) = message_types_and_versions(SINGER_MESSAGES) + + record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)] + + self.assertEqual(len(record_messages),3) + + + def test_with_state(self): + state = { + "bookmarks": { + "dbo-incremental": { + "version": 1, + "replication_key_value": '00000000000007d2', + "replication_key": "updated", + }, + } + } + + global SINGER_MESSAGES + SINGER_MESSAGES.clear() + tap_mssql.do_sync(self.conn, test_utils.get_db_config(), self.catalog, state) + + (message_types, versions) = message_types_and_versions(SINGER_MESSAGES) + + # Given the state value supplied, there should only be two RECORD messages + record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)] + + self.assertEqual(len(record_messages),2) + if __name__ == "__main__": # test1 = TestBinlogReplication()