From aa6789154dbf16ca5366d0e729755e4eb1f7b225 Mon Sep 17 00:00:00 2001 From: Rushikesh Todkar <98420315+RushiT0122@users.noreply.github.com> Date: Tue, 23 Jan 2024 16:40:53 +0530 Subject: [PATCH 1/4] Tdl 24590 fix contacts to company (#250) * add batch processing for contacts_by_company stream * add interrupted sync support to contacts_to_company * update integration tests * bump version 2.13.1 --------- Co-authored-by: RushiT0122 --- CHANGELOG.md | 3 + setup.py | 2 +- tap_hubspot/__init__.py | 71 ++++++++++++----- tests/test_hubspot_all_fields.py | 6 ++ tests/test_hubspot_interrupted_sync_offset.py | 79 +++++++++++++++---- 5 files changed, 124 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 952a01f1..d9ed39a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 2.13.1 + * Optimise contacts_by_company implementation [#250](https://github.com/singer-io/tap-hubspot/pull/250) + ## 2.13.0 * HubSpot Custom CRM Objects Support [#242](https://github.com/singer-io/tap-hubspot/pull/242) diff --git a/setup.py b/setup.py index f99dbc6a..849f20f2 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='tap-hubspot', - version='2.13.0', + version='2.13.1', description='Singer.io tap for extracting data from the HubSpot API', author='Stitch', url='http://singer.io', diff --git a/tap_hubspot/__init__.py b/tap_hubspot/__init__.py index 218d1f50..37118b7a 100644 --- a/tap_hubspot/__init__.py +++ b/tap_hubspot/__init__.py @@ -6,7 +6,7 @@ import re import sys import json -# pylint: disable=import-error +# pylint: disable=import-error,too-many-statements import attr import backoff import requests @@ -76,7 +76,7 @@ class StateFields: "companies_all": "/companies/v2/companies/paged", "companies_recent": "/companies/v2/companies/recent/modified", "companies_detail": "/companies/v2/companies/{company_id}", - "contacts_by_company": "/companies/v2/companies/{company_id}/vids", + "contacts_by_company_v3": "/crm/v3/associations/company/contact/batch/read", "deals_properties": "/properties/v1/deals/properties", "deals_all": "/deals/v1/deal/paged", @@ -568,26 +568,24 @@ def use_recent_companies_endpoint(response): default_contacts_by_company_params = {'count' : 100} # NB> to do: support stream aliasing and field selection -def _sync_contacts_by_company(STATE, ctx, company_id): +def _sync_contacts_by_company_batch_read(STATE, ctx, company_ids): schema = load_schema(CONTACTS_BY_COMPANY) catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE)) mdata = metadata.to_map(catalog.get('metadata')) - url = get_url("contacts_by_company", company_id=company_id) - path = 'vids' + url = get_url("contacts_by_company_v3") with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee: with metrics.record_counter(CONTACTS_BY_COMPANY) as counter: - data = request(url, default_contacts_by_company_params).json() - - if data.get(path) is None: - raise RuntimeError("Unexpected API response: {} not in {}".format(path, data.keys())) - - for row in data[path]: - counter.increment() - record = {'company-id' : company_id, - 'contact-id' : row} - record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata) - singer.write_record("contacts_by_company", record, time_extracted=utils.now()) - + body = {'inputs': [{'id': company_id} for company_id in company_ids]} + contacts_to_company_rows = post_search_endpoint(url, body).json() + for row in contacts_to_company_rows['results']: + for contact in row['to']: + 
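+                    # One output record is emitted per (company, contact) association
+                    # returned in this batch ('from' holds the company id, 'to' the contacts).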
counter.increment() + record = {'company-id' : row['from']['id'], + 'contact-id' : contact['id']} + record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata) + singer.write_record("contacts_by_company", record, time_extracted=utils.now()) + STATE = singer.set_offset(STATE, "contacts_by_company", 'offset', company_ids[-1]) + singer.write_state(STATE) return STATE default_company_params = { @@ -620,8 +618,26 @@ def sync_companies(STATE, ctx): max_bk_value = start if CONTACTS_BY_COMPANY in ctx.selected_stream_ids: contacts_by_company_schema = load_schema(CONTACTS_BY_COMPANY) - singer.write_schema("contacts_by_company", contacts_by_company_schema, ["company-id", "contact-id"]) + singer.write_schema('contacts_by_company', contacts_by_company_schema, ["company-id", "contact-id"]) + + # This code handles the interrutped sync. When sync is interrupted, + # last batch of `contacts_by_company` extraction may get interrupted. + # So before ressuming, we should check between `companies` and `contacts_by_company` + # whose offset is lagging behind and set that as an offset value for `companies`. + # Note, few of the records may get duplicated. + if singer.get_offset(STATE, 'contacts_by_company', {}).get('offset'): + companies_offset = singer.get_offset(STATE, 'companies', {}).get('offset') + contacts_by_company_offset = singer.get_offset(STATE, 'contacts_by_company').get('offset') + if companies_offset: + offset = min(companies_offset, contacts_by_company_offset) + else: + offset = contacts_by_company_offset + + STATE = singer.set_offset(STATE, 'companies', 'offset', offset) + singer.write_state(STATE) + # This list collects the recently modified company ids to extract `contacts_by_company` records in batch + company_ids = [] with bumble_bee: for row in gen_request(STATE, 'companies', url, default_company_params, 'companies', 'has-more', ['offset'], ['offset']): row_properties = row['properties'] @@ -642,8 +658,21 @@ def sync_companies(STATE, ctx): record = request(get_url("companies_detail", company_id=row['companyId'])).json() record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata) singer.write_record("companies", record, catalog.get('stream_alias'), time_extracted=utils.now()) - if CONTACTS_BY_COMPANY in ctx.selected_stream_ids: - STATE = _sync_contacts_by_company(STATE, ctx, record['companyId']) + + if CONTACTS_BY_COMPANY in ctx.selected_stream_ids: + # Collect the recently modified company id + if not modified_time or modified_time >= start: + company_ids.append(row['companyId']) + + # Once batch size reaches set limit, extract the `contacts_by_company` for company ids collected + if len(company_ids) >= default_company_params['limit']: + STATE = _sync_contacts_by_company_batch_read(STATE, ctx, company_ids) + company_ids = [] # reset the list + + # Extract the records for last remaining company ids + if CONTACTS_BY_COMPANY in ctx.selected_stream_ids: + STATE = _sync_contacts_by_company_batch_read(STATE, ctx, company_ids) + STATE = singer.clear_offset(STATE, "contacts_by_company") # Don't bookmark past the start of this sync to account for updated records during the sync. 
new_bookmark = min(max_bk_value, current_sync_start) @@ -1417,7 +1446,7 @@ def discover_schemas(): # Load the contacts_by_company schema LOGGER.info('Loading schema for contacts_by_company') - contacts_by_company = Stream('contacts_by_company', _sync_contacts_by_company, ['company-id', 'contact-id'], None, 'FULL_TABLE') + contacts_by_company = Stream('contacts_by_company', _sync_contacts_by_company_batch_read, ['company-id', 'contact-id'], None, 'FULL_TABLE') schema, mdata = load_discovered_schema(contacts_by_company) result['streams'].append({'stream': CONTACTS_BY_COMPANY, diff --git a/tests/test_hubspot_all_fields.py b/tests/test_hubspot_all_fields.py index 53e955c4..abd1dc07 100644 --- a/tests/test_hubspot_all_fields.py +++ b/tests/test_hubspot_all_fields.py @@ -157,6 +157,12 @@ def get_matching_actual_record_by_pk(expected_primary_key_dict, actual_records): 'property_hs_analytics_latest_source_data_2', 'property_hs_analytics_latest_source_data_2_contact', 'property_hs_deal_score', + 'property_hs_is_active_shared_deal', + 'property_hs_v2_date_entered_appointmentscheduled', + 'property_hs_v2_date_exited_appointmentscheduled', + 'property_hs_v2_latest_time_in_appointmentscheduled', + 'property_hs_v2_cumulative_time_in_appointmentscheduled', + 'property_hs_v2_date_entered_qualifiedtobuy' }, 'subscription_changes':{ 'normalizedEmailId' diff --git a/tests/test_hubspot_interrupted_sync_offset.py b/tests/test_hubspot_interrupted_sync_offset.py index 891362b3..7d17554b 100644 --- a/tests/test_hubspot_interrupted_sync_offset.py +++ b/tests/test_hubspot_interrupted_sync_offset.py @@ -12,6 +12,8 @@ class TestHubspotInterruptedSyncOffsetContactLists(HubspotBaseTest): """Testing interrupted syncs for streams that implement unique bookmarking logic.""" + synced_records = None + @staticmethod def name(): return "tt_hubspot_interrupt_contact_lists" @@ -20,7 +22,6 @@ def streams_to_test(self): """expected streams minus the streams not under test""" untested = { # Streams tested elsewhere - 'companies', # covered in TestHubspotInterruptedSync1 'engagements', # covered in TestHubspotInterruptedSync1 # Feature Request | TDL-16095: [tap-hubspot] All incremental # streams should implement the interruptible sync feature @@ -31,7 +32,6 @@ def streams_to_test(self): 'deal_pipelines', # interruptible does not apply, child of deals 'campaigns', # unable to manually find a partial state with our test data 'email_events', # unable to manually find a partial state with our test data - 'contacts_by_company', # interruptible does not apply, child of 'companies' 'subscription_changes', # BUG_TDL-14938 'tickets' # covered in TestHubspotInterruptedSync1 } @@ -41,8 +41,9 @@ def streams_to_test(self): def stream_to_interrupt(self): return 'contact_lists' - def state_to_inject(self): - return {'offset': {'offset': 250}} + def state_to_inject(self, new_state): + new_state['bookmarks']['contact_lists'] = {'offset': {'offset': 250}} + return new_state def get_properties(self): return { @@ -79,14 +80,15 @@ def test_run(self): # Run sync 1 first_record_count_by_stream = self.run_and_verify_sync(conn_id) - synced_records = runner.get_records_from_target_output() + self.synced_records = runner.get_records_from_target_output() state_1 = menagerie.get_state(conn_id) # Update state to simulate a bookmark stream = self.stream_to_interrupt() new_state = copy.deepcopy(state_1) - new_state['bookmarks'][stream] = self.state_to_inject() + new_state = self.state_to_inject(new_state) new_state['currently_syncing'] = stream + 
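+        # Persist the simulated interrupted-sync state so the second sync resumes from it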
menagerie.set_state(conn_id, new_state) # run second sync @@ -98,10 +100,21 @@ def test_run(self): # since newly created test records may get updated while stream is syncing replication_keys = self.expected_replication_keys() for stream in state_1.get('bookmarks'): - replication_key = list(replication_keys[stream])[0] - self.assertLessEqual(state_1["bookmarks"][stream].get(replication_key), - state_2["bookmarks"][stream].get(replication_key), - msg="First sync bookmark should not be greater than the second bookmark.") + + if self.stream_to_interrupt() == 'companies' and stream == 'companies': + replication_key = list(replication_keys[stream])[0] + self.assertLessEqual(new_state.get('bookmarks')[stream].get('current_sync_start'), + state_2["bookmarks"][stream].get(replication_key), + msg="First sync bookmark should not be greater than the second bookmark.") + elif stream == 'contacts_by_company': + self.assertEquals(state_1["bookmarks"][stream], {"offset": {}}) + self.assertEquals(state_2["bookmarks"][stream], {"offset": {}}) + + else: + replication_key = list(replication_keys[stream])[0] + self.assertLessEqual(state_1["bookmarks"][stream].get(replication_key), + state_2["bookmarks"][stream].get(replication_key), + msg="First sync bookmark should not be greater than the second bookmark.") class TestHubspotInterruptedSyncOffsetContacts(TestHubspotInterruptedSyncOffsetContactLists): @@ -119,8 +132,9 @@ def get_properties(self): def stream_to_interrupt(self): return 'contacts' - def state_to_inject(self): - return {'offset': {'vidOffset': 3502}} + def state_to_inject(self, new_state): + new_state['bookmarks']['contacts'] = {'offset': {'vidOffset': 3502}} + return new_state class TestHubspotInterruptedSyncOffsetDeals(TestHubspotInterruptedSyncOffsetContactLists): """Testing interrupted syncs for streams that implement unique bookmarking logic.""" @@ -136,6 +150,41 @@ def get_properties(self): def stream_to_interrupt(self): return 'deals' - def state_to_inject(self): - return {'property_hs_lastmodifieddate': '2021-10-13T08:32:08.383000Z', - 'offset': {'offset': 3442973342}} + def state_to_inject(self, new_state): + new_state['bookmarks']['deals'] = {'property_hs_lastmodifieddate': '2021-10-13T08:32:08.383000Z', + 'offset': {'offset': 3442973342}} + return new_state + + +class TestHubspotInterruptedSyncOffsetCompanies(TestHubspotInterruptedSyncOffsetContactLists): + """Testing interrupted syncs for streams that implement unique bookmarking logic.""" + @staticmethod + def name(): + return "tt_hubspot_interrupt_companies" + + def get_properties(self): + return { + 'start_date' : '2023-12-31T00:00:00Z' + } + + def stream_to_interrupt(self): + return 'companies' + + def state_to_inject(self, new_state): + companies_records = self.synced_records['companies']['messages'] + contacts_by_company_records = self.synced_records['contacts_by_company']['messages'] + + company_record_index = int(len(companies_records)/2) + contact_record_index = int(3*len(contacts_by_company_records)/4) + + last_modified_value = companies_records[-1]['data'][list(self.expected_replication_keys()['companies'])[0]]['value'] + current_sync_start = companies_records[company_record_index]['data'][list(self.expected_replication_keys()['companies'])[0]]['value'] + offset_1 = companies_records[company_record_index]['data']['companyId'] + offset_2 = contacts_by_company_records[contact_record_index]['data']['company-id'] + + new_state['bookmarks']['companies'] = {'property_hs_lastmodifieddate': last_modified_value, + 
'current_sync_start': current_sync_start, + 'offset': {'offset': offset_1}} + new_state['bookmarks']['contacts_by_company'] = {'offset': {'offset': offset_2}} + + return new_state From 3ee399adc85a34e2ce1cc7c40ef6d17d3d98c479 Mon Sep 17 00:00:00 2001 From: Rushikesh Todkar <98420315+RushiT0122@users.noreply.github.com> Date: Wed, 24 Jan 2024 08:36:31 +0530 Subject: [PATCH 2/4] Patch to fix out of index error (#253) * patch to fix out of index error * bump version 2.13.2 --------- Co-authored-by: RushiT0122 --- CHANGELOG.md | 3 +++ setup.py | 2 +- tap_hubspot/__init__.py | 5 +++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d9ed39a8..dd3c698a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 2.13.2 + * Fix out-of-index error [#253](https://github.com/singer-io/tap-hubspot/pull/253) + ## 2.13.1 * Optimise contacts_by_company implementation [#250](https://github.com/singer-io/tap-hubspot/pull/250) diff --git a/setup.py b/setup.py index 849f20f2..c24bb9fa 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='tap-hubspot', - version='2.13.1', + version='2.13.2', description='Singer.io tap for extracting data from the HubSpot API', author='Stitch', url='http://singer.io', diff --git a/tap_hubspot/__init__.py b/tap_hubspot/__init__.py index 37118b7a..b8a1a6b6 100644 --- a/tap_hubspot/__init__.py +++ b/tap_hubspot/__init__.py @@ -569,10 +569,15 @@ def use_recent_companies_endpoint(response): # NB> to do: support stream aliasing and field selection def _sync_contacts_by_company_batch_read(STATE, ctx, company_ids): + # Return state as it is if company ids list is empty + if len(company_ids) == 0: + return STATE + schema = load_schema(CONTACTS_BY_COMPANY) catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE)) mdata = metadata.to_map(catalog.get('metadata')) url = get_url("contacts_by_company_v3") + with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee: with metrics.record_counter(CONTACTS_BY_COMPANY) as counter: body = {'inputs': [{'id': company_id} for company_id in company_ids]} From da38d56550145e56319f90e8d9c0f887c9c14751 Mon Sep 17 00:00:00 2001 From: bhuvana-talend <126521351+bhuvana-talend@users.noreply.github.com> Date: Wed, 24 Jan 2024 06:42:47 -0800 Subject: [PATCH 3/4] test to validate the replication of static contact list - Tdl 18267 (#245) * Create Static Lists-Temp checkin * Add verification of replication of static contact list * Implemented PR Review comments * Added a field with bad prefix * Update tests/client.py Co-authored-by: Harrison <38256339+HarrisonMarcRose@users.noreply.github.com> * Changed to string operator * Fixed the test failre in CI by adding a field to missing fields --------- Co-authored-by: Harrison <38256339+HarrisonMarcRose@users.noreply.github.com> --- tests/client.py | 43 +++++++++++++++++++++++++++++--- tests/test_hubspot_all_fields.py | 1 + tests/test_hubspot_bookmarks.py | 14 ++++++++--- 3 files changed, 50 insertions(+), 8 deletions(-) diff --git a/tests/client.py b/tests/client.py index aa63a2a3..8620ac3e 100644 --- a/tests/client.py +++ b/tests/client.py @@ -7,6 +7,7 @@ import requests from base import HubspotBaseTest from tap_tester import LOGGER +import time DEBUG = False BASE_URL = "https://api.hubapi.com" @@ -802,6 +803,13 @@ def create(self, stream, company_ids=[], subscriptions=[], times=1): return self.create_companies() elif stream == 'contact_lists': return self.create_contact_lists() + elif 
stream == 'static_contact_lists': + staticlist = self.create_contact_lists(dynamic=False) + listId = staticlist[0].get('listId') + records = self.create('contacts') + contact_email = records[0].get('properties').get('email').get('value') + self.add_contact_to_contact_list(listId, contact_email) + return staticlist elif stream == 'contacts_by_company': return self.create_contacts_by_company(company_ids, times=times) elif stream == 'engagements': @@ -813,8 +821,9 @@ def create(self, stream, company_ids=[], subscriptions=[], times=1): elif stream == 'workflows': return self.create_workflows() elif stream == 'contacts': + LOGGER.info("self.record_create_times is %s", self.record_create_times) if stream not in self.record_create_times.keys(): - self.record_create_times[stream]=[] + self.record_create_times['contacts']=[] records = self.create_contacts() return records elif stream == 'deal_pipelines': @@ -939,7 +948,7 @@ def create_companies(self): records = [response] return records - def create_contact_lists(self): + def create_contact_lists(self, dynamic=True): """ HubSpot API https://legacydocs.hubspot.com/docs/methods/lists/create_list @@ -948,15 +957,16 @@ def create_contact_lists(self): using different filters would result in any new fields. """ record_uuid = str(uuid.uuid4()).replace('-', '') + value = f"@hubspot{record_uuid}" url = f"{BASE_URL}/contacts/v1/lists/" data = { "name": f"tweeters{record_uuid}", - "dynamic": True, + "dynamic": dynamic, "filters": [ [{ "operator": "EQ", - "value": f"@hubspot{record_uuid}", + "value": value, "property": "twitterhandle", "type": "string" }] @@ -965,6 +975,31 @@ def create_contact_lists(self): # generate a record response = self.post(url, data) records = [response] + LOGGER.info("dynamic contact list is %s", records) + return records + + def add_contact_to_contact_list(self, list_id, contact_email): + """ + HubSpot API https://legacydocs.hubspot.com/docs/methods/lists/create_list + + NB: This generates a list based on a 'twitterhandle' filter. There are many + different filters, but at the time of implementation it did not seem that + using different filters would result in any new fields. 
+ """ + record_uuid = str(uuid.uuid4()).replace('-', '') + value = f"@hubspot{record_uuid}" + + url = f"{BASE_URL}/contacts/v1/lists/{list_id}/add" + data = { + "emails": [ + contact_email + ] + } + # generate a record + LOGGER.info("Post URL is %s", url) + response = self.post(url, data) + records = [response] + LOGGER.info("updated contact_list is %s", records) return records def create_contacts_by_company(self, company_ids=[], contact_records=[], times=1): diff --git a/tests/test_hubspot_all_fields.py b/tests/test_hubspot_all_fields.py index abd1dc07..f7b0676d 100644 --- a/tests/test_hubspot_all_fields.py +++ b/tests/test_hubspot_all_fields.py @@ -142,6 +142,7 @@ def get_matching_actual_record_by_pk(expected_primary_key_dict, actual_records): 'property_hs_num_associated_deal_splits', 'property_hs_is_active_shared_deal', #https://jira.talendforge.org/browse/TDL-24758 'property_hs_is_deal_split', + 'property_hs_is_active_shared_deal', 'stateChanges', 'property_hs_num_associated_active_deal_registrations', 'property_hs_num_associated_deal_registrations', diff --git a/tests/test_hubspot_bookmarks.py b/tests/test_hubspot_bookmarks.py index fd3d35fd..6c44e66b 100644 --- a/tests/test_hubspot_bookmarks.py +++ b/tests/test_hubspot_bookmarks.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta -from time import sleep +import time import tap_tester.connections as connections @@ -34,7 +34,7 @@ def streams_to_test(self): return expected_streams.difference({ 'subscription_changes', # BUG_TDL-14938 https://jira.talendforge.org/browse/TDL-14938 - }) + }) def get_properties(self): return { @@ -57,17 +57,23 @@ def create_test_data(self, expected_streams): for stream in expected_streams - {'contacts_by_company'}: if stream == 'contacts': self.times=10 + elif stream == 'contact_lists': + self.times=2 else: self.times =3 - if stream in 'email_events': + if stream == 'email_events': email_records = self.test_client.create(stream, self.times) self.expected_records['email_events'] += email_records else: # create records, one will be updated between syncs + # create one static list and the rest dynamic list for _ in range(self.times): record = self.test_client.create(stream) self.expected_records[stream] += record + if stream == 'contact_lists': + static_list = self.test_client.create('static_contact_lists') + self.expected_records[stream] += static_list if 'contacts_by_company' in expected_streams: # do last company_ids = [record['companyId'] for record in self.expected_records['companies']] @@ -108,6 +114,7 @@ def test_run(self): for stream in expected_streams - {'contacts_by_company'}: record = self.test_client.create(stream) self.expected_records[stream] += record + if 'contacts_by_company' in expected_streams: company_ids = [record['companyId'] for record in self.expected_records['companies'][:-1]] contact_records = self.expected_records['contacts'][-1:] @@ -116,7 +123,6 @@ def test_run(self): ) self.expected_records['contacts_by_company'] += record - # Update 1 record from the test seutp for each stream that has an update endpoint for stream in expected_streams - STREAMS_WITHOUT_UPDATES: primary_key = list(self.expected_primary_keys()[stream])[0] From 14a22719e9e88b5c5659bfeae4b526973700e91c Mon Sep 17 00:00:00 2001 From: JYOTHINARAYANSETTY <49671483+JYOTHINARAYANSETTY@users.noreply.github.com> Date: Wed, 24 Jan 2024 13:53:05 -0500 Subject: [PATCH 4/4] tdl-16123 (#248) * tdl-16123 * remove extra return statement * review changes * correct method names * fix all_fields test --- tests/base_hubspot.py 
| 20 ++++++- tests/test_hubspot_all_fields.py | 5 +- tests/test_hubspot_pagination.py | 92 ++++++++------------------------ 3 files changed, 45 insertions(+), 72 deletions(-) diff --git a/tests/base_hubspot.py b/tests/base_hubspot.py index 0c8cdceb..4c8ca8e2 100644 --- a/tests/base_hubspot.py +++ b/tests/base_hubspot.py @@ -18,7 +18,7 @@ class HubspotBaseCase(BaseCase): EXTRA_FIELDS = { "contacts": { "versionTimestamp" } } - + def setUp(self): missing_envs = [x for x in [ 'TAP_HUBSPOT_REDIRECT_URI', @@ -140,5 +140,23 @@ def expected_metadata(cls): # DOCS_BUG https://stitchdata.atlassian.net/browse/ BaseCase.REPLICATION_KEYS: {"updatedAt"}, BaseCase.API_LIMIT: 100, BaseCase.OBEYS_START_DATE: True + }, + # below are the custom_objects stream + "cars": { + BaseCase.PRIMARY_KEYS: {"id"}, + BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + BaseCase.REPLICATION_KEYS: {"updatedAt"}, + BaseCase.API_LIMIT: 100, + BaseCase.EXPECTED_PAGE_SIZE: 100, + BaseCase.OBEYS_START_DATE: True + }, + "co_firsts": { + BaseCase.PRIMARY_KEYS: {"id"}, + BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, + BaseCase.REPLICATION_KEYS: {"updatedAt"}, + BaseCase.API_LIMIT: 100, + BaseCase.EXPECTED_PAGE_SIZE: 100, + BaseCase.OBEYS_START_DATE: True } + } diff --git a/tests/test_hubspot_all_fields.py b/tests/test_hubspot_all_fields.py index f7b0676d..588670af 100644 --- a/tests/test_hubspot_all_fields.py +++ b/tests/test_hubspot_all_fields.py @@ -300,8 +300,9 @@ def test_run(self): # to our test data. We have determined that the filtering of these fields is an expected behavior. # deals workaround for 'property_hs_date_entered_' fields - bad_key_prefixes = {'property_hs_date_entered_', 'property_hs_v2_date_entered_', 'property_hs_date_exited_', - 'property_hs_time_in'} + + bad_key_prefixes = {'property_hs_date_entered_', 'property_hs_date_exited_', + 'property_hs_time_in', 'property_hs_'} bad_keys = set() for key in expected_keys_adjusted: for prefix in bad_key_prefixes: diff --git a/tests/test_hubspot_pagination.py b/tests/test_hubspot_pagination.py index 746e9cea..791ee863 100644 --- a/tests/test_hubspot_pagination.py +++ b/tests/test_hubspot_pagination.py @@ -1,22 +1,37 @@ from datetime import datetime from datetime import timedelta import time - -import tap_tester.connections as connections -import tap_tester.menagerie as menagerie -import tap_tester.runner as runner from tap_tester.logger import LOGGER from client import TestClient -from base import HubspotBaseTest +from tap_tester.base_suite_tests.pagination_test import PaginationTest +from base_hubspot import HubspotBaseCase -class TestHubspotPagination(HubspotBaseTest): +class HubspotPaginationTest(PaginationTest, HubspotBaseCase): @staticmethod def name(): return "tt_hubspot_pagination" + def streams_to_test(self): + """ + # All streams with limits are under test + # """ + streams_with_page_limits = { + stream + for stream, limit in self.expected_page_size().items() + if limit + } + streams_to_test = streams_with_page_limits.difference({ + # updates for contacts_by_company do not get processed quickly or consistently + # via Hubspot API, unable to guarantee page limit is exceeded + 'contacts_by_company', + 'email_events', + 'subscription_changes', # BUG_TDL-14938 https://jira.talendforge.org/browse/TDL-14938 + }) + return streams_to_test + def get_properties(self): return { 'start_date' : datetime.strftime(datetime.today()-timedelta(days=7), self.START_DATE_FORMAT) @@ -31,7 +46,7 @@ def setUp(self): # gather expectations existing_records = dict() - limits = 
self.expected_page_limits() + limits = self.expected_page_size() streams = self.streams_to_test() # order the creation of test data for streams based on the streams under test @@ -72,65 +87,4 @@ def setUp(self): setup_end = time.perf_counter() LOGGER.info(f"Test Client took about {str(setup_end-setup_start).split('.')[0]} seconds") - - def streams_to_test(self): - """ - All streams with limits are under test - """ - streams_with_page_limits = { - stream - for stream, limit in self.expected_page_limits().items() - if limit - } - streams_to_test = streams_with_page_limits.difference({ - # updates for contacts_by_company do not get processed quickly or consistently - # via Hubspot API, unable to guarantee page limit is exceeded - 'contacts_by_company', - 'email_events', - 'subscription_changes', # BUG_TDL-14938 https://jira.talendforge.org/browse/TDL-14938 - }) - return streams_to_test - - def test_run(self): - # Select only the expected streams tables - expected_streams = self.streams_to_test() - conn_id = connections.ensure_connection(self) - found_catalogs = self.run_and_verify_check_mode(conn_id) - - catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams] - for catalog_entry in catalog_entries: - stream_schema = menagerie.get_annotated_schema(conn_id, catalog_entry['stream_id']) - connections.select_catalog_and_fields_via_metadata( - conn_id, - catalog_entry, - stream_schema - ) - - sync_record_count = self.run_and_verify_sync(conn_id) - sync_records = runner.get_records_from_target_output() - - - # Test by stream - for stream in expected_streams: - with self.subTest(stream=stream): - - record_count = sync_record_count.get(stream, 0) - - sync_messages = sync_records.get(stream, {'messages': []}).get('messages') - - primary_keys = self.expected_primary_keys().get(stream) - - # Verify the sync meets or exceeds the default record count - stream_page_size = self.expected_page_limits()[stream] - self.assertLess(stream_page_size, record_count) - - # Verify we did not duplicate any records across pages - records_pks_set = {tuple([message.get('data').get(primary_key) - for primary_key in primary_keys]) - for message in sync_messages} - records_pks_list = [tuple([message.get('data').get(primary_key) - for primary_key in primary_keys]) - for message in sync_messages] - # records_pks_list = [message.get('data').get(primary_key) for message in sync_messages] - self.assertCountEqual(records_pks_set, records_pks_list, - msg=f"We have duplicate records for {stream}") + super().setUp()
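The core change in PATCH 1/4 replaces the per-company `/companies/v2/companies/{company_id}/vids` requests with a single batched call to `/crm/v3/associations/company/contact/batch/read`. The sketch below is a minimal, standalone restatement of that request/response handling: the endpoint, the `{'inputs': [{'id': ...}]}` body, and the `{'results': [{'from': ..., 'to': [...]}]}` response shape are taken from the patch itself, while the bare `requests` call, bearer-token header, and the function name `read_contacts_by_company` are illustrative assumptions rather than part of the tap, which routes requests through its own helpers and auth.

    # Sketch only: mirrors _sync_contacts_by_company_batch_read from the patch,
    # but with assumed auth (bearer token) and a plain requests call.
    import requests

    BASE_URL = "https://api.hubapi.com"
    BATCH_READ_PATH = "/crm/v3/associations/company/contact/batch/read"

    def read_contacts_by_company(company_ids, access_token):
        """Return (company_id, contact_id) pairs for one batch of company ids."""
        if not company_ids:  # mirrors the empty-batch guard added in 2.13.2
            return []

        body = {"inputs": [{"id": company_id} for company_id in company_ids]}
        response = requests.post(
            BASE_URL + BATCH_READ_PATH,
            json=body,
            headers={"Authorization": "Bearer " + access_token},
            timeout=30,
        )
        response.raise_for_status()

        pairs = []
        for row in response.json().get("results", []):
            # 'from' is the company, 'to' lists its associated contacts
            for contact in row.get("to", []):
                pairs.append((row["from"]["id"], contact["id"]))
        return pairs

For example, read_contacts_by_company(["1234", "5678"], token) would yield pairs such as ("1234", "987"), matching the company-id/contact-id record shape the tap writes for the contacts_by_company stream.

The interrupted-sync handling added to sync_companies resumes the companies stream from whichever of the two recorded offsets (companies vs. contacts_by_company) is further behind, accepting that a few records may be re-extracted, as the patch comment notes. A compact restatement of that decision, with hypothetical argument names:

    def resume_companies_offset(companies_offset, contacts_by_company_offset):
        # If only the child stream recorded an offset, resume from it; otherwise
        # take the smaller of the two so no associations are skipped. Duplicate
        # records are acceptable per the comment in the patch.
        if companies_offset is None:
            return contacts_by_company_offset
        return min(companies_offset, contacts_by_company_offset)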