Merge branch 'master' into dependabot/pip/requests-2.31.0
leslievandemark authored Nov 14, 2023
2 parents 946fa5d + 85f3de9 commit d9820c3
Showing 7 changed files with 124 additions and 56 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -1,8 +1,11 @@
# Changelog

## 2.12.2
## 2.12.3
* Dependabot update [#228](https://github.com/singer-io/tap-hubspot/pull/228)

## 2.12.2
* Use engagements_page_size advanced option [#234](https://github.com/singer-io/tap-hubspot/pull/234)

## 2.12.1
* Use sync start time for writing bookmarks [#226](https://github.com/singer-io/tap-hubspot/pull/226)

2 changes: 1 addition & 1 deletion tap_hubspot/__init__.py
@@ -1066,7 +1066,7 @@ def sync_engagements(STATE, ctx):
singer.write_state(STATE)

url = get_url("engagements_all")
params = {'limit': 250}
params = {'limit': int(CONFIG.get('engagements_page_size') or 190)}
top_level_key = "results"
engagements = gen_request(STATE, 'engagements', url, params, top_level_key, "hasMore", ["offset"], ["offset"])

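For context, engagements_page_size is an advanced config option, and the tap falls back to a limit of 190 when it is unset. A minimal sketch of how the new expression resolves, using a plain dict to stand in for the tap's parsed CONFIG:

# Stand-in for the tap's parsed config; only the option name and the
# 190 fallback come from the change above.
CONFIG = {'engagements_page_size': '100'}

limit = int(CONFIG.get('engagements_page_size') or 190)
assert limit == 100  # option set: used (string values are cast to int)

CONFIG = {}
limit = int(CONFIG.get('engagements_page_size') or 190)
assert limit == 190  # option absent or empty: fall back to 190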
108 changes: 78 additions & 30 deletions tests/client.py
@@ -1,6 +1,7 @@
import datetime
import random
import uuid
import sys

import backoff
import requests
@@ -15,6 +16,7 @@ class TestClient():
START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z"
V3_DEALS_PROPERTY_PREFIXES = {'hs_date_entered', 'hs_date_exited', 'hs_time_in'}
BOOKMARK_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
record_create_times = {}

##########################################################################
### CORE METHODS
@@ -176,7 +178,8 @@ def datatype_transformations(self, stream, records):
##########################################################################
### GET
##########################################################################
def read(self, stream, parent_ids=[], since=''):
### Take a pagination parameter to limit reads to the first 2 pages - tdl-16124 ###
def read(self, stream, parent_ids=[], since='', pagination=False):

# Resets the access_token if the expiry time is less than or equal to the current time
if self.CONFIG["token_expires"] <= datetime.datetime.utcnow():
@@ -187,29 +190,29 @@ def read(self, stream, parent_ids=[], since=''):
elif stream == 'owners':
return self.get_owners()
elif stream == 'companies':
return self.get_companies(since)
return self.get_companies(since, pagination)
elif stream == 'contact_lists':
return self.get_contact_lists(since)
return self.get_contact_lists(since, pagination=pagination)
elif stream == 'contacts_by_company':
return self.get_contacts_by_company(parent_ids)
return self.get_contacts_by_company(parent_ids, pagination)
elif stream == 'engagements':
return self.get_engagements()
return self.get_engagements(pagination)
elif stream == 'campaigns':
return self.get_campaigns()
elif stream == 'deals':
return self.get_deals()
elif stream == 'workflows':
return self.get_workflows()
elif stream == 'contacts':
return self.get_contacts()
return self.get_contacts(pagination)
elif stream == 'deal_pipelines':
return self.get_deal_pipelines()
elif stream == 'email_events':
return self.get_email_events()
return self.get_email_events(pagination)
elif stream == 'subscription_changes':
return self.get_subscription_changes(since)
return self.get_subscription_changes(since, pagination)
elif stream == "tickets":
return self.get_tickets()
return self.get_tickets(pagination)
else:
raise NotImplementedError
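
All of the paginated getters below share one early-exit shape: when pagination is truthy, stop once slightly more than a full page (page_size + 10 records) has accumulated, which is enough to assert that two pages were fetched. A condensed sketch of that pattern, with fetch_page as a placeholder for the stream-specific request:

def read_limited(fetch_page, page_size, pagination=False):
    records = []
    has_more, offset = True, None
    while has_more:
        # fetch_page stands in for the stream-specific GET; real getters
        # also differ in their hasMore/has-more and offset key names.
        page = fetch_page(offset)
        records.extend(page['results'])
        has_more, offset = page['hasMore'], page['offset']
        if pagination and len(records) > page_size + 10:
            break
    return records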

@@ -238,10 +241,11 @@ def _get_company_by_id(self, company_id):
response = self.get(url)
return response

def get_companies(self, since=''):
def get_companies(self, since='', pagination=False):
"""
Get all companies by paginating using 'has-more' and 'offset'.
"""
page_size = self.BaseTest.expected_metadata().get('companies', {}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/companies/v2/companies/paged"
if not since:
since = self.start_date_strf
@@ -273,6 +277,8 @@ def get_companies(self, since=''):

has_more = response['has-more']
params['offset'] = response['offset']
if pagination and len(companies) > page_size+10:
break

# get the details of each company
for company in companies:
@@ -283,11 +289,12 @@ def get_companies(self, since=''):

return records

def get_contact_lists(self, since='', list_id=''):
def get_contact_lists(self, since='', list_id='', pagination=False):
"""
Get all contact_lists by paginating using 'has-more' and 'offset'.
"""
url = f"{BASE_URL}/contacts/v1/lists"
page_size = self.BaseTest.expected_metadata().get('contact_lists',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)

if list_id:
url += f"/{list_id}"
@@ -296,7 +303,7 @@ def get_contact_lists(self, since='', list_id=''):
return response

if since == 'all':
params = {'count': 250}
params = {'count': page_size}
else:
if not since:
since = self.start_date_strf
@@ -305,15 +312,14 @@ def get_contact_lists(self, since='', list_id=''):
since = datetime.datetime.strptime(since, self.START_DATE_FORMAT)

since = str(since.timestamp() * 1000).split(".")[0]
params = {'since': since, 'count': 250}
params = {'since': since, 'count': page_size}

records = []
replication_key = list(self.replication_keys['contact_lists'])[0]

# paginating through all the contact_lists
has_more = True
while has_more:

response = self.get(url, params=params)
for record in response['lists']:

@@ -322,6 +328,8 @@ def get_contact_lists(self, since='', list_id=''):

has_more = response['has-more']
params['offset'] = response['offset']
if pagination and len(records) > page_size+10:
break

return records
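
The page sizes themselves are no longer hard-coded literals: each getter asks the test base class for its stream's expected page size via expected_metadata(). A small sketch of that lookup; the key string and the values below are hypothetical stand-ins, not taken from BaseTest:

EXPECTED_PAGE_SIZE = 'expected-page-size'  # hypothetical key

def expected_metadata():
    # Illustrative values only; the real figures live in BaseTest.
    return {
        'contact_lists': {EXPECTED_PAGE_SIZE: 250},
        'contacts': {EXPECTED_PAGE_SIZE: 100},
    }

page_size = expected_metadata().get('contact_lists', {}).get(EXPECTED_PAGE_SIZE)
assert page_size == 250  # unknown streams yield None instead of raising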

@@ -354,16 +362,17 @@ def _get_contacts_by_pks(self, pks):

return records[0]

def get_contacts(self):
def get_contacts(self, pagination=False):
"""
Get all contact vids by paginating using 'has-more' and 'vid-offset/vidOffset'.
Then use the vids to grab the detailed contacts records.
"""
page_size = self.BaseTest.expected_metadata().get('contacts',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url_1 = f"{BASE_URL}/contacts/v1/lists/all/contacts/all"
params_1 = {
'showListMemberships': True,
'includeVersion': True,
'count': 100,
'count': page_size,
}
vids = []
url_2 = f"{BASE_URL}/contacts/v1/contact/vids/batch/"
@@ -379,18 +388,21 @@ def get_contacts(self):
response_1 = self.get(url_1, params=params_1)
vids = [record['vid'] for record in response_1['contacts']
if record['versionTimestamp'] >= self.start_date]

has_more = response_1['has-more']
params_1['vidOffset'] = response_1['vid-offset']

# get the detailed contacts records by vids
params_2['vid'] = vids
response_2 = self.get(url_2, params=params_2)
records.extend([record for record in response_2.values()])
if pagination and len(records) > page_size+10:
break

records = self.denest_properties('contacts', records)
return records

def get_contacts_by_company(self, parent_ids):
def get_contacts_by_company(self, parent_ids, pagination=False):
"""
Get all contacts_by_company, iterating over companyIds and
paginating using 'hasMore' and 'vidOffset'. This stream is essentially
@@ -400,8 +412,9 @@ def get_contacts_by_company(self, parent_ids):
pulling the 'companyId' from each record to perform the corresponding get here.
"""

page_size = self.BaseTest.expected_metadata().get('contacts_by_company', {}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/companies/v2/companies/{{}}/vids"
params = dict()
params = {'count': page_size}
records = []

for parent_id in parent_ids:
@@ -416,8 +429,10 @@ def get_contacts_by_company(self, parent_ids):

has_more = response['hasMore']
params['vidOffset'] = response['vidOffset']
if pagination and len(records) > page_size+10:
break

params = dict()
params = {'count': page_size}

return records

@@ -512,13 +527,14 @@ def get_deals(self):
records = self.denest_properties('deals', records)
return records

def get_email_events(self, recipient=''):
def get_email_events(self, recipient='', pagination=False):
"""
Get all email_events by paginating using 'hasMore' and 'offset'.
"""
page_size = self.BaseTest.expected_metadata().get('email_events',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/email/public/v1/events"
replication_key = list(self.replication_keys['email_events'])[0]
params = dict()
params = {'count': page_size}
if recipient:
params['recipient'] = recipient
records = []
@@ -532,6 +548,8 @@ def get_email_events(self, recipient=''):

has_more = response['hasMore']
params['offset'] = response['offset']
if pagination and len(records) > page_size+10:
break

return records

@@ -549,13 +567,14 @@ def _get_engagements_by_pk(self, engagement_id):

return response

def get_engagements(self):
def get_engagements(self, pagination=False):
"""
Get all engagements by paginating using 'hasMore' and 'offset'.
"""
page_size = self.BaseTest.expected_metadata().get('engagements',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/engagements/v1/engagements/paged"
replication_key = list(self.replication_keys['engagements'])[0]
params = {'limit': 250}
params = {'limit': page_size}
records = []

has_more = True
@@ -570,6 +589,8 @@ def get_engagements(self):

has_more = response['hasMore']
params['offset'] = response['offset']
if pagination and len(records) > page_size+10:
break

return records

@@ -606,13 +627,14 @@ def get_owners(self):
transformed_records = self.datatype_transformations('owners', records)
return transformed_records

def get_subscription_changes(self, since=''):
def get_subscription_changes(self, since='', pagination=False):
"""
Get all subscription_changes from 'since' date by paginating using 'hasMore' and 'offset'.
The default 'since' date is one week ago.
"""
page_size = self.BaseTest.expected_metadata().get('subscription_changes',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/email/public/v1/subscriptions/timeline"
params = dict()
params = {'count': page_size}
records = []
replication_key = list(self.replication_keys['subscription_changes'])[0]
if not since:
@@ -632,6 +654,8 @@ def get_subscription_changes(self, since=''):
# this won't be feasible until BUG_TDL-14938 is addressed
if int(since) <= record['timestamp']:
records.append(record)
if pagination and len(records) > page_size+10:
break

return records

@@ -677,18 +701,19 @@ def get_tickets_properties(self):

return ",".join([record["name"] for record in records["results"]])

def get_tickets(self):
def get_tickets(self, pagination=False):
"""
Get all tickets.
HubSpot API https://developers.hubspot.com/docs/api/crm/tickets
"""
page_size = self.BaseTest.expected_metadata().get('tickets',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/crm/v4/objects/tickets"
replication_key = list(self.replication_keys["tickets"])[0]
records = []

# response = self.get(url)

params = {"limit": 100, "associations": "contact,company,deals", 'properties': self.get_tickets_properties()}
params = {"limit": page_size, "associations": "contact,company,deals", 'properties': self.get_tickets_properties()}
while True:
response = self.get(url, params=params)

@@ -698,6 +723,8 @@ def get_tickets(self):

if not response.get("paging"):
break
if page_size and len(records) > page_size+10:
break
params["after"] = response.get("paging").get("next").get("after")

records = self.denest_properties('tickets', records)
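
Unlike the v1/v2 streams above, tickets paginates in the CRM v4 cursor style: each response's paging.next.after becomes the next request's 'after' parameter. A stripped-down sketch under that assumption, with session and base_url standing in for the client's authenticated get():

import requests

def iter_tickets(session, base_url, params):
    url = f"{base_url}/crm/v4/objects/tickets"
    while True:
        response = session.get(url, params=params).json()
        yield from response.get('results', [])
        if not response.get('paging'):
            break
        # Cursor for the next page lives at paging.next.after.
        params['after'] = response['paging']['next']['after']

# e.g. list(iter_tickets(requests.Session(), BASE_URL, {'limit': 100}))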
@@ -733,7 +760,10 @@ def create(self, stream, company_ids=[], subscriptions=[], times=1):
elif stream == 'workflows':
return self.create_workflows()
elif stream == 'contacts':
return self.create_contacts()
if stream not in self.record_create_times.keys():
self.record_create_times[stream] = []
records = self.create_contacts()
return records
elif stream == 'deal_pipelines':
return self.create_deal_pipelines()
elif stream == 'email_events':
@@ -797,6 +827,10 @@ def create_contacts(self):
]
}

# Get the current time in seconds
date = datetime.datetime.utcnow()
seconds = datetime.datetime.timestamp(date)

# generate a contacts record
response = self.post(url, data)
records = [response]
@@ -805,6 +839,12 @@ def create_contacts(self):
params = {'includeVersion': True}
get_resp = self.get(get_url, params=params)

# Get the created time and track its offset from the request time to monitor creation lag - tdl-20939
created_time = get_resp.get('properties').get('createdate').get('value')
ts = int(created_time) / 1000
LOGGER.info("Created Time %s", datetime.datetime.utcfromtimestamp(ts))
self.record_create_times["contacts"].append(ts - seconds)

converted_versionTimestamp = self.BaseTest.datetime_from_timestamp(
get_resp['versionTimestamp'] / 1000, self.BOOKMARK_DATE_FORMAT
)
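
The record_create_times bookkeeping added above measures how long HubSpot takes to materialize a created record: the test captures the request time in epoch seconds before the POST, then subtracts it from the createdate the API reports. A self-contained sketch of the same arithmetic, with a hypothetical createdate value:

import datetime

record_create_times = {'contacts': []}

# Request time in epoch seconds, captured before the POST (as above).
seconds = datetime.datetime.timestamp(datetime.datetime.utcnow())

created_ms = 1700000000000  # hypothetical 'createdate' millisecond value
ts = created_ms / 1000
record_create_times['contacts'].append(ts - seconds)  # creation lag, seconds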
@@ -880,7 +920,8 @@ def create_contacts_by_company(self, company_ids=[], contact_records=[], times=1
if not company_ids:
company_ids = [company['companyId'] for company in self.get_companies()]
if not contact_records:
contact_records = self.get_contacts()
page_size = self.BaseTest.expected_metadata().get('contacts_by_company',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
contact_records = self.get_contacts(page_size)

records = []
for _ in range(times):
@@ -1041,7 +1082,8 @@ def create_engagements(self):
record_uuid = str(uuid.uuid4()).replace('-', '')

# gather all contacts and randomly choose one that has not hit the limit
contact_records = self.get_contacts()
page_size = self.BaseTest.expected_metadata().get('engagements',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
contact_records = self.get_contacts(page_size)
contact_ids = [contact['vid']
for contact in contact_records
if contact['vid'] != 2304] # contact 2304 has hit the 10,000 assoc limit
@@ -1677,3 +1719,9 @@ def __init__(self, start_date=''):
delete_count = int(max_record_count / 2)
self.cleanup(stream, records, delete_count)
LOGGER.info(f"TEST CLIENT | {delete_count} records deleted from {stream}")

def print_histogram_data(self):
for stream, recorded_times in self.record_create_times.items():
LOGGER.info("Time taken for stream {} is total: {}, avg: {}, minimum: {}, maximum: {}".format(
stream, sum(recorded_times), sum(recorded_times) / len(recorded_times),
min(recorded_times), max(recorded_times)))
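
A hypothetical end-of-run usage of the summary above: create a few records, then log per-stream creation-lag totals (the start_date value is illustrative):

client = TestClient(start_date='2023-11-01T00:00:00Z')
client.create('contacts')
client.print_histogram_data()  # logs total/avg/min/max lag per stream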
