Skip to content

Commit

Permalink
Limit the number of records returned as per the page_size defined
Browse files Browse the repository at this point in the history
  • Loading branch information
bhuvana-talend committed Oct 6, 2023
1 parent 08b9cfb commit 15c2e19
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 30 deletions.
74 changes: 49 additions & 25 deletions tests/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ def datatype_transformations(self, stream, records):
##########################################################################
### GET
##########################################################################
def read(self, stream, parent_ids=[], since=''):
### Take page_size also to limit the pagination to first 2 pages tdl-16124 ###
def read(self, stream, parent_ids=[], since='', page_size=0):

# Resets the access_token if the expiry time is less than or equal to the current time
if self.CONFIG["token_expires"] <= datetime.datetime.utcnow():
Expand All @@ -187,29 +188,29 @@ def read(self, stream, parent_ids=[], since=''):
elif stream == 'owners':
return self.get_owners()
elif stream == 'companies':
return self.get_companies(since)
return self.get_companies(since, page_size)
elif stream == 'contact_lists':
return self.get_contact_lists(since)
elif stream == 'contacts_by_company':
return self.get_contacts_by_company(parent_ids)
return self.get_contacts_by_company(parent_ids, page_size)
elif stream == 'engagements':
return self.get_engagements()
return self.get_engagements(page_size)
elif stream == 'campaigns':
return self.get_campaigns()
elif stream == 'deals':
return self.get_deals()
elif stream == 'workflows':
return self.get_workflows()
elif stream == 'contacts':
return self.get_contacts()
return self.get_contacts(page_size)
elif stream == 'deal_pipelines':
return self.get_deal_pipelines()
elif stream == 'email_events':
return self.get_email_events()
return self.get_email_events(page_size)
elif stream == 'subscription_changes':
return self.get_subscription_changes(since)
return self.get_subscription_changes(since, page_size)
elif stream == "tickets":
return self.get_tickets()
return self.get_tickets(page_size)
else:
raise NotImplementedError

Expand Down Expand Up @@ -238,7 +239,7 @@ def _get_company_by_id(self, company_id):
response = self.get(url)
return response

def get_companies(self, since=''):
def get_companies(self, since='', page_size=0):
"""
Get all companies by paginating using 'hasMore' and 'offset'.
"""
Expand Down Expand Up @@ -273,6 +274,8 @@ def get_companies(self, since=''):

has_more = response['has-more']
params['offset'] = response['offset']
if page_size and len(companies) > page_size+10:
break

# get the details of each company
for company in companies:
Expand All @@ -283,7 +286,7 @@ def get_companies(self, since=''):

return records

def get_contact_lists(self, since='', list_id=''):
def get_contact_lists(self, since='', list_id='', page_size=0):
"""
Get all contact_lists by paginating using 'has-more' and 'offset'.
"""
Expand Down Expand Up @@ -313,7 +316,7 @@ def get_contact_lists(self, since='', list_id=''):
# paginating through all the contact_lists
has_more = True
while has_more:

response = self.get(url, params=params)
for record in response['lists']:

Expand All @@ -322,6 +325,8 @@ def get_contact_lists(self, since='', list_id=''):

has_more = response['has-more']
params['offset'] = response['offset']
if page_size and len(records) > page_size+10:
break

return records

Expand Down Expand Up @@ -354,7 +359,7 @@ def _get_contacts_by_pks(self, pks):

return records[0]

