Merge branch 'master' into tdl-24145-2
bhuvana-talend authored Jan 25, 2024
2 parents 1cb5bdb + 14a2271 commit 100a505
Showing 9 changed files with 226 additions and 117 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
 # Changelog

+## 2.13.2
+* Fix out-of-index error [#253](https://github.com/singer-io/tap-hubspot/pull/253)
+
+## 2.13.1
+* Optimise contacts_by_company implementation [#250](https://github.com/singer-io/tap-hubspot/pull/250)
+
 ## 2.13.0
 * HubSpot Custom CRM Objects Support [#242](https://github.com/singer-io/tap-hubspot/pull/242)

2 changes: 1 addition & 1 deletion setup.py
@@ -3,7 +3,7 @@
 from setuptools import setup

 setup(name='tap-hubspot',
-      version='2.13.0',
+      version='2.13.2',
       description='Singer.io tap for extracting data from the HubSpot API',
       author='Stitch',
       url='http://singer.io',
76 changes: 55 additions & 21 deletions tap_hubspot/__init__.py
@@ -6,7 +6,7 @@
 import re
 import sys
 import json
-# pylint: disable=import-error
+# pylint: disable=import-error,too-many-statements
 import attr
 import backoff
 import requests
@@ -76,7 +76,7 @@ class StateFields:
     "companies_all": "/companies/v2/companies/paged",
     "companies_recent": "/companies/v2/companies/recent/modified",
     "companies_detail": "/companies/v2/companies/{company_id}",
-    "contacts_by_company": "/companies/v2/companies/{company_id}/vids",
+    "contacts_by_company_v3": "/crm/v3/associations/company/contact/batch/read",

     "deals_properties": "/properties/v1/deals/properties",
     "deals_all": "/deals/v1/deal/paged",
@@ -568,26 +568,29 @@ def use_recent_companies_endpoint(response):
 default_contacts_by_company_params = {'count' : 100}

 # NB> to do: support stream aliasing and field selection
-def _sync_contacts_by_company(STATE, ctx, company_id):
+def _sync_contacts_by_company_batch_read(STATE, ctx, company_ids):
+    # Return state as it is if company ids list is empty
+    if len(company_ids) == 0:
+        return STATE
+
     schema = load_schema(CONTACTS_BY_COMPANY)
     catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
     mdata = metadata.to_map(catalog.get('metadata'))
-    url = get_url("contacts_by_company", company_id=company_id)
-    path = 'vids'
+    url = get_url("contacts_by_company_v3")
+
     with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
         with metrics.record_counter(CONTACTS_BY_COMPANY) as counter:
-            data = request(url, default_contacts_by_company_params).json()
-
-            if data.get(path) is None:
-                raise RuntimeError("Unexpected API response: {} not in {}".format(path, data.keys()))
-
-            for row in data[path]:
-                counter.increment()
-                record = {'company-id' : company_id,
-                          'contact-id' : row}
-                record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata)
-                singer.write_record("contacts_by_company", record, time_extracted=utils.now())
-
+            body = {'inputs': [{'id': company_id} for company_id in company_ids]}
+            contacts_to_company_rows = post_search_endpoint(url, body).json()
+            for row in contacts_to_company_rows['results']:
+                for contact in row['to']:
+                    counter.increment()
+                    record = {'company-id' : row['from']['id'],
+                              'contact-id' : contact['id']}
+                    record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata)
+                    singer.write_record("contacts_by_company", record, time_extracted=utils.now())
+        STATE = singer.set_offset(STATE, "contacts_by_company", 'offset', company_ids[-1])
+        singer.write_state(STATE)
     return STATE

 default_company_params = {
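Note: `contacts_by_company_v3` is HubSpot's CRM v3 associations batch-read endpoint, so one POST now covers a whole batch of company ids where the old code issued one `/vids` GET per company (the optimisation referenced by the 2.13.1 changelog entry). A minimal sketch of the exchange the loop above parses; the token, ids, and response values are illustrative, inferred from the fields the code accesses rather than taken from a verbatim API capture:

```python
import requests

# Hypothetical credentials and company ids; the tap builds the same body
# from its company_ids batch and posts it via post_search_endpoint().
url = "https://api.hubapi.com/crm/v3/associations/company/contact/batch/read"
body = {"inputs": [{"id": "901"}, {"id": "902"}]}
resp = requests.post(url, json=body, headers={"Authorization": "Bearer <token>"})

# Response shape implied by the parsing loop:
# {"results": [
#     {"from": {"id": "901"}, "to": [{"id": "101"}, {"id": "102"}]},
#     {"from": {"id": "902"}, "to": [{"id": "103"}]}
# ]}
for row in resp.json()["results"]:
    for contact in row["to"]:
        print(row["from"]["id"], contact["id"])  # company-id, contact-id
```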
@@ -620,8 +623,26 @@ def sync_companies(STATE, ctx):
     max_bk_value = start
     if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
         contacts_by_company_schema = load_schema(CONTACTS_BY_COMPANY)
-        singer.write_schema("contacts_by_company", contacts_by_company_schema, ["company-id", "contact-id"])
+        singer.write_schema('contacts_by_company', contacts_by_company_schema, ["company-id", "contact-id"])

+        # This code handles the interrupted sync. When a sync is interrupted,
+        # the last batch of `contacts_by_company` extraction may get interrupted.
+        # So before resuming, we should check between `companies` and `contacts_by_company`
+        # whose offset is lagging behind and set that as the offset value for `companies`.
+        # Note: a few of the records may get duplicated.
+        if singer.get_offset(STATE, 'contacts_by_company', {}).get('offset'):
+            companies_offset = singer.get_offset(STATE, 'companies', {}).get('offset')
+            contacts_by_company_offset = singer.get_offset(STATE, 'contacts_by_company').get('offset')
+            if companies_offset:
+                offset = min(companies_offset, contacts_by_company_offset)
+            else:
+                offset = contacts_by_company_offset
+
+            STATE = singer.set_offset(STATE, 'companies', 'offset', offset)
+            singer.write_state(STATE)
+
+    # This list collects the recently modified company ids to extract `contacts_by_company` records in batch
+    company_ids = []
     with bumble_bee:
         for row in gen_request(STATE, 'companies', url, default_company_params, 'companies', 'has-more', ['offset'], ['offset']):
             row_properties = row['properties']
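Note: the offset reconciliation above is easier to see with a concrete STATE. A hypothetical example, assuming singer-python's offset helpers store each stream's offset under `bookmarks.<stream>.offset` (the cursor values below are illustrative, not real HubSpot paging tokens):

```python
# State after a sync died mid-batch: `companies` paged ahead to cursor 3200,
# but the last flushed `contacts_by_company` batch ended at 2900.
STATE = {
    "currently_syncing": "companies",
    "bookmarks": {
        "companies": {"offset": {"offset": 3200}},
        "contacts_by_company": {"offset": {"offset": 2900}},
    },
}

# The resume logic picks the lagging cursor, so `companies` restarts from
# 2900 and rows between 2900 and 3200 may be emitted a second time,
# hence the "a few of the records may get duplicated" caveat.
offset = min(3200, 2900)  # -> 2900
```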
@@ -642,8 +663,21 @@ def sync_companies(STATE, ctx):
                 record = request(get_url("companies_detail", company_id=row['companyId'])).json()
                 record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata)
                 singer.write_record("companies", record, catalog.get('stream_alias'), time_extracted=utils.now())
-            if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
-                STATE = _sync_contacts_by_company(STATE, ctx, record['companyId'])
+
+            if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
+                # Collect the recently modified company id
+                if not modified_time or modified_time >= start:
+                    company_ids.append(row['companyId'])
+
+                # Once the batch size reaches the set limit, extract `contacts_by_company` for the collected company ids
+                if len(company_ids) >= default_company_params['limit']:
+                    STATE = _sync_contacts_by_company_batch_read(STATE, ctx, company_ids)
+                    company_ids = []  # reset the list
+
+        # Extract the records for the last remaining company ids
+        if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
+            STATE = _sync_contacts_by_company_batch_read(STATE, ctx, company_ids)
+            STATE = singer.clear_offset(STATE, "contacts_by_company")

     # Don't bookmark past the start of this sync to account for updated records during the sync.
     new_bookmark = min(max_bk_value, current_sync_start)
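Note: the trailing `_sync_contacts_by_company_batch_read` call after the loop can receive an empty list, which is exactly what the early return added at the top of that function guards against. A stripped-down sketch of the accumulate-and-flush pattern, with placeholder names and a placeholder limit standing in for `default_company_params['limit']`:

```python
def flush(batch):
    if not batch:  # mirrors the early return for an empty company_ids list
        return
    print("syncing batch:", batch)

BATCH_LIMIT = 3  # placeholder for default_company_params['limit']

batch = []
for company_id in ["c1", "c2", "c3", "c4", "c5"]:
    batch.append(company_id)
    if len(batch) >= BATCH_LIMIT:
        flush(batch)  # full batch: ["c1", "c2", "c3"]
        batch = []    # reset the list
flush(batch)          # remainder: ["c4", "c5"] (possibly empty)
```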
@@ -1417,7 +1451,7 @@ def discover_schemas():

     # Load the contacts_by_company schema
     LOGGER.info('Loading schema for contacts_by_company')
-    contacts_by_company = Stream('contacts_by_company', _sync_contacts_by_company, ['company-id', 'contact-id'], None, 'FULL_TABLE')
+    contacts_by_company = Stream('contacts_by_company', _sync_contacts_by_company_batch_read, ['company-id', 'contact-id'], None, 'FULL_TABLE')
     schema, mdata = load_discovered_schema(contacts_by_company)

     result['streams'].append({'stream': CONTACTS_BY_COMPANY,
20 changes: 19 additions & 1 deletion tests/base_hubspot.py
@@ -18,7 +18,7 @@ class HubspotBaseCase(BaseCase):
     EXTRA_FIELDS = {
         "contacts": { "versionTimestamp" }
     }
-
+
     def setUp(self):
         missing_envs = [x for x in [
             'TAP_HUBSPOT_REDIRECT_URI',
@@ -140,5 +140,23 @@ def expected_metadata(cls):  # DOCS_BUG https://stitchdata.atlassian.net/browse/
                 BaseCase.REPLICATION_KEYS: {"updatedAt"},
                 BaseCase.API_LIMIT: 100,
                 BaseCase.OBEYS_START_DATE: True
             },
+            # below are the custom_objects streams
+            "cars": {
+                BaseCase.PRIMARY_KEYS: {"id"},
+                BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+                BaseCase.REPLICATION_KEYS: {"updatedAt"},
+                BaseCase.API_LIMIT: 100,
+                BaseCase.EXPECTED_PAGE_SIZE: 100,
+                BaseCase.OBEYS_START_DATE: True
+            },
+            "co_firsts": {
+                BaseCase.PRIMARY_KEYS: {"id"},
+                BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+                BaseCase.REPLICATION_KEYS: {"updatedAt"},
+                BaseCase.API_LIMIT: 100,
+                BaseCase.EXPECTED_PAGE_SIZE: 100,
+                BaseCase.OBEYS_START_DATE: True
+            }
+
         }
42 changes: 38 additions & 4 deletions tests/client.py
@@ -805,6 +805,13 @@ def create(self, stream, company_ids=[], subscriptions=[], times=1):
             return self.create_companies()
         elif stream == 'contact_lists':
             return self.create_contact_lists()
+        elif stream == 'static_contact_lists':
+            staticlist = self.create_contact_lists(dynamic=False)
+            listId = staticlist[0].get('listId')
+            records = self.create('contacts')
+            contact_email = records[0].get('properties').get('email').get('value')
+            self.add_contact_to_contact_list(listId, contact_email)
+            return staticlist
         elif stream == 'contacts_by_company':
             return self.create_contacts_by_company(company_ids, times=times)
         elif stream == 'engagements':
@@ -816,8 +823,9 @@ def create(self, stream, company_ids=[], subscriptions=[], times=1):
         elif stream == 'workflows':
             return self.create_workflows()
         elif stream == 'contacts':
+            LOGGER.info("self.record_create_times is %s", self.record_create_times)
             if stream not in self.record_create_times.keys():
-                self.record_create_times[stream]=[]
+                self.record_create_times['contacts']=[]
             records = self.create_contacts()
             return records
         elif stream == 'deal_pipelines':
@@ -1051,7 +1059,7 @@ def create_companies(self):
         records = [response]
         return records

-    def create_contact_lists(self):
+    def create_contact_lists(self, dynamic=True):
         """
         HubSpot API https://legacydocs.hubspot.com/docs/methods/lists/create_list

@@ -1060,15 +1068,16 @@ def create_contact_lists(self):
         using different filters would result in any new fields.
         """
         record_uuid = str(uuid.uuid4()).replace('-', '')
+        value = f"@hubspot{record_uuid}"

         url = f"{BASE_URL}/contacts/v1/lists/"
         data = {
             "name": f"tweeters{record_uuid}",
-            "dynamic": True,
+            "dynamic": dynamic,
             "filters": [
                 [{
                     "operator": "EQ",
-                    "value": f"@hubspot{record_uuid}",
+                    "value": value,
                     "property": "twitterhandle",
                     "type": "string"
                 }]
@@ -1077,6 +1086,31 @@
         # generate a record
         response = self.post(url, data)
         records = [response]
+        LOGGER.info("dynamic contact list is %s", records)
         return records

+    def add_contact_to_contact_list(self, list_id, contact_email):
+        """
+        HubSpot API https://legacydocs.hubspot.com/docs/methods/lists/create_list
+
+        NB: This generates a list based on a 'twitterhandle' filter. There are many
+        different filters, but at the time of implementation it did not seem that
+        using different filters would result in any new fields.
+        """
+        record_uuid = str(uuid.uuid4()).replace('-', '')
+        value = f"@hubspot{record_uuid}"
+
+        url = f"{BASE_URL}/contacts/v1/lists/{list_id}/add"
+        data = {
+            "emails": [
+                contact_email
+            ]
+        }
+        # generate a record
+        LOGGER.info("Post URL is %s", url)
+        response = self.post(url, data)
+        records = [response]
+        LOGGER.info("updated contact_list is %s", records)
+        return records
+
     def create_contacts_by_company(self, company_ids=[], contact_records=[], times=1):
12 changes: 10 additions & 2 deletions tests/test_hubspot_all_fields.py
@@ -142,6 +142,7 @@ def get_matching_actual_record_by_pk(expected_primary_key_dict, actual_records):
         'property_hs_num_associated_deal_splits',
+        'property_hs_is_active_shared_deal', #https://jira.talendforge.org/browse/TDL-24758
         'property_hs_is_deal_split',
         'property_hs_is_active_shared_deal',
         'stateChanges',
         'property_hs_num_associated_active_deal_registrations',
         'property_hs_num_associated_deal_registrations',
@@ -157,6 +158,12 @@ def get_matching_actual_record_by_pk(expected_primary_key_dict, actual_records):
         'property_hs_analytics_latest_source_data_2',
         'property_hs_analytics_latest_source_data_2_contact',
         'property_hs_deal_score',
+        'property_hs_is_active_shared_deal',
+        'property_hs_v2_date_entered_appointmentscheduled',
+        'property_hs_v2_date_exited_appointmentscheduled',
+        'property_hs_v2_latest_time_in_appointmentscheduled',
+        'property_hs_v2_cumulative_time_in_appointmentscheduled',
+        'property_hs_v2_date_entered_qualifiedtobuy'
     },
     'subscription_changes':{
         'normalizedEmailId'
@@ -293,8 +300,9 @@ def test_run(self):
         # to our test data. We have determined that the filtering of these fields is an expected behavior.

         # deals workaround for 'property_hs_date_entered_<property>' fields
-        bad_key_prefixes = {'property_hs_date_entered_', 'property_hs_v2_date_entered_', 'property_hs_date_exited_',
-                            'property_hs_time_in'}
+
+        bad_key_prefixes = {'property_hs_date_entered_', 'property_hs_date_exited_',
+                            'property_hs_time_in', 'property_hs_'}
         bad_keys = set()
         for key in expected_keys_adjusted:
             for prefix in bad_key_prefixes:
14 changes: 10 additions & 4 deletions tests/test_hubspot_bookmarks.py
@@ -1,5 +1,5 @@
 from datetime import datetime, timedelta
-from time import sleep
+import time


 import tap_tester.connections as connections
@@ -34,7 +34,7 @@ def streams_to_test(self):

         return expected_streams.difference({
             'subscription_changes', # BUG_TDL-14938 https://jira.talendforge.org/browse/TDL-14938
-            })
+        })

     def get_properties(self):
         return {
@@ -57,17 +57,23 @@ def create_test_data(self, expected_streams):
         for stream in expected_streams - {'contacts_by_company'}:
             if stream == 'contacts':
                 self.times=10
+            elif stream == 'contact_lists':
+                self.times=2
             else:
                 self.times =3

-            if stream in 'email_events':
+            if stream == 'email_events':
                 email_records = self.test_client.create(stream, self.times)
                 self.expected_records['email_events'] += email_records
             else:
                 # create records, one will be updated between syncs
+                # create one static list and the rest dynamic lists
                 for _ in range(self.times):
                     record = self.test_client.create(stream)
                     self.expected_records[stream] += record
+                if stream == 'contact_lists':
+                    static_list = self.test_client.create('static_contact_lists')
+                    self.expected_records[stream] += static_list

         if 'contacts_by_company' in expected_streams:  # do last
             company_ids = [record['companyId'] for record in self.expected_records['companies']]
@@ -108,6 +114,7 @@ def test_run(self):
         for stream in expected_streams - {'contacts_by_company'}:
             record = self.test_client.create(stream)
             self.expected_records[stream] += record
+
         if 'contacts_by_company' in expected_streams:
             company_ids = [record['companyId'] for record in self.expected_records['companies'][:-1]]
             contact_records = self.expected_records['contacts'][-1:]
@@ -116,7 +123,6 @@ def test_run(self):
             )
             self.expected_records['contacts_by_company'] += record

-
         # Update 1 record from the test setup for each stream that has an update endpoint
         for stream in expected_streams - STREAMS_WITHOUT_UPDATES:
             primary_key = list(self.expected_primary_keys()[stream])[0]
(The remaining 2 of the 9 changed files were not loaded in this view.)