From d747271df1c777d2815cf06d8ffa0394b3bf0c2d Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Fri, 12 Apr 2024 18:32:02 +0530 Subject: [PATCH 01/12] Update pagination logic for streams - accounts,campaign_groups,campaigns,creatives --- tap_linkedin_ads/streams.py | 64 ++++++++++++++++++++++++------------- 1 file changed, 42 insertions(+), 22 deletions(-) diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py index e13ee36..3bcf52b 100644 --- a/tap_linkedin_ads/streams.py +++ b/tap_linkedin_ads/streams.py @@ -1,4 +1,5 @@ import urllib.parse +import re import copy import datetime from datetime import timedelta @@ -21,6 +22,8 @@ 'creativeId', } +CURSOR_BASED_PAGINATION_STREAMS = ["accounts", "campaign_groups", "campaigns", "creatives"] + def write_bookmark(state, value, stream_name): """ Write the bookmark in the state corresponding to the stream. @@ -71,29 +74,40 @@ def sync_analytics_endpoint(client, stream_name, path, query_string): data = client.get(url=next_url, endpoint=stream_name) yield data # Fetch next page - next_url = get_next_url(data) + next_url = get_next_url(stream_name, next_url, data) LOGGER.info('%s: Synced page %s', stream_name, page) page = page + 1 -def get_next_url(data): +def get_next_url(stream_name, next_url, data): """ Prepare and return the URL to fetch the next page of records. """ - next_url = None - links = data.get('paging', {}).get('links', []) - for link in links: - rel = link.get('rel') - if rel == 'next': - href = link.get('href') - if href: - # url must be kept encoded for the creatives endpoint. - # Ref - https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-creatives?view=li-lms-2023-01&tabs=http#sample-request-3 - if "rest/creatives" in href: - return 'https://api.linkedin.com{}'.format(href) - # Prepare next page URL - next_url = 'https://api.linkedin.com{}'.format(urllib.parse.unquote(href)) - return next_url + if stream_name in CURSOR_BASED_PAGINATION_STREAMS: + next_page_token = data.get('metadata', {}).get('nextPageToken', None) + if next_page_token: + if 'pageToken=' in next_url: + next_url = re.sub(r'pageToken=[^&]+', f'pageToken={next_page_token}', next_url) + else: + next_url = f"{next_url}&pageToken={next_page_token}" + else: + next_url = None + return next_url + else: + next_url = None + links = data.get('paging', {}).get('links', []) + for link in links: + rel = link.get('rel') + if rel == 'next': + href = link.get('href') + if href: + # url must be kept encoded for the creatives endpoint. 
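+                    # (unquoting would break the encoded List(urn%3Ali%3A...) syntax this endpoint expects)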
+ # Ref - https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-creatives?view=li-lms-2023-01&tabs=http#sample-request-3 + if "rest/creatives" in href: + return 'https://api.linkedin.com{}'.format(href) + # Prepare next page URL + next_url = 'https://api.linkedin.com{}'.format(urllib.parse.unquote(href)) + return next_url def shift_sync_window(params, today, date_window_size, forced_window_size=None): """ @@ -301,11 +315,17 @@ def sync_endpoint(self, total_records = 0 page = 1 - endpoint_params = { - 'start': start, - 'count': page_size, - **self.params # adds in endpoint specific, sort, filter params - } + if self.tap_stream_id in CURSOR_BASED_PAGINATION_STREAMS: + endpoint_params = { + 'pageSize': page_size, + **self.params + } + else: + endpoint_params = { + 'start': start, + 'count': page_size, + **self.params # adds in endpoint specific, sort, filter params + } querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in endpoint_params.items()]) next_url = 'https://api.linkedin.com/rest/{}?{}'.format(self.path, querystring) @@ -419,7 +439,7 @@ def sync_endpoint(self, LOGGER.info('FINISHED Syncing: %s', child_stream_name) # Pagination: Get next_url - next_url = get_next_url(data) + next_url = get_next_url(self.tap_stream_id, next_url, data) if self.tap_stream_id in selected_streams: LOGGER.info('%s: Synced page %s, this page: %s. Total records processed: %s', From 7bbcad057ce71a30b845d8b638c5d32e98363e2d Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Fri, 12 Apr 2024 23:49:13 +0530 Subject: [PATCH 02/12] update API endpoint --- tap_linkedin_ads/client.py | 2 +- tap_linkedin_ads/streams.py | 248 +++++++++++++++++++----------------- tap_linkedin_ads/sync.py | 7 +- 3 files changed, 133 insertions(+), 124 deletions(-) diff --git a/tap_linkedin_ads/client.py b/tap_linkedin_ads/client.py index a221d08..6c7eaa3 100644 --- a/tap_linkedin_ads/client.py +++ b/tap_linkedin_ads/client.py @@ -11,7 +11,7 @@ BASE_URL = 'https://api.linkedin.com/rest' LINKEDIN_TOKEN_URI = 'https://www.linkedin.com/oauth/v2/accessToken' INTROSPECTION_URI = 'https://www.linkedin.com/oauth/v2/introspectToken' -LINKEDIN_VERSION = '202304' +LINKEDIN_VERSION = '202403' # set default timeout of 300 seconds REQUEST_TIMEOUT = 300 diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py index 3bcf52b..7207acd 100644 --- a/tap_linkedin_ads/streams.py +++ b/tap_linkedin_ads/streams.py @@ -23,6 +23,8 @@ } CURSOR_BASED_PAGINATION_STREAMS = ["accounts", "campaign_groups", "campaigns", "creatives"] +NEW_PATH_STREAMS = ["campaign_groups", "campaigns", "creatives"] +BASE_URL = 'https://api.linkedin.com/rest' def write_bookmark(state, value, stream_name): """ @@ -274,6 +276,7 @@ def process_records(self, # pylint: disable=too-many-branches,too-many-statements,too-many-arguments,too-many-locals def sync_endpoint(self, client, + account_list, catalog, state, page_size, @@ -328,126 +331,135 @@ def sync_endpoint(self, } querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in endpoint_params.items()]) - next_url = 'https://api.linkedin.com/rest/{}?{}'.format(self.path, querystring) - while next_url: #pylint: disable=too-many-nested-blocks - LOGGER.info('URL for %s: %s', self.tap_stream_id, next_url) - - # Get data, API request - data = client.get( - url=next_url, - endpoint=self.tap_stream_id, - headers=self.headers) - # time_extracted: datetime when the data was extracted from the API - time_extracted = utils.now() - - # Transform data with 
transform_json from transform.py - # This function converts unix datetimes, de-nests audit fields, - # tranforms URNs to IDs, tranforms/abstracts variably named fields, - # converts camelCase to snake_case for fieldname keys. - # For the Linkedin Ads API, 'elements' is always the root data_key for records. - # The data_key identifies the collection of records below the element - transformed_data = [] # initialize the record list - if self.data_key in data: - transformed_data = transform_json(data, self.tap_stream_id)[self.data_key] - if not transformed_data or transformed_data is None: - LOGGER.info('No transformed_data') - break # No data results - - pre_singer_transformed_data = copy.deepcopy(transformed_data) - if self.tap_stream_id in selected_streams: - # Process records and gets the max_bookmark_value and record_count for the set of records - max_bookmark_value, record_count = self.process_records( - catalog=catalog, - records=transformed_data, - time_extracted=time_extracted, - bookmark_field=bookmark_field, - max_bookmark_value=max_bookmark_value, - last_datetime=last_datetime, - parent_id=parent_id) - LOGGER.info('%s, records processed: %s', self.tap_stream_id, record_count) - total_records = total_records + record_count - - # Loop thru parent batch records for each children objects - for child_stream_name in children: - if child_stream_name in selected_streams: - # For each parent record - child_obj = STREAMS[child_stream_name]() - - for record in pre_singer_transformed_data: - - parent_id = record.get(child_obj.foreign_key) - - child_stream_params = child_obj.params - # Add children filter params based on parent IDs - if self.tap_stream_id == 'accounts': - account = 'urn:li:sponsoredAccount:{}'.format(parent_id) - owner_id = record.get('reference_organization_id', None) - owner = 'urn:li:organization:{}'.format(owner_id) - if child_stream_name == 'video_ads' and owner_id is not None: - child_stream_params['account'] = account - child_stream_params['owner'] = owner + urllist = [] + if self.tap_stream_id in NEW_PATH_STREAMS: + for account in account_list: + url = f"{BASE_URL}/adAccounts/{account}/{self.path}?{querystring}" + urllist.append(url) + else: + url = 'https://api.linkedin.com/rest/{}?{}'.format(self.path, querystring) + urllist.append(url) + + for next_url in urllist: + while next_url: #pylint: disable=too-many-nested-blocks + LOGGER.info('URL for %s: %s', self.tap_stream_id, next_url) + + # Get data, API request + data = client.get( + url=next_url, + endpoint=self.tap_stream_id, + headers=self.headers) + # time_extracted: datetime when the data was extracted from the API + time_extracted = utils.now() + + # Transform data with transform_json from transform.py + # This function converts unix datetimes, de-nests audit fields, + # tranforms URNs to IDs, tranforms/abstracts variably named fields, + # converts camelCase to snake_case for fieldname keys. + # For the Linkedin Ads API, 'elements' is always the root data_key for records. 
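+                # e.g. {'paging': {...}, 'elements': [<raw record>, ...]} -> transformed_data = [<transformed record>, ...]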
+ # The data_key identifies the collection of records below the element + transformed_data = [] # initialize the record list + if self.data_key in data: + transformed_data = transform_json(data, self.tap_stream_id)[self.data_key] + if not transformed_data or transformed_data is None: + LOGGER.info('No transformed_data') + break # No data results + + pre_singer_transformed_data = copy.deepcopy(transformed_data) + if self.tap_stream_id in selected_streams: + # Process records and gets the max_bookmark_value and record_count for the set of records + max_bookmark_value, record_count = self.process_records( + catalog=catalog, + records=transformed_data, + time_extracted=time_extracted, + bookmark_field=bookmark_field, + max_bookmark_value=max_bookmark_value, + last_datetime=last_datetime, + parent_id=parent_id) + LOGGER.info('%s, records processed: %s', self.tap_stream_id, record_count) + total_records = total_records + record_count + + # Loop thru parent batch records for each children objects + for child_stream_name in children: + if child_stream_name in selected_streams: + # For each parent record + child_obj = STREAMS[child_stream_name]() + + for record in pre_singer_transformed_data: + + parent_id = record.get(child_obj.foreign_key) + + child_stream_params = child_obj.params + # Add children filter params based on parent IDs + if self.tap_stream_id == 'accounts': + account = 'urn:li:sponsoredAccount:{}'.format(parent_id) + owner_id = record.get('reference_organization_id', None) + owner = 'urn:li:organization:{}'.format(owner_id) + if child_stream_name == 'video_ads' and owner_id is not None: + child_stream_params['account'] = account + child_stream_params['owner'] = owner + else: + LOGGER.warning("Skipping video_ads call for %s account as reference_organization_id is not found.", account) + continue + elif self.tap_stream_id == 'campaigns': + campaign = 'urn:li:sponsoredCampaign:{}'.format(parent_id) + if child_stream_name == 'creatives': + # The value of the campaigns in the query params should be passed in the encoded format. + # Ref - https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-creatives?view=li-lms-2023-01&tabs=http#sample-request-3 + child_stream_params['campaigns'] = 'List(urn%3Ali%3AsponsoredCampaign%3A{})'.format(parent_id) + elif child_stream_name in ('ad_analytics_by_campaign', 'ad_analytics_by_creative'): + child_stream_params['campaigns[0]'] = campaign + + # Update params for the child stream + child_obj.params = child_stream_params + LOGGER.info('Syncing: %s, parent_stream: %s, parent_id: %s', + child_stream_name, + self.tap_stream_id, + parent_id) + + # Call sync method for the child stream + if child_stream_name in {'ad_analytics_by_campaign', 'ad_analytics_by_creative'}: + child_total_records, child_batch_bookmark_value = child_obj.sync_ad_analytics( + client=client, + catalog=catalog, + last_datetime=child_obj.get_bookmark(state, start_date), + date_window_size=date_window_size, + parent_id=parent_id) else: - LOGGER.warning("Skipping video_ads call for %s account as reference_organization_id is not found.", account) - continue - elif self.tap_stream_id == 'campaigns': - campaign = 'urn:li:sponsoredCampaign:{}'.format(parent_id) - if child_stream_name == 'creatives': - # The value of the campaigns in the query params should be passed in the encoded format. 
- # Ref - https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-creatives?view=li-lms-2023-01&tabs=http#sample-request-3 - child_stream_params['campaigns'] = 'List(urn%3Ali%3AsponsoredCampaign%3A{})'.format(parent_id) - elif child_stream_name in ('ad_analytics_by_campaign', 'ad_analytics_by_creative'): - child_stream_params['campaigns[0]'] = campaign - - # Update params for the child stream - child_obj.params = child_stream_params - LOGGER.info('Syncing: %s, parent_stream: %s, parent_id: %s', - child_stream_name, - self.tap_stream_id, - parent_id) - - # Call sync method for the child stream - if child_stream_name in {'ad_analytics_by_campaign', 'ad_analytics_by_creative'}: - child_total_records, child_batch_bookmark_value = child_obj.sync_ad_analytics( - client=client, - catalog=catalog, - last_datetime=child_obj.get_bookmark(state, start_date), - date_window_size=date_window_size, - parent_id=parent_id) - else: - child_total_records, child_batch_bookmark_value = child_obj.sync_endpoint( - client=client, - catalog=catalog, - state=state, - page_size=page_size, - start_date=start_date, - selected_streams=selected_streams, - date_window_size=date_window_size, - parent_id=parent_id) - - child_batch_bookmark_dttm = strptime_to_utc(child_batch_bookmark_value) - child_max_bookmark = child_max_bookmarks.get(child_stream_name) - child_max_bookmark_dttm = strptime_to_utc(child_max_bookmark) - if child_batch_bookmark_dttm > child_max_bookmark_dttm: - # Update bookmark for child stream. - child_max_bookmarks[child_stream_name] = strftime(child_batch_bookmark_dttm) - - LOGGER.info('Synced: %s, parent_id: %s, total_records: %s', - child_stream_name, - parent_id, - child_total_records) - LOGGER.info('FINISHED Syncing: %s', child_stream_name) - - # Pagination: Get next_url - next_url = get_next_url(self.tap_stream_id, next_url, data) - - if self.tap_stream_id in selected_streams: - LOGGER.info('%s: Synced page %s, this page: %s. Total records processed: %s', - self.tap_stream_id, - page, - record_count, - total_records) - page = page + 1 + child_total_records, child_batch_bookmark_value = child_obj.sync_endpoint( + client=client, + catalog=catalog, + state=state, + page_size=page_size, + start_date=start_date, + selected_streams=selected_streams, + date_window_size=date_window_size, + parent_id=parent_id) + + child_batch_bookmark_dttm = strptime_to_utc(child_batch_bookmark_value) + child_max_bookmark = child_max_bookmarks.get(child_stream_name) + child_max_bookmark_dttm = strptime_to_utc(child_max_bookmark) + if child_batch_bookmark_dttm > child_max_bookmark_dttm: + # Update bookmark for child stream. + child_max_bookmarks[child_stream_name] = strftime(child_batch_bookmark_dttm) + + LOGGER.info('Synced: %s, parent_id: %s, total_records: %s', + child_stream_name, + parent_id, + child_total_records) + LOGGER.info('FINISHED Syncing: %s', child_stream_name) + + # Pagination: Get next_url + next_url = get_next_url(self.tap_stream_id, next_url, data) + + if self.tap_stream_id in selected_streams: + LOGGER.info('%s: Synced page %s, this page: %s. 
Total records processed: %s',
+                                self.tap_stream_id,
+                                page,
+                                record_count,
+                                total_records)
+                page = page + 1
 
         # Write child stream's bookmarks
         for key, val in list(child_max_bookmarks.items()):
diff --git a/tap_linkedin_ads/sync.py b/tap_linkedin_ads/sync.py
index 85a5b08..37207ef 100644
--- a/tap_linkedin_ads/sync.py
+++ b/tap_linkedin_ads/sync.py
@@ -109,9 +109,6 @@ def sync(client, config, catalog, state):
         for idx, account in enumerate(account_list):
             if account_filter == 'search_id_values_param':
                 params['search.id.values[{}]'.format(idx)] = int(account)
-            elif account_filter == 'search_account_values_param':
-                params['search.account.values[{}]'.format(idx)] = \
-                    'urn:li:sponsoredAccount:{}'.format(account)
             elif account_filter == 'accounts_param':
                 params['accounts[{}]'.format(idx)] = \
                     'urn:li:sponsoredAccount:{}'.format(account)
@@ -127,8 +124,8 @@ def sync(client, config, catalog, state):
         stream_obj.write_schema(catalog)
 
         total_records, max_bookmark_value = stream_obj.sync_endpoint(
-            client=client, catalog=catalog,
-            state=state,
+            client=client, account_list=account_list,
+            catalog=catalog, state=state,
             page_size=page_size,
             start_date=start_date,
             selected_streams=selected_streams,
             date_window_size=date_window_size)

From f7f988a498fb8300e117e449c5cfbe256f94fdf6 Mon Sep 17 00:00:00 2001
From: Sourabh Gandhi
Date: Sat, 13 Apr 2024 09:42:54 +0530
Subject: [PATCH 03/12] update the API endpoints to include adAccount in the URL

---
 tap_linkedin_ads/streams.py | 21 +++++++++------------
 tap_linkedin_ads/sync.py    |  8 ++++----
 2 files changed, 13 insertions(+), 16 deletions(-)

diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py
index 7207acd..b762b33 100644
--- a/tap_linkedin_ads/streams.py
+++ b/tap_linkedin_ads/streams.py
@@ -276,14 +276,14 @@ def process_records(self,
     # pylint: disable=too-many-branches,too-many-statements,too-many-arguments,too-many-locals
     def sync_endpoint(self,
                       client,
-                      account_list,
                       catalog,
                       state,
                       page_size,
                       start_date,
                       selected_streams,
                       date_window_size,
-                      parent_id=None):
+                      parent_id=None,
+                      account_list=None):
         """
         Sync a specific parent or child endpoint.
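+        account_list: ad account ids; streams in NEW_PATH_STREAMS use these to build the account-scoped /adAccounts/{id}/... request URLs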
""" @@ -336,12 +336,12 @@ def sync_endpoint(self, if self.tap_stream_id in NEW_PATH_STREAMS: for account in account_list: url = f"{BASE_URL}/adAccounts/{account}/{self.path}?{querystring}" - urllist.append(url) + urllist.append((account, url)) else: url = 'https://api.linkedin.com/rest/{}?{}'.format(self.path, querystring) - urllist.append(url) + urllist.append((None, url)) - for next_url in urllist: + for acct_id, next_url in urllist: while next_url: #pylint: disable=too-many-nested-blocks LOGGER.info('URL for %s: %s', self.tap_stream_id, next_url) @@ -435,7 +435,8 @@ def sync_endpoint(self, start_date=start_date, selected_streams=selected_streams, date_window_size=date_window_size, - parent_id=parent_id) + parent_id=parent_id, + account_list=[acct_id]) child_batch_bookmark_dttm = strptime_to_utc(child_batch_bookmark_value) child_max_bookmark = child_max_bookmarks.get(child_stream_name) @@ -641,9 +642,7 @@ class CampaignGroups(LinkedInAds): path = "adCampaignGroups" data_key = "elements" params = { - "q": "search", - "sort.field": "ID", - "sort.order": "ASCENDING" + "q": "search" } class Campaigns(LinkedInAds): @@ -659,9 +658,7 @@ class Campaigns(LinkedInAds): data_key = "elements" children = ["ad_analytics_by_campaign", "creatives", "ad_analytics_by_creative"] params = { - "q": "search", - "sort.field": "ID", - "sort.order": "ASCENDING" + "q": "search" } class Creatives(LinkedInAds): diff --git a/tap_linkedin_ads/sync.py b/tap_linkedin_ads/sync.py index 37207ef..dab7d9b 100644 --- a/tap_linkedin_ads/sync.py +++ b/tap_linkedin_ads/sync.py @@ -124,12 +124,12 @@ def sync(client, config, catalog, state): stream_obj.write_schema(catalog) total_records, max_bookmark_value = stream_obj.sync_endpoint( - client=client, account_list=account_list - catalog=catalog, state=state, - page_size=page_size, + client=client, catalog=catalog, + state=state, page_size=page_size, start_date=start_date, selected_streams=selected_streams, - date_window_size=date_window_size) + date_window_size=date_window_size, + account_list=account_list) # Write parent stream's bookmarks if stream_obj.replication_keys and stream_name in selected_streams: From faa6a399f0dc4f4a96ce404f42713c9d53ec2719 Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Sat, 13 Apr 2024 10:07:23 +0530 Subject: [PATCH 04/12] remove unsupported params for Accounts stream --- tap_linkedin_ads/streams.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py index b762b33..04f2c1e 100644 --- a/tap_linkedin_ads/streams.py +++ b/tap_linkedin_ads/streams.py @@ -594,9 +594,7 @@ class Accounts(LinkedInAds): data_key = "elements" children = ["video_ads"] params = { - "q": "search", - "sort.field": "ID", - "sort.order": "ASCENDING" + "q": "search" } class VideoAds(LinkedInAds): From b99949c56c63f241426e661afcd1990ec931376c Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Sat, 13 Apr 2024 17:24:27 +0530 Subject: [PATCH 05/12] updated query param format for accounts --- tap_linkedin_ads/streams.py | 4 ++++ tap_linkedin_ads/sync.py | 19 ++++++++++++------- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py index 04f2c1e..de683ca 100644 --- a/tap_linkedin_ads/streams.py +++ b/tap_linkedin_ads/streams.py @@ -319,6 +319,9 @@ def sync_endpoint(self, page = 1 if self.tap_stream_id in CURSOR_BASED_PAGINATION_STREAMS: + # hardcoding the pagesize to 1000 for stream - accounts, as search and pageToken param can't be present 
at the same time. + if self.tap_stream_id == "accounts": + page_size = 1000 endpoint_params = { 'pageSize': page_size, **self.params @@ -596,6 +599,7 @@ class Accounts(LinkedInAds): params = { "q": "search" } + headers = {'X-Restli-Protocol-Version': "2.0.0"} class VideoAds(LinkedInAds): """ diff --git a/tap_linkedin_ads/sync.py b/tap_linkedin_ads/sync.py index dab7d9b..ad01778 100644 --- a/tap_linkedin_ads/sync.py +++ b/tap_linkedin_ads/sync.py @@ -105,16 +105,21 @@ def sync(client, config, catalog, state): account_filter = stream_obj.account_filter if config.get("accounts") and account_filter is not None: account_list = config['accounts'].replace(" ", "").split(",") - params = stream_obj.params - for idx, account in enumerate(account_list): + if len(account_list) > 0: + params = stream_obj.params if account_filter == 'search_id_values_param': - params['search.id.values[{}]'.format(idx)] = int(account) + # Convert account IDs to URN format + urn_list = [f"urn%3Ali%3AsponsoredAccount%3A{account_id}" for account_id in account_list] + # Create the query parameter string + param_value = f"(id:(values:List({','.join(urn_list)})))" + params['search'] = param_value elif account_filter == 'accounts_param': - params['accounts[{}]'.format(idx)] = \ - 'urn:li:sponsoredAccount:{}'.format(account) + for idx, account in enumerate(account_list): + params['accounts[{}]'.format(idx)] = \ + 'urn:li:sponsoredAccount:{}'.format(account) - # Update params of specific stream - stream_obj.params = params + # Update params of specific stream + stream_obj.params = params LOGGER.info('START Syncing: %s', stream_name) update_currently_syncing(state, stream_name) From b2aa449517ef001d667cf412077d6a985608af30 Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Sat, 13 Apr 2024 21:02:33 +0530 Subject: [PATCH 06/12] handle pivot field for analytics API --- tap_linkedin_ads/streams.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py index de683ca..7d37955 100644 --- a/tap_linkedin_ads/streams.py +++ b/tap_linkedin_ads/streams.py @@ -20,6 +20,9 @@ 'endAt', 'creative', 'creativeId', + # `pivot` and `pivotValue` is not supported anymore, adding values within the code + 'pivot', + 'pivotValue' } CURSOR_BASED_PAGINATION_STREAMS = ["accounts", "campaign_groups", "campaigns", "creatives"] @@ -133,7 +136,7 @@ def shift_sync_window(params, today, date_window_size, forced_window_size=None): 'dateRange.end.year': new_end.year,} return current_end, new_end, new_params -def merge_responses(data): +def merge_responses(pivot, data): """ Prepare map with key as primary key and value as the record itself for analytics streams. The primary key is a combination of pivotValue and start date fields value. 
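Note: a condensed sketch of the reworked merge, for reference while reading the hunk below (illustrative only; it assumes each analytics element carries a single-entry pivotValues list, which is what the code below relies on):

    full_records = {}
    for page in data:  # one 'elements' list per response page
        for element in page:
            start = element['dateRange']['start']
            pivot_value = element['pivotValues'][0]
            element['pivot'] = pivot              # re-attached for 2.2.0 compatibility
            element['pivot_value'] = pivot_value
            key = (pivot_value, '{}-{}-{}'.format(start['year'], start['month'], start['day']))
            if key in full_records:
                full_records[key].update(element)  # merge the per-chunk field sets for that day
            else:
                full_records[key] = element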
@@ -145,7 +148,10 @@ def merge_responses(data): # Loop through each record of the page for element in page: temp_start = element['dateRange']['start'] - temp_pivotValue = element['pivotValue'] + temp_pivotValue = element['pivotValues'][0] + # adding pivot and pivot_value to make it compatible with the previous tap version 2.2.0 + element['pivot'] = pivot + element["pivot_value"] = temp_pivotValue string_start = '{}-{}-{}'.format(temp_start['year'], temp_start['month'], temp_start['day']) primary_key = (temp_pivotValue, string_start) if primary_key in full_records: @@ -476,9 +482,8 @@ def sync_ad_analytics(self, client, catalog, last_datetime, date_window_size, pa """ Sync method for ad_analytics_by_campaign, ad_analytics_by_creative """ - # LinkedIn has a max of 20 fields per request. We cap the chunks at 17 - # to make sure there's always room for us to append `dateRange`, - # `pivot`, and `pivotValue` + # LinkedIn has a max of 20 fields per request. We cap the chunks at 18 + # to make sure there's always room for us to append `dateRange`, and `pivotValues` MAX_CHUNK_LENGTH = 17 bookmark_field = next(iter(self.replication_keys)) @@ -516,7 +521,7 @@ def sync_ad_analytics(self, client, catalog, last_datetime, date_window_size, pa # (even if this means the values are all `0`) and a day with null # values. We found that requesting these fields gives you the days with # non-null values - first_chunk = [['dateRange', 'pivot', 'pivotValue']] + first_chunk = [['dateRange', 'pivotValues']] chunks = first_chunk + list(split_into_chunks(valid_selected_fields, MAX_CHUNK_LENGTH)) @@ -524,7 +529,7 @@ def sync_ad_analytics(self, client, catalog, last_datetime, date_window_size, pa # so that we can create the composite primary key for the record and # to merge the multiple responses based on this primary key for chunk in chunks: - for field in ['dateRange', 'pivot', 'pivotValue']: + for field in ['dateRange', 'pivotValues']: if field not in chunk: chunk.append(field) @@ -549,7 +554,8 @@ def sync_ad_analytics(self, client, catalog, last_datetime, date_window_size, pa for page in sync_analytics_endpoint(client, self.tap_stream_id, self.path, query_string): if page.get(self.data_key): responses.append(page.get(self.data_key)) - raw_records = merge_responses(responses) + pivot = params["pivot"] if "pivot" in params.keys() else None + raw_records = merge_responses(pivot, responses) time_extracted = utils.now() # While we broke the ad_analytics streams out from From d4e6eee1170b811bb42aab7f2800d33f85ff07b5 Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Sat, 13 Apr 2024 22:35:59 +0530 Subject: [PATCH 07/12] comment video_ads for the time being --- tap_linkedin_ads/streams.py | 2 +- tests/base.py | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py index 7d37955..2fb42c3 100644 --- a/tap_linkedin_ads/streams.py +++ b/tap_linkedin_ads/streams.py @@ -736,7 +736,7 @@ class AdAnalyticsByCreative(LinkedInAds): # Dictionary of the stream classes STREAMS = { "accounts": Accounts, - "video_ads": VideoAds, + # "video_ads": VideoAds, "account_users": AccountUsers, "campaign_groups": CampaignGroups, "campaigns": Campaigns, diff --git a/tests/base.py b/tests/base.py index dbc018a..f437720 100644 --- a/tests/base.py +++ b/tests/base.py @@ -81,7 +81,7 @@ def get_credentials(self): def expected_check_streams(): return { 'accounts', - 'video_ads', + # 'video_ads', 'account_users', 'campaign_groups', 'campaigns', @@ -100,12 +100,12 @@ def 
expected_metadata(self): self.OBEYS_START_DATE: True, self.REPLICATION_KEYS: {'last_modified_time'} }, - 'video_ads': { - self.PRIMARY_KEYS: {'content_reference'}, - self.REPLICATION_METHOD: self.INCREMENTAL, - self.OBEYS_START_DATE: True, - self.REPLICATION_KEYS: {'last_modified_time'} - }, + # 'video_ads': { + # self.PRIMARY_KEYS: {'content_reference'}, + # self.REPLICATION_METHOD: self.INCREMENTAL, + # self.OBEYS_START_DATE: True, + # self.REPLICATION_KEYS: {'last_modified_time'} + # }, 'account_users': { self.PRIMARY_KEYS: {'account_id', 'user_person_id'}, self.REPLICATION_METHOD: self.INCREMENTAL, From 93d6cf5caee3fd08e5ec0bcb9ec8c4f33154f1a8 Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Sun, 14 Apr 2024 13:44:51 +0530 Subject: [PATCH 08/12] exclude missing fields from the all fields test --- tests/test_all_fields.py | 52 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tests/test_all_fields.py b/tests/test_all_fields.py index 6f86039..4af6562 100644 --- a/tests/test_all_fields.py +++ b/tests/test_all_fields.py @@ -34,6 +34,32 @@ "average_previous_thirty_day_reach_metrics", #BUG: TDL-22692 "approximate_unique_impressions", + #BUG: TDL-25618 + 'job_applications', + 'viral_post_view_job_applications', + 'post_click_job_applications', + 'external_website_post_view_conversions', + 'viral_external_website_conversions', + 'viral_post_view_registrations', + 'talent_leads', + 'viral_registrations', + 'post_view_job_applications', + 'viral_external_website_post_click_conversions', + 'post_click_registrations', + 'job_apply_clicks', + 'conversion_value_in_local_currency', + 'registrations', + 'post_view_registrations', + 'viral_post_view_job_apply_clicks', + 'viral_external_website_post_view_conversions', + 'viral_post_click_job_apply_clicks', + 'viral_job_apply_clicks', + 'viral_job_applications', + 'viral_post_click_job_applications', + 'viral_post_click_registrations', + 'external_website_post_click_conversions', + 'post_view_job_apply_clicks', + 'post_click_job_apply_clicks' }, "ad_analytics_by_campaign": { "average_daily_reach_metrics" @@ -41,6 +67,32 @@ "average_previous_thirty_day_reach_metrics", #BUG: TDL-22692 "approximate_unique_impressions", + #BUG: TDL-25618 + 'job_applications', + 'viral_post_view_job_applications', + 'post_click_job_applications', + 'external_website_post_view_conversions', + 'viral_external_website_conversions', + 'viral_post_view_registrations', + 'talent_leads', + 'viral_registrations', + 'post_view_job_applications', + 'viral_external_website_post_click_conversions', + 'post_click_registrations', + 'job_apply_clicks', + 'conversion_value_in_local_currency', + 'registrations', + 'post_view_registrations', + 'viral_post_view_job_apply_clicks', + 'viral_external_website_post_view_conversions', + 'viral_post_click_job_apply_clicks', + 'viral_job_apply_clicks', + 'viral_job_applications', + 'viral_post_click_job_applications', + 'viral_post_click_registrations', + 'external_website_post_click_conversions', + 'post_view_job_apply_clicks', + 'post_click_job_apply_clicks' }, } From 1ec1015a213609598638e58479d66225686d04ef Mon Sep 17 00:00:00 2001 From: Vishal Date: Mon, 15 Apr 2024 00:50:08 +0530 Subject: [PATCH 09/12] Upgrade api version 202403 video ads (#71) * added video-ads implementation * fixed pylint issues * added handler for missing permission, removed test for skip condition * updated message * fstring -> format for 3.6 compatibility --- tap_linkedin_ads/schemas/video_ads.json | 180 ++++++++++++++++++++++++ 
tap_linkedin_ads/streams.py | 44 +++--- tap_linkedin_ads/sync.py | 7 +- tap_linkedin_ads/transform.py | 21 +++ tests/base.py | 14 +- tests/test_all_fields.py | 1 + tests/unittests/test_streams.py | 50 ++----- 7 files changed, 251 insertions(+), 66 deletions(-) diff --git a/tap_linkedin_ads/schemas/video_ads.json b/tap_linkedin_ads/schemas/video_ads.json index adb192c..efaf754 100644 --- a/tap_linkedin_ads/schemas/video_ads.json +++ b/tap_linkedin_ads/schemas/video_ads.json @@ -11,6 +11,186 @@ "string" ] }, + "lifecycle_state": { + "type": [ + "null", + "string" + ] + }, + "visibility": { + "type": [ + "null", + "string" + ] + }, + "published_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "author": { + "type": [ + "null", + "string" + ] + }, + "content_call_to_action_label": { + "type": [ + "null", + "string" + ] + }, + "distribution": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "feed_distribution": { + "type": [ + "null", + "string" + ] + }, + "third_party_distribution_channels": { + "type": [ + "null", + "array" + ], + "items": { + "type": [ + "null", + "string" + ] + } + } + } + }, + "content": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "media": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "title": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + } + } + } + } + }, + "content_landing_page": { + "type": [ + "null", + "string" + ] + }, + "lifecycle_state_info": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "is_edited_by_author": { + "type": [ + "null", + "boolean" + ] + } + } + }, + "is_reshare_disabled_by_author": { + "type": [ + "null", + "boolean" + ] + }, + "created_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "last_modified_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "commentary": { + "type": [ + "null", + "string" + ] + }, + "ad_context": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "dsc_status": { + "type": [ + "null", + "string" + ] + }, + "dsc_name": { + "type": [ + "null", + "string" + ] + }, + "dsc_ad_type": { + "type": [ + "null", + "string" + ] + }, + "is_dsc": { + "type": [ + "null", + "boolean" + ] + }, + "dsc_ad_account": { + "type": [ + "null", + "string" + ] + } + } + }, "account_id": { "type": [ "null", diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py index 2fb42c3..745f567 100644 --- a/tap_linkedin_ads/streams.py +++ b/tap_linkedin_ads/streams.py @@ -92,12 +92,11 @@ def get_next_url(stream_name, next_url, data): next_page_token = data.get('metadata', {}).get('nextPageToken', None) if next_page_token: if 'pageToken=' in next_url: - next_url = re.sub(r'pageToken=[^&]+', f'pageToken={next_page_token}', next_url) + next_url = re.sub(r'pageToken=[^&]+', 'pageToken={}'.format(next_page_token), next_url) else: - next_url = f"{next_url}&pageToken={next_page_token}" + next_url = next_url + "&pageToken={}".format(next_page_token) else: next_url = None - return next_url else: next_url = None links = data.get('paging', {}).get('links', []) @@ -112,7 +111,7 @@ def get_next_url(stream_name, next_url, data): return 'https://api.linkedin.com{}'.format(href) # Prepare next page URL next_url = 
'https://api.linkedin.com{}'.format(urllib.parse.unquote(href)) - return next_url + return next_url def shift_sync_window(params, today, date_window_size, forced_window_size=None): """ @@ -130,7 +129,6 @@ def shift_sync_window(params, today, date_window_size, forced_window_size=None): 'dateRange.start.day': current_end.day, 'dateRange.start.month': current_end.month, 'dateRange.start.year': current_end.year, - 'dateRange.end.day': new_end.day, 'dateRange.end.month': new_end.month, 'dateRange.end.year': new_end.year,} @@ -279,7 +277,7 @@ def process_records(self, return max_bookmark_value, counter.value - # pylint: disable=too-many-branches,too-many-statements,too-many-arguments,too-many-locals + # pylint: disable=too-many-branches,too-many-statements,too-many-arguments,too-many-locals,too-many-nested-blocks def sync_endpoint(self, client, catalog, @@ -344,10 +342,13 @@ def sync_endpoint(self, urllist = [] if self.tap_stream_id in NEW_PATH_STREAMS: for account in account_list: - url = f"{BASE_URL}/adAccounts/{account}/{self.path}?{querystring}" + url = "{}/adAccounts/{}/{}?{}".format(BASE_URL, account, self.path, querystring) urllist.append((account, url)) else: - url = 'https://api.linkedin.com/rest/{}?{}'.format(self.path, querystring) + if self.path == 'posts': + url = '{}/{}?{}&dscAdAccount=urn%3Ali%3AsponsoredAccount%3A{}'.format(BASE_URL, self.path, querystring, parent_id) + else: + url = '{}/{}?{}'.format(BASE_URL, self.path, querystring) urllist.append((None, url)) for acct_id, next_url in urllist: @@ -403,14 +404,6 @@ def sync_endpoint(self, # Add children filter params based on parent IDs if self.tap_stream_id == 'accounts': account = 'urn:li:sponsoredAccount:{}'.format(parent_id) - owner_id = record.get('reference_organization_id', None) - owner = 'urn:li:organization:{}'.format(owner_id) - if child_stream_name == 'video_ads' and owner_id is not None: - child_stream_params['account'] = account - child_stream_params['owner'] = owner - else: - LOGGER.warning("Skipping video_ads call for %s account as reference_organization_id is not found.", account) - continue elif self.tap_stream_id == 'campaigns': campaign = 'urn:li:sponsoredCampaign:{}'.format(parent_id) if child_stream_name == 'creatives': @@ -616,12 +609,25 @@ class VideoAds(LinkedInAds): replication_method = "INCREMENTAL" key_properties = ["content_reference"] foreign_key = "id" - path = "adDirectSponsoredContents" + path = "posts" data_key = "elements" parent = "accounts" params = { - "q": "account" + "q": "dscAdAccount", + "dscAdTypes": "List(VIDEO)" } + headers = {'X-Restli-Protocol-Version': "2.0.0"} + + def sync_endpoint(self, *args, **kwargs): + try: + return super().sync_endpoint(*args, **kwargs) + except Exception as error: + if "Not enough permissions to access: partnerApiPostsExternal" in str(error): + LOGGER.info("Access to the video-ads API is denied due to insufficient permissions. 
Please reauthenticate or verify the required permissions.") + LOGGER.error(error) + # total record count (zero), initial bookmark returned to supress this failure + return 0, self.get_bookmark(kwargs.get("state"), kwargs.get("start_date")) + raise error class AccountUsers(LinkedInAds): """ @@ -736,7 +742,7 @@ class AdAnalyticsByCreative(LinkedInAds): # Dictionary of the stream classes STREAMS = { "accounts": Accounts, - # "video_ads": VideoAds, + "video_ads": VideoAds, "account_users": AccountUsers, "campaign_groups": CampaignGroups, "campaigns": Campaigns, diff --git a/tap_linkedin_ads/sync.py b/tap_linkedin_ads/sync.py index ad01778..2e85b6e 100644 --- a/tap_linkedin_ads/sync.py +++ b/tap_linkedin_ads/sync.py @@ -109,15 +109,14 @@ def sync(client, config, catalog, state): params = stream_obj.params if account_filter == 'search_id_values_param': # Convert account IDs to URN format - urn_list = [f"urn%3Ali%3AsponsoredAccount%3A{account_id}" for account_id in account_list] + urn_list = ["urn%3Ali%3AsponsoredAccount%3A{}".format(account_id) for account_id in account_list] # Create the query parameter string - param_value = f"(id:(values:List({','.join(urn_list)})))" + param_value = "(id:(values:List({})))".format(','.join(urn_list)) params['search'] = param_value elif account_filter == 'accounts_param': for idx, account in enumerate(account_list): params['accounts[{}]'.format(idx)] = \ 'urn:li:sponsoredAccount:{}'.format(account) - # Update params of specific stream stream_obj.params = params @@ -129,7 +128,7 @@ def sync(client, config, catalog, state): stream_obj.write_schema(catalog) total_records, max_bookmark_value = stream_obj.sync_endpoint( - client=client, catalog=catalog, + client=client, catalog=catalog, state=state, page_size=page_size, start_date=start_date, selected_streams=selected_streams, diff --git a/tap_linkedin_ads/transform.py b/tap_linkedin_ads/transform.py index 02c4e20..948a333 100644 --- a/tap_linkedin_ads/transform.py +++ b/tap_linkedin_ads/transform.py @@ -294,6 +294,25 @@ def transform_urn(data_dict): return data_dict +def transform_video_ads(data_dict): + # pylint: disable=fixme + # TODO: To be removed in next major version release + if 'author' in data_dict: + data_dict['owner'] = data_dict["author"] + if 'id' in data_dict: + data_dict['content_reference'] = data_dict["id"] + if 'ad_context' in data_dict: + if 'dsc_name' in data_dict['ad_context']: + data_dict['name'] = data_dict["ad_context"]['dsc_name'] + if 'dsc_ad_type' in data_dict['ad_context']: + data_dict['type'] = data_dict["ad_context"]['dsc_ad_type'] + if 'dsc_ad_account' in data_dict['ad_context']: + data_dict['account'] = data_dict["ad_context"]['dsc_ad_account'] + if 'last_modified_at' in data_dict: + data_dict['last_modified_time'] = data_dict["last_modified_at"] + if 'created_at' in data_dict: + data_dict['created_time'] = data_dict["created_at"] + return data_dict def transform_data(data_dict, stream_name): new_dict = data_dict @@ -308,6 +327,8 @@ def transform_data(data_dict, stream_name): this_dict = transform_campaigns(this_dict) elif stream_name == 'creatives': this_dict = transform_creatives(this_dict) + elif stream_name == 'video_ads': + this_dict = transform_video_ads(this_dict) this_dict = transform_urn(this_dict) this_dict = transform_audit_fields(this_dict) diff --git a/tests/base.py b/tests/base.py index f437720..dbc018a 100644 --- a/tests/base.py +++ b/tests/base.py @@ -81,7 +81,7 @@ def get_credentials(self): def expected_check_streams(): return { 'accounts', - # 'video_ads', + 
'video_ads', 'account_users', 'campaign_groups', 'campaigns', @@ -100,12 +100,12 @@ def expected_metadata(self): self.OBEYS_START_DATE: True, self.REPLICATION_KEYS: {'last_modified_time'} }, - # 'video_ads': { - # self.PRIMARY_KEYS: {'content_reference'}, - # self.REPLICATION_METHOD: self.INCREMENTAL, - # self.OBEYS_START_DATE: True, - # self.REPLICATION_KEYS: {'last_modified_time'} - # }, + 'video_ads': { + self.PRIMARY_KEYS: {'content_reference'}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.OBEYS_START_DATE: True, + self.REPLICATION_KEYS: {'last_modified_time'} + }, 'account_users': { self.PRIMARY_KEYS: {'account_id', 'user_person_id'}, self.REPLICATION_METHOD: self.INCREMENTAL, diff --git a/tests/test_all_fields.py b/tests/test_all_fields.py index 4af6562..1744d20 100644 --- a/tests/test_all_fields.py +++ b/tests/test_all_fields.py @@ -22,6 +22,7 @@ "video_ads": { "content_reference_share_id", "content_reference_ucg_post_id", + "change_audit_stamps" }, "accounts": { "total_budget_ends_at", diff --git a/tests/unittests/test_streams.py b/tests/unittests/test_streams.py index 8a69193..5dfab8f 100644 --- a/tests/unittests/test_streams.py +++ b/tests/unittests/test_streams.py @@ -70,7 +70,7 @@ class TestStreamsUtils(unittest.TestCase): """ Test all utility functions of streams module """ - + def test_split_into_chunks(self): """ Test that `test_split_into_chunks` split 65 fields into 4 chunk of MAX_CHUNK_LENGTH @@ -126,11 +126,11 @@ def test_sync_analytics_endpoint(self, name, next_url, expected_call_count, mock mock_next_url.side_effect = next_url client = _client.LinkedinClient('client_id', 'client_secret', 'refresh_token', 'access_token', 'config_path') data = list(sync_analytics_endpoint(client, "stream", "path", "query=query")) - + # Verify that get method of client is called expected times. self.assertEqual(expected_call_count, mock_get.call_count) - - + + @parameterized.expand([ ["test_single_page", [], None], ["test_multiple_page", [{'rel': 'next', 'href': '/foo'}], 'https://api.linkedin.com/foo'] @@ -168,13 +168,13 @@ def test_shift_sync_window(self, name, today_month, today_date): 'dateRange.end.month': expected_end_date.month, 'dateRange.end.day': expected_end_date.day, } - + params = { 'dateRange.end.year': 2020, 'dateRange.end.month': 10, 'dateRange.end.day': 1, } - + today = datetime.date(year=2020, month=today_month, day=today_date) actual_start_date, actual_end_date, actual_params = shift_sync_window(params, today, 30) @@ -282,11 +282,11 @@ def test_get_bookmark(self, name, state, expected_output): Case 3: Return default value if stream_name is not found in the bookmarks Cas 4: Return actual bookmark value if it is found in the state """ - + actual_output = ACCOUNT_OBJ.get_bookmark(state, "default") - + self.assertEqual(expected_output, actual_output) - + @parameterized.expand([ ['test_zero_records', ACCOUNT_OBJ, [], "last_modified_time", "2021-07-20T08:50:30.169000Z", 0], ['test_zero_latest_records', ACCOUNT_OBJ, [{'id': 1, 'last_modified_time': '2021-06-26T08:50:30.169000Z'}], "last_modified_time", "2021-07-20T08:50:30.169000Z", 0], @@ -300,7 +300,7 @@ def test_process_records(self, name, stream_obj, records, replication_key, expec """ max_bookmark_value = last_datetime = "2021-07-20T08:50:30.169000Z" actual_max_bookmark, actual_record_count = stream_obj.process_records(CATALOG, records, utils.now(), replication_key, max_bookmark_value, last_datetime) - + # Verify maximum bookmark and total records. 
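+        # (records older than last_datetime are not emitted, so the bookmark only moves forward)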
self.assertEqual(expected_max_bookmark, actual_max_bookmark) self.assertEqual(expected_record_count, actual_record_count) @@ -323,7 +323,7 @@ def test_process_records(self, name, stream_obj, records, replication_key, expec ['test_only_parent_selcted_stream', ['campaigns'], CAMPAIGN_OBJ, [{'paging': {'start': 0, 'count': 100, 'links': [], 'total': 1},'elements': [{'changeAuditStamps': {'created': {'time': 1564585620000}, 'lastModified': {'time': 1564585620000}}, 'id': 1}]}], 0, 1 - ] + ] ]) @mock.patch("tap_linkedin_ads.streams.LinkedInAds.sync_ad_analytics", return_value=(1, "2019-07-31T15:07:00.000000Z")) @mock.patch("tap_linkedin_ads.streams.LinkedInAds.get_bookmark", return_value = "2019-07-31T15:07:00.000000Z") @@ -344,7 +344,7 @@ def test_sync_endpoint(self, name, selected_streams, stream_obj, mock_response, mock_client.side_effect = mock_response mock_process_records.return_value = "2019-07-31T15:07:00.000000Z",1 actual_total_record, actual_max_bookmark = stream_obj.sync_endpoint(client, CATALOG, state, page_size, start_date, selected_streams, date_window_size) - + # Verify total no of records self.assertEqual(actual_total_record, mock_record_count) # Verify maximum bookmark @@ -352,28 +352,6 @@ def test_sync_endpoint(self, name, selected_streams, stream_obj, mock_response, # Verify total no of write_schema function call. sync_endpoint calls write_schema single time for each child. self.assertEqual(mock_write_schema.call_count, expected_write_schema_count) - @mock.patch("tap_linkedin_ads.sync.LOGGER.warning") - @mock.patch("tap_linkedin_ads.streams.LinkedInAds.get_bookmark", return_value = "2019-07-31T15:07:00.000000Z") - @mock.patch("tap_linkedin_ads.client.LinkedinClient.request") - @mock.patch("tap_linkedin_ads.streams.LinkedInAds.process_records") - @mock.patch("tap_linkedin_ads.streams.LinkedInAds.write_schema") - def test_sync_endpoint_for_reference_organization_id_is_None(self, mock_write_schema,mock_process_records,mock_client,mock_get_bookmark, - mock_warning): - """ - Verify that tap skips API call for video_ads stream if owner_id in the parent's record is None. 
- """ - client = LinkedinClient('client_id', 'client_secret', 'refresh_token', 'access_token', 'config_path') - state={'currently_syncing': 'accounts'} - start_date='2019-06-01T00:00:00Z' - page_size = 100 - date_window_size = 7 - selected_streams = ['accounts', 'video_ads'] - - mock_client.side_effect = [{'paging': {'start': 0, 'count': 100, 'links': [], 'total': 1},'elements': [{'changeAuditStamps': {'created': {'time': 1564585620000}, 'lastModified': {'time': 1564585620000}}, 'id': 1}]}] - mock_process_records.return_value = "2019-07-31T15:07:00.000000Z",1 - ACCOUNT_OBJ.sync_endpoint(client, CATALOG, state, page_size, start_date, selected_streams, date_window_size) - - mock_warning.assert_called_with('Skipping video_ads call for %s account as reference_organization_id is not found.', 'urn:li:sponsoredAccount:1') @parameterized.expand([ @@ -398,7 +376,7 @@ def test_sync_ad_analytics(self, name, expected_record_count, expected_max_bookm mock_transform.return_value = mock_tranform_data mock_process_record.return_value = (expected_max_bookmark, expected_record_count) actual_record_count, actual_max_bookmark = AD_ANALYTICS_BY_CAMPAIGN.sync_ad_analytics(client, CATALOG, bookmark, date_window_size) - + # Verify maximum bookmark self.assertEqual(actual_max_bookmark, expected_max_bookmark) # Verify total no of records @@ -423,5 +401,5 @@ def test_write_record(self, mock_logger, mock_write_record): """ with self.assertRaises(OSError) as e: ACCOUNT_OBJ.write_record([], '') - + mock_logger.assert_called_with('record: %s', []) From bcf7a97d90bd60ecdd62a0877b593b4849e20e5d Mon Sep 17 00:00:00 2001 From: Sourabh Gandhi Date: Mon, 15 Apr 2024 02:47:21 +0530 Subject: [PATCH 10/12] unit test fix --- tests/unittests/test_streams.py | 84 +++++++++++++-------------------- tests/unittests/test_sync.py | 2 +- 2 files changed, 35 insertions(+), 51 deletions(-) diff --git a/tests/unittests/test_streams.py b/tests/unittests/test_streams.py index 5dfab8f..5dd983b 100644 --- a/tests/unittests/test_streams.py +++ b/tests/unittests/test_streams.py @@ -124,7 +124,7 @@ def test_sync_analytics_endpoint(self, name, next_url, expected_call_count, mock Test that sync_analytics_endpoint function works properly for single page as well as multiple pages. """ mock_next_url.side_effect = next_url - client = _client.LinkedinClient('client_id', 'client_secret', 'refresh_token', 'access_token', 'config_path') + client = _client.LinkedinClient('client_id', 'client_secret', 'refresh_token', 'access_token', 'config_path') data = list(sync_analytics_endpoint(client, "stream", "path", "query=query")) # Verify that get method of client is called expected times. 
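+        # (client.get is invoked once per page; pagination stops when get_next_url returns None)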
@@ -135,17 +135,18 @@ def test_sync_analytics_endpoint(self, name, next_url, expected_call_count, mock ["test_single_page", [], None], ["test_multiple_page", [{'rel': 'next', 'href': '/foo'}], 'https://api.linkedin.com/foo'] ]) - def test_get_next_url(self, name, links, expected_url): + def test_get_next_url_index_pagination(self, name, links, expected_url): """ - Test that get_next_url return link of next page in case of 'href' + Test that get_next_url return link of next page in case of 'href' """ data = { 'paging': { 'links': links } } - - actual_url = get_next_url(data) + mock_next_url = "initial_url" + mock_stream_name = "account_users" + actual_url = get_next_url(mock_stream_name, mock_next_url, data) # Verify the next page url self.assertEqual(expected_url, actual_url) @@ -189,36 +190,32 @@ def test_merge_responses_no_overlap(self): Test merge_response function with records of unique date range value. """ expected_output = { - ('urn:li:sponsoredCampaign:123456789', '2020-10-1') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 1}}, - 'a': 1, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-2') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 2}}, - 'b': 2, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-3') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 3}}, - 'c': 3, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-4') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 4}}, - 'd': 4, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-5') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 5}}, - 'e': 5, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-6') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 6}}, - 'f': 6, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + ('urn:li:sponsoredCampaign:123456789', '2020-10-1'): {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 1}}, 'a': 1, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, + ('urn:li:sponsoredCampaign:123456789', '2020-10-2'): {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 2}}, 'b': 2, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, + ('urn:li:sponsoredCampaign:123456789', '2020-10-3'): {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 3}}, 'c': 3, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, + ('urn:li:sponsoredCampaign:123456789', '2020-10-4'): {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 4}}, 'd': 4, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, + ('urn:li:sponsoredCampaign:123456789', '2020-10-5'): {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 5}}, 'e': 5, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, + ('urn:li:sponsoredCampaign:123456789', '2020-10-6'): {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 6}}, 'f': 6, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 
'urn:li:sponsoredCampaign:123456789'} } data = [ [{'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 1}}, - 'a': 1, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'a': 1, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 2}}, - 'b': 2, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'b': 2, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 3}}, - 'c': 3, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'},], + 'c': 3, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']},], [{'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 4}}, - 'd': 4, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'd': 4, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 5}}, - 'e': 5, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'e': 5, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 6}}, - 'f': 6, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'},], + 'f': 6, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']},], ] - actual_output = merge_responses(data) + mock_pivot = "CAMPAIGNS" + actual_output = merge_responses(mock_pivot, data) + print(actual_output) self.assertEqual(expected_output, actual_output) @@ -228,38 +225,24 @@ def test_merge_responses_with_overlap(self): """ data = [ [{'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 1}}, - 'a': 1, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'a': 1, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 1}}, - 'b': 7, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'b': 7, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 2}}, - 'b': 2, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'b': 2, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 3}}, - 'c': 3, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'},], + 'c': 3, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']},], [{'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 4}}, - 'd': 4, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'd': 4, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 5}}, - 'e': 5, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'e': 5, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 6}}, - 'f': 6, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'},], + 'f': 6, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']},], ] - expected_output = { - ('urn:li:sponsoredCampaign:123456789', '2020-10-1') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 1}}, - 'a': 1, 'b': 7, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-2') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 2}}, - 'b': 2, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-3') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 3}}, - 'c': 3, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-4') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 4}}, - 
'd': 4, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-5') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 5}}, - 'e': 5, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-6') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 6}}, - 'f': 6, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - } - - actual_output = merge_responses(data) - + expected_output = {('urn:li:sponsoredCampaign:123456789', '2020-10-4'): {'pivot': 'CAMPAIGNS', 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'dateRange': {'start': {'month': 10, 'day': 4, 'year': 2020}}, 'pivot_value': 'urn:li:sponsoredCampaign:123456789', 'd': 4}, ('urn:li:sponsoredCampaign:123456789', '2020-10-3'): {'pivot': 'CAMPAIGNS', 'c': 3, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'dateRange': {'start': {'month': 10, 'day': 3, 'year': 2020}}, 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, ('urn:li:sponsoredCampaign:123456789', '2020-10-1'): {'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 'urn:li:sponsoredCampaign:123456789', 'dateRange': {'start': {'month': 10, 'day': 1, 'year': 2020}}, 'a': 1, 'b': 7}, ('urn:li:sponsoredCampaign:123456789', '2020-10-6'): {'pivot': 'CAMPAIGNS', 'f': 6, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'dateRange': {'start': {'month': 10, 'day': 6, 'year': 2020}}, 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, ('urn:li:sponsoredCampaign:123456789', '2020-10-5'): {'pivot': 'CAMPAIGNS', 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'dateRange': {'start': {'month': 10, 'day': 5, 'year': 2020}}, 'pivot_value': 'urn:li:sponsoredCampaign:123456789', 'e': 5}, ('urn:li:sponsoredCampaign:123456789', '2020-10-2'): {'pivot': 'CAMPAIGNS', 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'dateRange': {'start': {'month': 10, 'day': 2, 'year': 2020}}, 'pivot_value': 'urn:li:sponsoredCampaign:123456789', 'b': 2}} + mock_pivot = "CAMPAIGNS" + actual_output = merge_responses(mock_pivot, data) # Verify that merge_responses function merge records by primary with same date range value. 
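+        # (e.g. both 2020-10-1 fragments above collapse into one merged record carrying 'a' and 'b')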
         self.assertEqual(expected_output, actual_output)

@@ -340,10 +323,11 @@ def test_sync_endpoint(self, name, selected_streams, stream_obj, mock_response,
         start_date='2019-06-01T00:00:00Z'
         page_size = 100
         date_window_size = 7
+        account_list = ["12345"]
         mock_client.side_effect = mock_response
         mock_process_records.return_value = "2019-07-31T15:07:00.000000Z",1

-        actual_total_record, actual_max_bookmark = stream_obj.sync_endpoint(client, CATALOG, state, page_size, start_date, selected_streams, date_window_size)
+        actual_total_record, actual_max_bookmark = stream_obj.sync_endpoint(client, CATALOG, state, page_size, start_date, selected_streams, date_window_size, account_list=account_list)

         # Verify total no of records
         self.assertEqual(actual_total_record, mock_record_count)
@@ -363,7 +347,7 @@ def test_sync_endpoint(self, name, selected_streams, stream_obj, mock_response,
     @mock.patch("tap_linkedin_ads.streams.transform_json")
     @mock.patch("tap_linkedin_ads.streams.sync_analytics_endpoint")
     @mock.patch("tap_linkedin_ads.streams.merge_responses")
-    def test_sync_ad_analytics(self, name, expected_record_count, expected_max_bookmark, mock_tranform_data, 
+    def test_sync_ad_analytics(self, name, expected_record_count, expected_max_bookmark, mock_tranform_data,
                                mock_merge_response, mock_endpoint, mock_transform, mock_shift_windows, mock_process_record):
         """
         Test that `sync_ad_analytics` function work properly for zero records as well as multiple records.
diff --git a/tests/unittests/test_sync.py b/tests/unittests/test_sync.py
index 7cc0137..d0fa1c1 100644
--- a/tests/unittests/test_sync.py
+++ b/tests/unittests/test_sync.py
@@ -162,4 +162,4 @@ def test_sync(self, name, config, expected_date_window, mock_sync_endpoint):
                           page_size=100,
                           start_date="2019-06-01T00:00:00Z",
                           selected_streams=['accounts', 'video_ads', 'account_users', 'campaigns', 'ad_analytics_by_campaign'],
-                          date_window_size=expected_date_window)
\ No newline at end of file
+                          date_window_size=expected_date_window, account_list=[config['accounts']])

From 9885719ece383662d2a30cce1ac5da47e102b986 Mon Sep 17 00:00:00 2001
From: Sourabh Gandhi
Date: Mon, 15 Apr 2024 10:06:53 +0530
Subject: [PATCH 11/12] update setup.py and changelog.md

---
 CHANGELOG.md                | 18 ++++++++++++++++++
 setup.py                    |  2 +-
 tap_linkedin_ads/streams.py |  5 ++++-
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c665500..581f8bc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,23 @@
 # Changelog

+## 2.3.0
+
+### Features
+- **API Updates**
+  - Bumped to API version `202403` ([#69](https://github.com/singer-io/tap-linkedin-ads/pull/69))
+  - Updated API endpoints for the following streams: `campaign_groups`, `campaigns`, `creatives`
+
+- **Pagination Enhancements**
+  - Implemented cursor-based pagination for the following streams: `accounts`, `campaign_groups`, `campaigns`, `creatives`
+
+- **API Query Param Adjustments**
+  - Removed unsupported `pivot` and `pivotValue` from `fields` query parameters in Analytics API requests
+  - Incorporated these values within the tap to ensure consistency with the previous tap version
+
+- **Video Ads Stream**
+  - Added new fields ([#71](https://github.com/singer-io/tap-linkedin-ads/pull/71))
+  - Requires scope - `r_organization_social` to sync the records.
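For context, the cursor-based pagination named above works roughly as follows from a consumer's point of view. This is a minimal sketch assuming LinkedIn's documented `pageSize`/`pageToken` request parameters and `metadata.nextPageToken` response field; `fetch_json` and `iter_records` are hypothetical helper names for illustration, not tap code:

```python
# Minimal sketch of LinkedIn's cursor-based pagination (illustration only).
# Assumes the documented contract: send pageSize/pageToken as query params,
# read metadata.nextPageToken from each response until it is absent.
import requests

def fetch_json(url, params, access_token):
    # Hypothetical helper; a real client would also send LinkedIn's
    # version headers (LinkedIn-Version, X-Restli-Protocol-Version).
    response = requests.get(url, params=params,
                            headers={'Authorization': f'Bearer {access_token}'})
    response.raise_for_status()
    return response.json()

def iter_records(url, access_token, page_size=100):
    params = {'q': 'search', 'pageSize': page_size}
    while True:
        data = fetch_json(url, params, access_token)
        yield from data.get('elements', [])
        next_page_token = data.get('metadata', {}).get('nextPageToken')
        if not next_page_token:
            break  # last page: the API returns no cursor
        params['pageToken'] = next_page_token
```

Unlike index-based paging, the token is opaque, so each request simply swaps in the cursor returned by the previous response instead of computing offsets.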
+
 ## 2.2.0
 * Bump to API version `202304`
diff --git a/setup.py b/setup.py
index ca398a9..0e8f9e1 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 from setuptools import setup, find_packages

 setup(name='tap-linkedin-ads',
-      version='2.2.0',
+      version='2.3.0',
       description='Singer.io tap for extracting data from the LinkedIn Marketing Ads API API 2.0',
       author='jeff.huth@bytecode.io',
       classifiers=['Programming Language :: Python :: 3 :: Only'],
diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py
index 745f567..c733fa2 100644
--- a/tap_linkedin_ads/streams.py
+++ b/tap_linkedin_ads/streams.py
@@ -98,6 +98,7 @@ def get_next_url(stream_name, next_url, data):
         else:
             next_url = None
     else:
+        # handles index-based pagination
         next_url = None
         links = data.get('paging', {}).get('links', [])
         for link in links:
@@ -341,6 +342,8 @@ def sync_endpoint(self,

         urllist = []
         if self.tap_stream_id in NEW_PATH_STREAMS:
+            # As per the latest LinkedIn API version, a few URL formats changed: these streams
+            # expect the advertiser account_id in each URL path
             for account in account_list:
                 url = "{}/adAccounts/{}/{}?{}".format(BASE_URL, account, self.path, querystring)
                 urllist.append((account, url))
@@ -477,7 +480,7 @@ def sync_ad_analytics(self, client, catalog, last_datetime, date_window_size, pa
         """
         # LinkedIn has a max of 20 fields per request. We cap the chunks at 18
         # to make sure there's always room for us to append `dateRange`, and `pivotValues`
-        MAX_CHUNK_LENGTH = 17
+        MAX_CHUNK_LENGTH = 18

         bookmark_field = next(iter(self.replication_keys))

From ba25b15b382d629817abdd624ee0aba3d93f0732 Mon Sep 17 00:00:00 2001
From: Sourabh Gandhi
Date: Mon, 15 Apr 2024 08:28:34 +0000
Subject: [PATCH 12/12] fix review comments

---
 CHANGELOG.md                | 1 -
 README.md                   | 5 +++--
 tap_linkedin_ads/streams.py | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 581f8bc..17c96e6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,7 +16,6 @@

 - **Video Ads Stream**
   - Added new fields ([#71](https://github.com/singer-io/tap-linkedin-ads/pull/71))
-  - Requires scope - `r_organization_social` to sync the records.

 ## 2.2.0
 * Bump to API version `202304`
diff --git a/README.md b/README.md
index f1fcd15..e5409cf 100644
--- a/README.md
+++ b/README.md
@@ -31,10 +31,11 @@ This tap:
 - Transformations: Fields camelCase to snake_case. URNs to ids. Unix epoch millisecond integers to date-times. Audit date-times created_at and last_modified_at de-nested. String to decimal for total_budget field.
 - Children: video_ads

-[**video_ads**](https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/advertising-targeting/create-and-manage-video#finders)
-- Endpoint: https://api.linkedin.com/rest/adDirectSponsoredContents
+[**video_ads**](https://learn.microsoft.com/en-us/linkedin/marketing/community-management/shares/posts-api?view=li-lms-2024-03&tabs=curl#find-posts-by-account)
+- Endpoint: https://api.linkedin.com/rest/posts
 - Primary key field: content_reference
 - Foreign keys: account_id (accounts), owner_organization_id (organizations)
+- Required scope: `r_organization_social`
 - Replication strategy: Incremental (query all, filter results)
 - Filter: account (from parent account) and owner (from parent account) (see NOTE below)
 - Bookmark: last_modified_time (date-time)
diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py
index c733fa2..e8e5799 100644
--- a/tap_linkedin_ads/streams.py
+++ b/tap_linkedin_ads/streams.py
@@ -148,7 +148,7 @@ def merge_responses(pivot, data):
         for element in page:
             temp_start = element['dateRange']['start']
             temp_pivotValue = element['pivotValues'][0]
-            # adding pivot and pivot_value to make it compatible with the previous tap version 2.2.0
+            # adding pivot and pivot_value to make it compatible with the previous tap version
             element['pivot'] = pivot
             element["pivot_value"] = temp_pivotValue
             string_start = '{}-{}-{}'.format(temp_start['year'], temp_start['month'], temp_start['day'])
@@ -626,7 +626,7 @@ def sync_endpoint(self, *args, **kwargs):
             return super().sync_endpoint(*args, **kwargs)
         except Exception as error:
             if "Not enough permissions to access: partnerApiPostsExternal" in str(error):
-                LOGGER.info("Access to the video-ads API is denied due to insufficient permissions. Please reauthenticate or verify the required permissions.")
+                LOGGER.warning("Access to the video-ads API is denied due to insufficient permissions. Please reauthenticate or verify the required permissions.")
             LOGGER.error(error)
             # total record count (zero), initial bookmark returned to supress this failure
             return 0, self.get_bookmark(kwargs.get("state"), kwargs.get("start_date"))
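The `pivot`/`pivot_value` comment change above is easier to verify against the unit-test fixtures earlier in this series. Below is a minimal restatement of the per-element enrichment and keying, using the fixture values; it is illustrative only, not tap code:

```python
# Restates the enrichment visible in the merge_responses hunk above,
# using the unit-test fixture values (illustration only).
element = {
    'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 1}},
    'a': 1,
    'pivotValues': ['urn:li:sponsoredCampaign:123456789'],
}
pivot = 'CAMPAIGNS'

# pivot and pivot_value are re-added inside the tap because they were
# removed from the Analytics `fields` query params; this keeps output
# records shaped like the previous tap version's.
element['pivot'] = pivot
element['pivot_value'] = element['pivotValues'][0]

start = element['dateRange']['start']
key = (element['pivot_value'],
       '{}-{}-{}'.format(start['year'], start['month'], start['day']))
assert key == ('urn:li:sponsoredCampaign:123456789', '2020-10-1')
```

Records that share the same `(pivot_value, date)` key are merged into a single row, which is exactly what the overlap test above asserts.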