def get_contacts(self):
def get_contacts(self, page_size=0):
"""
Get all contact vids by paginating using 'has-more' and 'vid-offset/vidOffset'.
Then use the vids to grab the detailed contacts records.
Expand All @@ -363,7 +368,7 @@ def get_contacts(self):
params_1 = {
'showListMemberships': True,
'includeVersion': True,
'count': 100,
'count': page_size,
}
vids = []
url_2 = f"{BASE_URL}/contacts/v1/contact/vids/batch/"
Expand All @@ -379,18 +384,21 @@ def get_contacts(self):
response_1 = self.get(url_1, params=params_1)
vids = [record['vid'] for record in response_1['contacts']
if record['versionTimestamp'] >= self.start_date]

has_more = response_1['has-more']
params_1['vidOffset'] = response_1['vid-offset']

# get the detailed contacts records by vids
params_2['vid'] = vids
response_2 = self.get(url_2, params=params_2)
records.extend([record for record in response_2.values()])
if page_size and len(records) > page_size+10:
break

records = self.denest_properties('contacts', records)
records = self.denest_properties('contacts is %s', records)
return records

def get_contacts_by_company(self, parent_ids):
def get_contacts_by_company(self, parent_ids, page_size=0):
"""
Get all contacts_by_company iterating over companyId's and
paginating using 'hasMore' and 'vidOffset'. This stream is essentially
Expand All @@ -401,7 +409,7 @@ def get_contacts_by_company(self, parent_ids):
"""

url = f"{BASE_URL}/companies/v2/companies/{{}}/vids"
params = dict()
params = {'count': page_size}
records = []

for parent_id in parent_ids:
Expand All @@ -416,8 +424,10 @@ def get_contacts_by_company(self, parent_ids):

has_more = response['hasMore']
params['vidOffset'] = response['vidOffset']
if page_size and len(records) > page_size+10:
break

params = dict()
params = {'count': page_size}

return records

Expand Down Expand Up @@ -512,13 +522,13 @@ def get_deals(self):
records = self.denest_properties('deals', records)
return records

def get_email_events(self, recipient=''):
def get_email_events(self, recipient='', page_size=0):
"""
Get all email_events by paginating using 'hasMore' and 'offset'.
"""
url = f"{BASE_URL}/email/public/v1/events"
replication_key = list(self.replication_keys['email_events'])[0]
params = dict()
params = {'count': page_size}
if recipient:
params['recipient'] = recipient
records = []
Expand All @@ -532,6 +542,8 @@ def get_email_events(self, recipient=''):

has_more = response['hasMore']
params['offset'] = response['offset']
if page_size and len(records) > page_size+10:
break

return records

Expand All @@ -549,13 +561,13 @@ def _get_engagements_by_pk(self, engagement_id):

return response

def get_engagements(self):
def get_engagements(self, page_size=0):
"""
Get all engagements by paginating using 'hasMore' and 'offset'.
"""
url = f"{BASE_URL}/engagements/v1/engagements/paged"
replication_key = list(self.replication_keys['engagements'])[0]
params = {'limit': 250}
params = {'limit': page_size}
records = []

has_more = True
Expand All @@ -570,6 +582,8 @@ def get_engagements(self):

has_more = response['hasMore']
params['offset'] = response['offset']
if page_size and len(records) > page_size+10:
break

return records

Expand Down Expand Up @@ -606,13 +620,13 @@ def get_owners(self):
transformed_records = self.datatype_transformations('owners', records)
return transformed_records

def get_subscription_changes(self, since=''):
def get_subscription_changes(self, since='', page_size=0):
"""
Get all subscription_changes from 'since' date by paginating using 'hasMore' and 'offset'.
Default since date is one week ago
"""
url = f"{BASE_URL}/email/public/v1/subscriptions/timeline"
params = dict()
params = {'count': page_size}
records = []
replication_key = list(self.replication_keys['subscription_changes'])[0]
if not since:
Expand All @@ -632,6 +646,8 @@ def get_subscription_changes(self, since=''):
# this won't be feasible until BUG_TDL-14938 is addressed
if int(since) <= record['timestamp']:
records.append(record)
if page_size and len(records) > page_size+10:
break

return records

Expand Down Expand Up @@ -677,7 +693,7 @@ def get_tickets_properties(self):

return ",".join([record["name"] for record in records["results"]])

def get_tickets(self):
def get_tickets(self, page_size=0):
"""
Get all tickets.
HubSpot API https://developers.hubspot.com/docs/api/crm/tickets
Expand All @@ -688,7 +704,7 @@ def get_tickets(self):

# response = self.get(url)

params = {"limit": 100, "associations": "contact,company,deals", 'properties': self.get_tickets_properties()}
params = {"limit": page_size, "associations": "contact,company,deals", 'properties': self.get_tickets_properties()}
while True:
response = self.get(url, params=params)

Expand All @@ -698,6 +714,8 @@ def get_tickets(self):

if not response.get("paging"):
break
if page_size and len(records) > page_size+10:
break
params["after"] = response.get("paging").get("next").get("after")

records = self.denest_properties('tickets', records)
Expand Down Expand Up @@ -798,13 +816,19 @@ def create_contacts(self):
}

# generate a contacts record
LOGGER.info("Before Post")
response = self.post(url, data)
records = [response]

get_url = f"{BASE_URL}/contacts/v1/contact/vid/{response['vid']}/profile"
params = {'includeVersion': True}
LOGGER.info("Before Get")
get_resp = self.get(get_url, params=params)

created_time = get_resp.get('properties').get('createdate').get('value')
ts=int(created_time)/1000
LOGGER.info("Created Time %s", datetime.datetime.utcfromtimestamp(ts))

converted_versionTimestamp = self.BaseTest.datetime_from_timestamp(
get_resp['versionTimestamp'] / 1000, self.BOOKMARK_DATE_FORMAT
)
Expand Down
8 changes: 3 additions & 5 deletions tests/test_hubspot_pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,14 @@ def setUp(self):

# generate test data if necessary, one stream at a time
for stream in streams:

# Get all records
if stream == 'contacts_by_company':
company_ids = [company['companyId'] for company in existing_records['companies']]
existing_records[stream] = test_client.read(stream, parent_ids=company_ids)
existing_records[stream] = test_client.read(stream, parent_ids=company_ids, page_size=limits.get(stream))
elif stream in {'companies', 'contact_lists', 'subscription_changes', 'engagements', 'email_events'}:
existing_records[stream] = test_client.read(stream)
existing_records[stream] = test_client.read(stream, page_size=limits.get(stream))
else:
existing_records[stream] = test_client.read(stream)
existing_records[stream] = test_client.read(stream, page_size=limits.get(stream))

# check if we exceed the pagination limit
LOGGER.info(f"Pagination limit set to - {limits[stream]} and total number of existing record - {len(existing_records[stream])}")
Expand Down Expand Up @@ -92,7 +91,6 @@ def streams_to_test(self):
'email_events',
'subscription_changes', # BUG_TDL-14938 https://jira.talendforge.org/browse/TDL-14938
})

return streams_to_test

def test_run(self):
Expand Down

0 comments on commit 15c2e19

Please sign in to comment.