diff --git a/CHANGELOG.md b/CHANGELOG.md
index c665500..17c96e6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,22 @@
 # Changelog
 
+## 2.3.0
+
+### Features
+- **API Updates**
+  - Bumped to API version `202403` ([#69](https://github.com/singer-io/tap-linkedin-ads/pull/69))
+  - Updated API endpoints for the following streams: `campaign_groups`, `campaigns`, `creatives`
+
+- **Pagination Enhancements**
+  - Implemented cursor-based pagination for the following streams: `accounts`, `campaign_groups`, `campaigns`, `creatives`
+
+- **API Query Param Adjustments**
+  - Removed the unsupported `pivot` and `pivotValue` values from the `fields` query parameter in Analytics API requests
+  - Incorporated these values within the tap to ensure consistency with the previous tap version
+
+- **Video Ads Stream**
+  - Added new fields ([#71](https://github.com/singer-io/tap-linkedin-ads/pull/71))
+
 ## 2.2.0
 * Bump to API version `202304`
diff --git a/README.md b/README.md
index f1fcd15..e5409cf 100644
--- a/README.md
+++ b/README.md
@@ -31,10 +31,11 @@ This tap:
 - Transformations: Fields camelCase to snake_case. URNs to ids. Unix epoch millisecond integers to date-times. Audit date-times created_at and last_modified_at de-nested. String to decimal for total_budget field.
 - Children: video_ads
 
-[**video_ads**](https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/advertising-targeting/create-and-manage-video#finders)
-- Endpoint: https://api.linkedin.com/rest/adDirectSponsoredContents
+[**video_ads**](https://learn.microsoft.com/en-us/linkedin/marketing/community-management/shares/posts-api?view=li-lms-2024-03&tabs=curl#find-posts-by-account)
+- Endpoint: https://api.linkedin.com/rest/posts
 - Primary key field: content_reference
 - Foreign keys: account_id (accounts), owner_organization_id (organizations)
+- Required scope: `r_organization_social`
 - Replication strategy: Incremental (query all, filter results)
 - Filter: account (from parent account) and owner (from parent account) (see NOTE below)
 - Bookmark: last_modified_time (date-time)
diff --git a/setup.py b/setup.py
index ca398a9..0e8f9e1 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 from setuptools import setup, find_packages
 
 setup(name='tap-linkedin-ads',
-      version='2.2.0',
+      version='2.3.0',
      description='Singer.io tap for extracting data from the LinkedIn Marketing Ads API API 2.0',
      author='jeff.huth@bytecode.io',
      classifiers=['Programming Language :: Python :: 3 :: Only'],
diff --git a/tap_linkedin_ads/client.py b/tap_linkedin_ads/client.py
index a221d08..6c7eaa3 100644
--- a/tap_linkedin_ads/client.py
+++ b/tap_linkedin_ads/client.py
@@ -11,7 +11,7 @@
 BASE_URL = 'https://api.linkedin.com/rest'
 LINKEDIN_TOKEN_URI = 'https://www.linkedin.com/oauth/v2/accessToken'
 INTROSPECTION_URI = 'https://www.linkedin.com/oauth/v2/introspectToken'
-LINKEDIN_VERSION = '202304'
+LINKEDIN_VERSION = '202403'
 
 # set default timeout of 300 seconds
 REQUEST_TIMEOUT = 300
diff --git a/tap_linkedin_ads/schemas/video_ads.json b/tap_linkedin_ads/schemas/video_ads.json
index adb192c..efaf754 100644
--- a/tap_linkedin_ads/schemas/video_ads.json
+++ b/tap_linkedin_ads/schemas/video_ads.json
@@ -11,6 +11,186 @@
         "string"
       ]
     },
+    "lifecycle_state": {
+      "type": [
+        "null",
+        "string"
+      ]
+    },
+    "visibility": {
+      "type": [
+        "null",
+        "string"
+      ]
+    },
+    "published_at": {
+      "type": [
+        "null",
+        "string"
+      ],
+      "format": "date-time"
+    },
+    "author": {
+      "type": [
+        "null",
+        "string"
+      ]
+    },
+    "content_call_to_action_label": {
+      "type": [
+        "null",
+        "string"
+      ]
+    },
+    "distribution": {
+      "type": [
+        "null",
+        "object"
+      ],
+      "additionalProperties": false,
+      "properties": {
+        "feed_distribution": {
+          "type": [
+            "null",
+            "string"
+          ]
+        },
+        "third_party_distribution_channels": {
+          "type": [
+            "null",
+            "array"
+          ],
+          "items": {
+            "type": [
+              "null",
+              "string"
+            ]
+          }
+        }
+      }
+    },
+    "content": {
+      "type": [
+        "null",
+        "object"
+      ],
+      "additionalProperties": false,
+      "properties": {
+        "media": {
+          "type": [
+            "null",
+            "object"
+          ],
+          "additionalProperties": false,
+          "properties": {
+            "title": {
+              "type": [
+                "null",
+                "string"
+              ]
+            },
+            "id": {
+              "type": [
+                "null",
+                "string"
+              ]
+            }
+          }
+        }
+      }
+    },
+    "content_landing_page": {
+      "type": [
+        "null",
+        "string"
+      ]
+    },
+    "lifecycle_state_info": {
+      "type": [
+        "null",
+        "object"
+      ],
+      "additionalProperties": false,
+      "properties": {
+        "is_edited_by_author": {
+          "type": [
+            "null",
+            "boolean"
+          ]
+        }
+      }
+    },
+    "is_reshare_disabled_by_author": {
+      "type": [
+        "null",
+        "boolean"
+      ]
+    },
+    "created_at": {
+      "type": [
+        "null",
+        "string"
+      ],
+      "format": "date-time"
+    },
+    "last_modified_at": {
+      "type": [
+        "null",
+        "string"
+      ],
+      "format": "date-time"
+    },
+    "id": {
+      "type": [
+        "null",
+        "string"
+      ]
+    },
+    "commentary": {
+      "type": [
+        "null",
+        "string"
+      ]
+    },
+    "ad_context": {
+      "type": [
+        "null",
+        "object"
+      ],
+      "additionalProperties": false,
+      "properties": {
+        "dsc_status": {
+          "type": [
+            "null",
+            "string"
+          ]
+        },
+        "dsc_name": {
+          "type": [
+            "null",
+            "string"
+          ]
+        },
+        "dsc_ad_type": {
+          "type": [
+            "null",
+            "string"
+          ]
+        },
+        "is_dsc": {
+          "type": [
+            "null",
+            "boolean"
+          ]
+        },
+        "dsc_ad_account": {
+          "type": [
+            "null",
+            "string"
+          ]
+        }
+      }
+    },
     "account_id": {
       "type": [
         "null",
diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py
index e13ee36..e8e5799 100644
--- a/tap_linkedin_ads/streams.py
+++ b/tap_linkedin_ads/streams.py
@@ -1,4 +1,5 @@
 import urllib.parse
+import re
 import copy
 import datetime
 from datetime import timedelta
@@ -19,8 +20,15 @@
     'endAt',
     'creative',
     'creativeId',
+    # `pivot` and `pivotValue` are no longer supported; the tap adds these values in code
+    'pivot',
+    'pivotValue'
 }
 
+CURSOR_BASED_PAGINATION_STREAMS = ["accounts", "campaign_groups", "campaigns", "creatives"]
+NEW_PATH_STREAMS = ["campaign_groups", "campaigns", "creatives"]
+BASE_URL = 'https://api.linkedin.com/rest'
+
 def write_bookmark(state, value, stream_name):
     """
     Write the bookmark in the state corresponding to the stream.
@@ -71,28 +79,39 @@ def sync_analytics_endpoint(client, stream_name, path, query_string):
         data = client.get(url=next_url, endpoint=stream_name)
         yield data
         # Fetch next page
-        next_url = get_next_url(data)
+        next_url = get_next_url(stream_name, next_url, data)
 
         LOGGER.info('%s: Synced page %s', stream_name, page)
         page = page + 1
 
-def get_next_url(data):
+def get_next_url(stream_name, next_url, data):
     """
     Prepare and return the URL to fetch the next page of records.
     """
-    next_url = None
-    links = data.get('paging', {}).get('links', [])
-    for link in links:
-        rel = link.get('rel')
-        if rel == 'next':
-            href = link.get('href')
-            if href:
-                # url must be kept encoded for the creatives endpoint.
-                # Ref - https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-creatives?view=li-lms-2023-01&tabs=http#sample-request-3
-                if "rest/creatives" in href:
-                    return 'https://api.linkedin.com{}'.format(href)
-                # Prepare next page URL
-                next_url = 'https://api.linkedin.com{}'.format(urllib.parse.unquote(href))
+    if stream_name in CURSOR_BASED_PAGINATION_STREAMS:
+        next_page_token = data.get('metadata', {}).get('nextPageToken', None)
+        if next_page_token:
+            if 'pageToken=' in next_url:
+                next_url = re.sub(r'pageToken=[^&]+', 'pageToken={}'.format(next_page_token), next_url)
+            else:
+                next_url = next_url + "&pageToken={}".format(next_page_token)
+        else:
+            next_url = None
+    else:
+        # handles index-based pagination
+        next_url = None
+        links = data.get('paging', {}).get('links', [])
+        for link in links:
+            rel = link.get('rel')
+            if rel == 'next':
+                href = link.get('href')
+                if href:
+                    # url must be kept encoded for the creatives endpoint.
+                    # Ref - https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-creatives?view=li-lms-2023-01&tabs=http#sample-request-3
+                    if "rest/creatives" in href:
+                        return 'https://api.linkedin.com{}'.format(href)
+                    # Prepare next page URL
+                    next_url = 'https://api.linkedin.com{}'.format(urllib.parse.unquote(href))
     return next_url
 
 def shift_sync_window(params, today, date_window_size, forced_window_size=None):
@@ -111,13 +130,12 @@ def shift_sync_window(params, today, date_window_size, forced_window_size=None):
                   'dateRange.start.day': current_end.day,
                   'dateRange.start.month': current_end.month,
                   'dateRange.start.year': current_end.year,
-
                   'dateRange.end.day': new_end.day,
                   'dateRange.end.month': new_end.month,
                   'dateRange.end.year': new_end.year,}
     return current_end, new_end, new_params
 
-def merge_responses(data):
+def merge_responses(pivot, data):
     """
     Prepare map with key as primary key and value as the record itself for analytics streams.
     The primary key is a combination of pivotValue and start date fields value.
     """
@@ -129,7 +147,10 @@
         # Loop through each record of the page
         for element in page:
             temp_start = element['dateRange']['start']
-            temp_pivotValue = element['pivotValue']
+            temp_pivotValue = element['pivotValues'][0]
+            # adding pivot and pivot_value to keep records compatible with the previous tap version
+            element['pivot'] = pivot
+            element["pivot_value"] = temp_pivotValue
             string_start = '{}-{}-{}'.format(temp_start['year'], temp_start['month'], temp_start['day'])
             primary_key = (temp_pivotValue, string_start)
             if primary_key in full_records:
@@ -257,7 +278,7 @@ def process_records(self,
 
         return max_bookmark_value, counter.value
 
-    # pylint: disable=too-many-branches,too-many-statements,too-many-arguments,too-many-locals
+    # pylint: disable=too-many-branches,too-many-statements,too-many-arguments,too-many-locals,too-many-nested-blocks
     def sync_endpoint(self,
                       client,
                       catalog,
@@ -266,7 +287,8 @@ def sync_endpoint(self,
                       start_date,
                       selected_streams,
                       date_window_size,
-                      parent_id=None):
+                      parent_id=None,
+                      account_list=None):
         """
         Sync a specific parent or child endpoint.
         """
@@ -301,133 +323,149 @@ def sync_endpoint(self,
         total_records = 0
         page = 1
 
-        endpoint_params = {
-            'start': start,
-            'count': page_size,
-            **self.params # adds in endpoint specific, sort, filter params
-        }
+        if self.tap_stream_id in CURSOR_BASED_PAGINATION_STREAMS:
+            # Hardcode the page size to 1000 for the accounts stream, as the search and pageToken params can't be present at the same time.
+            if self.tap_stream_id == "accounts":
+                page_size = 1000
+            endpoint_params = {
+                'pageSize': page_size,
+                **self.params
+            }
+        else:
+            endpoint_params = {
+                'start': start,
+                'count': page_size,
+                **self.params # adds in endpoint specific, sort, filter params
+            }
 
         querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in endpoint_params.items()])
-        next_url = 'https://api.linkedin.com/rest/{}?{}'.format(self.path, querystring)
-
-        while next_url: #pylint: disable=too-many-nested-blocks
-            LOGGER.info('URL for %s: %s', self.tap_stream_id, next_url)
-
-            # Get data, API request
-            data = client.get(
-                url=next_url,
-                endpoint=self.tap_stream_id,
-                headers=self.headers)
-            # time_extracted: datetime when the data was extracted from the API
-            time_extracted = utils.now()
-
-            # Transform data with transform_json from transform.py
-            # This function converts unix datetimes, de-nests audit fields,
-            # tranforms URNs to IDs, tranforms/abstracts variably named fields,
-            # converts camelCase to snake_case for fieldname keys.
-            # For the Linkedin Ads API, 'elements' is always the root data_key for records.
-            # The data_key identifies the collection of records below the element
-            transformed_data = [] # initialize the record list
-            if self.data_key in data:
-                transformed_data = transform_json(data, self.tap_stream_id)[self.data_key]
-            if not transformed_data or transformed_data is None:
-                LOGGER.info('No transformed_data')
-                break # No data results
-
-            pre_singer_transformed_data = copy.deepcopy(transformed_data)
-            if self.tap_stream_id in selected_streams:
-                # Process records and gets the max_bookmark_value and record_count for the set of records
-                max_bookmark_value, record_count = self.process_records(
-                    catalog=catalog,
-                    records=transformed_data,
-                    time_extracted=time_extracted,
-                    bookmark_field=bookmark_field,
-                    max_bookmark_value=max_bookmark_value,
-                    last_datetime=last_datetime,
-                    parent_id=parent_id)
-                LOGGER.info('%s, records processed: %s', self.tap_stream_id, record_count)
-                total_records = total_records + record_count
-
-            # Loop thru parent batch records for each children objects
-            for child_stream_name in children:
-                if child_stream_name in selected_streams:
-                    # For each parent record
-                    child_obj = STREAMS[child_stream_name]()
-
-                    for record in pre_singer_transformed_data:
-
-                        parent_id = record.get(child_obj.foreign_key)
-
-                        child_stream_params = child_obj.params
-                        # Add children filter params based on parent IDs
-                        if self.tap_stream_id == 'accounts':
-                            account = 'urn:li:sponsoredAccount:{}'.format(parent_id)
-                            owner_id = record.get('reference_organization_id', None)
-                            owner = 'urn:li:organization:{}'.format(owner_id)
-                            if child_stream_name == 'video_ads' and owner_id is not None:
-                                child_stream_params['account'] = account
-                                child_stream_params['owner'] = owner
+        urllist = []
+        if self.tap_stream_id in NEW_PATH_STREAMS:
+            # As per the latest LinkedIn API version, a few URL formats changed; the API now
+            # expects the advertiser account_id in each URL path
+            for account in account_list:
+                url = "{}/adAccounts/{}/{}?{}".format(BASE_URL, account, self.path, querystring)
+                urllist.append((account, url))
+        else:
+            if self.path == 'posts':
+                url = '{}/{}?{}&dscAdAccount=urn%3Ali%3AsponsoredAccount%3A{}'.format(BASE_URL, self.path, querystring, parent_id)
+            else:
+                url = '{}/{}?{}'.format(BASE_URL, self.path, querystring)
+            urllist.append((None, url))
+
+        for acct_id, next_url in urllist:
+            while next_url: #pylint: disable=too-many-nested-blocks
+                LOGGER.info('URL for %s: %s', self.tap_stream_id, next_url)
+
+                # Get data, API request
+                data = client.get(
+                    url=next_url,
+                    endpoint=self.tap_stream_id,
+                    headers=self.headers)
+                # time_extracted: datetime when the data was extracted from the API
+                time_extracted = utils.now()
+
+                # Transform data with transform_json from transform.py
+                # This function converts unix datetimes, de-nests audit fields,
+                # transforms URNs to IDs, transforms/abstracts variably named fields,
+                # converts camelCase to snake_case for fieldname keys.
+                # For the Linkedin Ads API, 'elements' is always the root data_key for records.
+                # The data_key identifies the collection of records below the element
+                transformed_data = [] # initialize the record list
+                if self.data_key in data:
+                    transformed_data = transform_json(data, self.tap_stream_id)[self.data_key]
+                if not transformed_data or transformed_data is None:
+                    LOGGER.info('No transformed_data')
+                    break # No data results
+
+                pre_singer_transformed_data = copy.deepcopy(transformed_data)
+                if self.tap_stream_id in selected_streams:
+                    # Process records and gets the max_bookmark_value and record_count for the set of records
+                    max_bookmark_value, record_count = self.process_records(
+                        catalog=catalog,
+                        records=transformed_data,
+                        time_extracted=time_extracted,
+                        bookmark_field=bookmark_field,
+                        max_bookmark_value=max_bookmark_value,
+                        last_datetime=last_datetime,
+                        parent_id=parent_id)
+                    LOGGER.info('%s, records processed: %s', self.tap_stream_id, record_count)
+                    total_records = total_records + record_count
+
+                # Loop through parent batch records for each child object
+                for child_stream_name in children:
+                    if child_stream_name in selected_streams:
+                        # For each parent record
+                        child_obj = STREAMS[child_stream_name]()
+
+                        for record in pre_singer_transformed_data:
+
+                            parent_id = record.get(child_obj.foreign_key)
+
+                            child_stream_params = child_obj.params
+                            # Add children filter params based on parent IDs
+                            if self.tap_stream_id == 'accounts':
+                                account = 'urn:li:sponsoredAccount:{}'.format(parent_id)
+                            elif self.tap_stream_id == 'campaigns':
+                                campaign = 'urn:li:sponsoredCampaign:{}'.format(parent_id)
+                                if child_stream_name == 'creatives':
+                                    # The value of the campaigns in the query params should be passed in the encoded format.
+                                    # Ref - https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-creatives?view=li-lms-2023-01&tabs=http#sample-request-3
+                                    child_stream_params['campaigns'] = 'List(urn%3Ali%3AsponsoredCampaign%3A{})'.format(parent_id)
+                                elif child_stream_name in ('ad_analytics_by_campaign', 'ad_analytics_by_creative'):
+                                    child_stream_params['campaigns[0]'] = campaign
+
+                            # Update params for the child stream
+                            child_obj.params = child_stream_params
+                            LOGGER.info('Syncing: %s, parent_stream: %s, parent_id: %s',
+                                        child_stream_name,
+                                        self.tap_stream_id,
+                                        parent_id)
+
+                            # Call sync method for the child stream
+                            if child_stream_name in {'ad_analytics_by_campaign', 'ad_analytics_by_creative'}:
+                                child_total_records, child_batch_bookmark_value = child_obj.sync_ad_analytics(
+                                    client=client,
+                                    catalog=catalog,
+                                    last_datetime=child_obj.get_bookmark(state, start_date),
+                                    date_window_size=date_window_size,
+                                    parent_id=parent_id)
                             else:
-                                LOGGER.warning("Skipping video_ads call for %s account as reference_organization_id is not found.", account)
-                                continue
-                        elif self.tap_stream_id == 'campaigns':
-                            campaign = 'urn:li:sponsoredCampaign:{}'.format(parent_id)
-                            if child_stream_name == 'creatives':
-                                # The value of the campaigns in the query params should be passed in the encoded format.
-                                # Ref - https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads/account-structure/create-and-manage-creatives?view=li-lms-2023-01&tabs=http#sample-request-3
-                                child_stream_params['campaigns'] = 'List(urn%3Ali%3AsponsoredCampaign%3A{})'.format(parent_id)
-                            elif child_stream_name in ('ad_analytics_by_campaign', 'ad_analytics_by_creative'):
-                                child_stream_params['campaigns[0]'] = campaign
-
-                        # Update params for the child stream
-                        child_obj.params = child_stream_params
-                        LOGGER.info('Syncing: %s, parent_stream: %s, parent_id: %s',
-                                    child_stream_name,
-                                    self.tap_stream_id,
-                                    parent_id)
-
-                        # Call sync method for the child stream
-                        if child_stream_name in {'ad_analytics_by_campaign', 'ad_analytics_by_creative'}:
-                            child_total_records, child_batch_bookmark_value = child_obj.sync_ad_analytics(
-                                client=client,
-                                catalog=catalog,
-                                last_datetime=child_obj.get_bookmark(state, start_date),
-                                date_window_size=date_window_size,
-                                parent_id=parent_id)
-                        else:
-                            child_total_records, child_batch_bookmark_value = child_obj.sync_endpoint(
-                                client=client,
-                                catalog=catalog,
-                                state=state,
-                                page_size=page_size,
-                                start_date=start_date,
-                                selected_streams=selected_streams,
-                                date_window_size=date_window_size,
-                                parent_id=parent_id)
-
-                        child_batch_bookmark_dttm = strptime_to_utc(child_batch_bookmark_value)
-                        child_max_bookmark = child_max_bookmarks.get(child_stream_name)
-                        child_max_bookmark_dttm = strptime_to_utc(child_max_bookmark)
-                        if child_batch_bookmark_dttm > child_max_bookmark_dttm:
-                            # Update bookmark for child stream.
-                            child_max_bookmarks[child_stream_name] = strftime(child_batch_bookmark_dttm)
-
-                        LOGGER.info('Synced: %s, parent_id: %s, total_records: %s',
-                                    child_stream_name,
-                                    parent_id,
-                                    child_total_records)
-                        LOGGER.info('FINISHED Syncing: %s', child_stream_name)
-
-            # Pagination: Get next_url
-            next_url = get_next_url(data)
-
-            if self.tap_stream_id in selected_streams:
-                LOGGER.info('%s: Synced page %s, this page: %s. Total records processed: %s',
-                            self.tap_stream_id,
-                            page,
-                            record_count,
-                            total_records)
-                page = page + 1
+                                child_total_records, child_batch_bookmark_value = child_obj.sync_endpoint(
+                                    client=client,
+                                    catalog=catalog,
+                                    state=state,
+                                    page_size=page_size,
+                                    start_date=start_date,
+                                    selected_streams=selected_streams,
+                                    date_window_size=date_window_size,
+                                    parent_id=parent_id,
+                                    account_list=[acct_id])
+
+                            child_batch_bookmark_dttm = strptime_to_utc(child_batch_bookmark_value)
+                            child_max_bookmark = child_max_bookmarks.get(child_stream_name)
+                            child_max_bookmark_dttm = strptime_to_utc(child_max_bookmark)
+                            if child_batch_bookmark_dttm > child_max_bookmark_dttm:
+                                # Update bookmark for child stream.
+                                child_max_bookmarks[child_stream_name] = strftime(child_batch_bookmark_dttm)
+
+                            LOGGER.info('Synced: %s, parent_id: %s, total_records: %s',
+                                        child_stream_name,
+                                        parent_id,
+                                        child_total_records)
+                            LOGGER.info('FINISHED Syncing: %s', child_stream_name)
+
+                # Pagination: Get next_url
+                next_url = get_next_url(self.tap_stream_id, next_url, data)
+
+                if self.tap_stream_id in selected_streams:
+                    LOGGER.info('%s: Synced page %s, this page: %s. Total records processed: %s',
+                                self.tap_stream_id,
+                                page,
+                                record_count,
+                                total_records)
+                    page = page + 1
 
         # Write child stream's bookmarks
         for key, val in list(child_max_bookmarks.items()):
@@ -440,10 +478,9 @@ def sync_ad_analytics(self, client, catalog, last_datetime, date_window_size, pa
         """
         Sync method for ad_analytics_by_campaign, ad_analytics_by_creative
         """
-        # LinkedIn has a max of 20 fields per request. We cap the chunks at 17
-        # to make sure there's always room for us to append `dateRange`,
-        # `pivot`, and `pivotValue`
-        MAX_CHUNK_LENGTH = 17
+        # LinkedIn has a max of 20 fields per request. We cap the chunks at 18
+        # to make sure there's always room for us to append `dateRange` and `pivotValues`
+        MAX_CHUNK_LENGTH = 18
 
         bookmark_field = next(iter(self.replication_keys))
 
@@ -480,7 +517,7 @@ def sync_ad_analytics(self, client, catalog, last_datetime, date_window_size, pa
         # (even if this means the values are all `0`) and a day with null
         # values. We found that requesting these fields gives you the days with
         # non-null values
-        first_chunk = [['dateRange', 'pivot', 'pivotValue']]
+        first_chunk = [['dateRange', 'pivotValues']]
 
         chunks = first_chunk + list(split_into_chunks(valid_selected_fields, MAX_CHUNK_LENGTH))
 
@@ -488,7 +525,7 @@
         # so that we can create the composite primary key for the record and
        # to merge the multiple responses based on this primary key
         for chunk in chunks:
-            for field in ['dateRange', 'pivot', 'pivotValue']:
+            for field in ['dateRange', 'pivotValues']:
                 if field not in chunk:
                     chunk.append(field)
 
@@ -513,7 +550,8 @@
             for page in sync_analytics_endpoint(client, self.tap_stream_id, self.path, query_string):
                 if page.get(self.data_key):
                     responses.append(page.get(self.data_key))
-            raw_records = merge_responses(responses)
+            pivot = params["pivot"] if "pivot" in params.keys() else None
+            raw_records = merge_responses(pivot, responses)
             time_extracted = utils.now()
 
         # While we broke the ad_analytics streams out from
@@ -561,10 +599,9 @@ class Accounts(LinkedInAds):
     data_key = "elements"
     children = ["video_ads"]
     params = {
-        "q": "search",
-        "sort.field": "ID",
-        "sort.order": "ASCENDING"
+        "q": "search"
     }
+    headers = {'X-Restli-Protocol-Version': "2.0.0"}
 
 class VideoAds(LinkedInAds):
     """
@@ -575,12 +612,25 @@
     replication_method = "INCREMENTAL"
     key_properties = ["content_reference"]
     foreign_key = "id"
-    path = "adDirectSponsoredContents"
+    path = "posts"
     data_key = "elements"
     parent = "accounts"
     params = {
-        "q": "account"
+        "q": "dscAdAccount",
+        "dscAdTypes": "List(VIDEO)"
     }
+    headers = {'X-Restli-Protocol-Version': "2.0.0"}
+
+    def sync_endpoint(self, *args, **kwargs):
+        try:
+            return super().sync_endpoint(*args, **kwargs)
+        except Exception as error:
+            if "Not enough permissions to access: partnerApiPostsExternal" in str(error):
+                LOGGER.warning("Access to the video-ads API is denied due to insufficient permissions. Please reauthenticate or verify the required permissions.")
+                LOGGER.error(error)
+                # total record count (zero), initial bookmark returned to suppress this failure
+                return 0, self.get_bookmark(kwargs.get("state"), kwargs.get("start_date"))
+            raise error
 
 class AccountUsers(LinkedInAds):
     """
@@ -609,9 +659,7 @@ class CampaignGroups(LinkedInAds):
     path = "adCampaignGroups"
     data_key = "elements"
     params = {
-        "q": "search",
-        "sort.field": "ID",
-        "sort.order": "ASCENDING"
+        "q": "search"
     }
 
 class Campaigns(LinkedInAds):
@@ -627,9 +675,7 @@ class Campaigns(LinkedInAds):
     data_key = "elements"
     children = ["ad_analytics_by_campaign", "creatives", "ad_analytics_by_creative"]
     params = {
-        "q": "search",
-        "sort.field": "ID",
-        "sort.order": "ASCENDING"
+        "q": "search"
     }
 
 class Creatives(LinkedInAds):
diff --git a/tap_linkedin_ads/sync.py b/tap_linkedin_ads/sync.py
index 85a5b08..2e85b6e 100644
--- a/tap_linkedin_ads/sync.py
+++ b/tap_linkedin_ads/sync.py
@@ -105,19 +105,20 @@ def sync(client, config, catalog, state):
         account_filter = stream_obj.account_filter
         if config.get("accounts") and account_filter is not None:
             account_list = config['accounts'].replace(" ", "").split(",")
-            params = stream_obj.params
-            for idx, account in enumerate(account_list):
+            if len(account_list) > 0:
+                params = stream_obj.params
                 if account_filter == 'search_id_values_param':
-                    params['search.id.values[{}]'.format(idx)] = int(account)
-                elif account_filter == 'search_account_values_param':
-                    params['search.account.values[{}]'.format(idx)] = \
-                        'urn:li:sponsoredAccount:{}'.format(account)
+                    # Convert account IDs to URN format
+                    urn_list = ["urn%3Ali%3AsponsoredAccount%3A{}".format(account_id) for account_id in account_list]
+                    # Create the query parameter string
+                    param_value = "(id:(values:List({})))".format(','.join(urn_list))
+                    params['search'] = param_value
                 elif account_filter == 'accounts_param':
-                    params['accounts[{}]'.format(idx)] = \
-                        'urn:li:sponsoredAccount:{}'.format(account)
-
-            # Update params of specific stream
-            stream_obj.params = params
+                    for idx, account in enumerate(account_list):
+                        params['accounts[{}]'.format(idx)] = \
+                            'urn:li:sponsoredAccount:{}'.format(account)
+                # Update params of specific stream
+                stream_obj.params = params
 
         LOGGER.info('START Syncing: %s', stream_name)
         update_currently_syncing(state, stream_name)
@@ -128,11 +129,11 @@ def sync(client, config, catalog, state):
         total_records, max_bookmark_value = stream_obj.sync_endpoint(
             client=client,
             catalog=catalog,
-            state=state,
-            page_size=page_size,
+            state=state, page_size=page_size,
             start_date=start_date,
             selected_streams=selected_streams,
-            date_window_size=date_window_size)
+            date_window_size=date_window_size,
+            account_list=account_list)
 
         # Write parent stream's bookmarks
         if stream_obj.replication_keys and stream_name in selected_streams:
diff --git a/tap_linkedin_ads/transform.py b/tap_linkedin_ads/transform.py
index 02c4e20..948a333 100644
--- a/tap_linkedin_ads/transform.py
+++ b/tap_linkedin_ads/transform.py
@@ -294,6 +294,25 @@ def transform_urn(data_dict):
 
     return data_dict
 
+def transform_video_ads(data_dict):
+    # pylint: disable=fixme
+    # TODO: To be removed in the next major version release
+    if 'author' in data_dict:
+        data_dict['owner'] = data_dict["author"]
+    if 'id' in data_dict:
+        data_dict['content_reference'] = data_dict["id"]
+    if 'ad_context' in data_dict:
+        if 'dsc_name' in data_dict['ad_context']:
+            data_dict['name'] = data_dict["ad_context"]['dsc_name']
+        if 'dsc_ad_type' in data_dict['ad_context']:
+            data_dict['type'] = data_dict["ad_context"]['dsc_ad_type']
+        if 'dsc_ad_account' in data_dict['ad_context']:
+            data_dict['account'] = data_dict["ad_context"]['dsc_ad_account']
+    if 'last_modified_at' in data_dict:
+        data_dict['last_modified_time'] = data_dict["last_modified_at"]
+    if 'created_at' in data_dict:
+        data_dict['created_time'] = data_dict["created_at"]
+    return data_dict
 
 def transform_data(data_dict, stream_name):
     new_dict = data_dict
@@ -308,6 +327,8 @@ def transform_data(data_dict, stream_name):
             this_dict = transform_campaigns(this_dict)
         elif stream_name == 'creatives':
             this_dict = transform_creatives(this_dict)
+        elif stream_name == 'video_ads':
+            this_dict = transform_video_ads(this_dict)
 
         this_dict = transform_urn(this_dict)
         this_dict = transform_audit_fields(this_dict)
diff --git a/tests/test_all_fields.py b/tests/test_all_fields.py
index 6f86039..1744d20 100644
--- a/tests/test_all_fields.py
+++ b/tests/test_all_fields.py
@@ -22,6 +22,7 @@
     "video_ads": {
        "content_reference_share_id",
        "content_reference_ucg_post_id",
+       "change_audit_stamps"
    },
    "accounts": {
        "total_budget_ends_at",
@@ -34,6 +35,32 @@
        "average_previous_thirty_day_reach_metrics",
        #BUG: TDL-22692
        "approximate_unique_impressions",
+       #BUG: TDL-25618
+       'job_applications',
+       'viral_post_view_job_applications',
+       'post_click_job_applications',
+       'external_website_post_view_conversions',
+       'viral_external_website_conversions',
+       'viral_post_view_registrations',
+       'talent_leads',
+       'viral_registrations',
+       'post_view_job_applications',
+       'viral_external_website_post_click_conversions',
+       'post_click_registrations',
+       'job_apply_clicks',
+       'conversion_value_in_local_currency',
+       'registrations',
+       'post_view_registrations',
+       'viral_post_view_job_apply_clicks',
+       'viral_external_website_post_view_conversions',
+       'viral_post_click_job_apply_clicks',
+       'viral_job_apply_clicks',
+       'viral_job_applications',
+       'viral_post_click_job_applications',
+       'viral_post_click_registrations',
+       'external_website_post_click_conversions',
+       'post_view_job_apply_clicks',
+       'post_click_job_apply_clicks'
    },
    "ad_analytics_by_campaign": {
        "average_daily_reach_metrics"
@@ -41,6 +68,32 @@
        "average_previous_thirty_day_reach_metrics",
        #BUG: TDL-22692
        "approximate_unique_impressions",
+       #BUG: TDL-25618
+       'job_applications',
+       'viral_post_view_job_applications',
+       'post_click_job_applications',
+       'external_website_post_view_conversions',
+       'viral_external_website_conversions',
+       'viral_post_view_registrations',
+       'talent_leads',
+       'viral_registrations',
+       'post_view_job_applications',
+       'viral_external_website_post_click_conversions',
+       'post_click_registrations',
+       'job_apply_clicks',
+       'conversion_value_in_local_currency',
+       'registrations',
+       'post_view_registrations',
+       'viral_post_view_job_apply_clicks',
+       'viral_external_website_post_view_conversions',
+       'viral_post_click_job_apply_clicks',
+       'viral_job_apply_clicks',
+       'viral_job_applications',
+       'viral_post_click_job_applications',
+       'viral_post_click_registrations',
+       'external_website_post_click_conversions',
+       'post_view_job_apply_clicks',
+       'post_click_job_apply_clicks'
    },
 }
diff --git a/tests/unittests/test_streams.py b/tests/unittests/test_streams.py
index 8a69193..5dd983b 100644
--- a/tests/unittests/test_streams.py
+++ b/tests/unittests/test_streams.py
@@ -70,7 +70,7 @@ class TestStreamsUtils(unittest.TestCase):
     """
     Test all utility functions of streams module
     """
-    
+
     def test_split_into_chunks(self):
         """
         Test that `test_split_into_chunks` split 65 fields into 4 chunk of MAX_CHUNK_LENGTH
@@ -124,28 +124,29 @@ def test_sync_analytics_endpoint(self, name, next_url, expected_call_count, mock
        Test that sync_analytics_endpoint function works properly for single page as well as multiple pages.
         """
         mock_next_url.side_effect = next_url
-        client = _client.LinkedinClient('client_id', 'client_secret', 'refresh_token', 'access_token', 'config_path') 
+        client = _client.LinkedinClient('client_id', 'client_secret', 'refresh_token', 'access_token', 'config_path')
 
         data = list(sync_analytics_endpoint(client, "stream", "path", "query=query"))
-        
+
         # Verify that get method of client is called expected times.
         self.assertEqual(expected_call_count, mock_get.call_count)
-    
-    
+
+
     @parameterized.expand([
         ["test_single_page", [], None],
         ["test_multiple_page", [{'rel': 'next', 'href': '/foo'}], 'https://api.linkedin.com/foo']
     ])
-    def test_get_next_url(self, name, links, expected_url):
+    def test_get_next_url_index_pagination(self, name, links, expected_url):
         """
-        Test that get_next_url return link of next page in case of 'href' 
+        Test that get_next_url returns the link of the next page in case of 'href'
         """
         data = {
             'paging': {
                 'links': links
             }
         }
-
-        actual_url = get_next_url(data)
+        mock_next_url = "initial_url"
+        mock_stream_name = "account_users"
+        actual_url = get_next_url(mock_stream_name, mock_next_url, data)
 
         # Verify the next page url
         self.assertEqual(expected_url, actual_url)
@@ -168,13 +169,13 @@ def test_shift_sync_window(self, name, today_month, today_date):
             'dateRange.end.month': expected_end_date.month,
             'dateRange.end.day': expected_end_date.day,
         }
-        
+
         params = {
             'dateRange.end.year': 2020,
             'dateRange.end.month': 10,
             'dateRange.end.day': 1,
         }
-        
+
         today = datetime.date(year=2020, month=today_month, day=today_date)
 
         actual_start_date, actual_end_date, actual_params = shift_sync_window(params, today, 30)
@@ -189,36 +190,32 @@ def test_merge_responses_no_overlap(self):
        Test merge_response function with records of unique date range value.
""" expected_output = { - ('urn:li:sponsoredCampaign:123456789', '2020-10-1') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 1}}, - 'a': 1, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-2') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 2}}, - 'b': 2, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-3') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 3}}, - 'c': 3, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-4') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 4}}, - 'd': 4, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-5') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 5}}, - 'e': 5, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-6') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 6}}, - 'f': 6, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + ('urn:li:sponsoredCampaign:123456789', '2020-10-1'): {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 1}}, 'a': 1, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, + ('urn:li:sponsoredCampaign:123456789', '2020-10-2'): {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 2}}, 'b': 2, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, + ('urn:li:sponsoredCampaign:123456789', '2020-10-3'): {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 3}}, 'c': 3, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, + ('urn:li:sponsoredCampaign:123456789', '2020-10-4'): {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 4}}, 'd': 4, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, + ('urn:li:sponsoredCampaign:123456789', '2020-10-5'): {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 5}}, 'e': 5, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, + ('urn:li:sponsoredCampaign:123456789', '2020-10-6'): {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 6}}, 'f': 6, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 'urn:li:sponsoredCampaign:123456789'} } data = [ [{'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 1}}, - 'a': 1, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'a': 1, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 2}}, - 'b': 2, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'b': 2, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 3}}, - 'c': 3, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'},], + 'c': 3, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']},], [{'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 4}}, - 'd': 4, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'd': 4, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 5}}, - 'e': 5, 'pivotValue': 
'urn:li:sponsoredCampaign:123456789'}, + 'e': 5, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 6}}, - 'f': 6, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'},], + 'f': 6, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']},], ] - actual_output = merge_responses(data) + mock_pivot = "CAMPAIGNS" + actual_output = merge_responses(mock_pivot, data) + print(actual_output) self.assertEqual(expected_output, actual_output) @@ -228,38 +225,24 @@ def test_merge_responses_with_overlap(self): """ data = [ [{'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 1}}, - 'a': 1, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'a': 1, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 1}}, - 'b': 7, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'b': 7, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 2}}, - 'b': 2, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'b': 2, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 3}}, - 'c': 3, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'},], + 'c': 3, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']},], [{'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 4}}, - 'd': 4, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'd': 4, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 5}}, - 'e': 5, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, + 'e': 5, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']}, {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 6}}, - 'f': 6, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'},], + 'f': 6, 'pivotValues': ['urn:li:sponsoredCampaign:123456789']},], ] - expected_output = { - ('urn:li:sponsoredCampaign:123456789', '2020-10-1') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 1}}, - 'a': 1, 'b': 7, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-2') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 2}}, - 'b': 2, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-3') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 3}}, - 'c': 3, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-4') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 4}}, - 'd': 4, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-5') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 5}}, - 'e': 5, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - ('urn:li:sponsoredCampaign:123456789', '2020-10-6') : {'dateRange': {'start': {'year': 2020, 'month': 10, 'day': 6}}, - 'f': 6, 'pivotValue': 'urn:li:sponsoredCampaign:123456789'}, - } - - actual_output = merge_responses(data) - + expected_output = {('urn:li:sponsoredCampaign:123456789', '2020-10-4'): {'pivot': 'CAMPAIGNS', 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'dateRange': {'start': {'month': 10, 'day': 4, 'year': 2020}}, 'pivot_value': 'urn:li:sponsoredCampaign:123456789', 'd': 4}, ('urn:li:sponsoredCampaign:123456789', '2020-10-3'): {'pivot': 'CAMPAIGNS', 'c': 3, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'dateRange': {'start': 
{'month': 10, 'day': 3, 'year': 2020}}, 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, ('urn:li:sponsoredCampaign:123456789', '2020-10-1'): {'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'pivot': 'CAMPAIGNS', 'pivot_value': 'urn:li:sponsoredCampaign:123456789', 'dateRange': {'start': {'month': 10, 'day': 1, 'year': 2020}}, 'a': 1, 'b': 7}, ('urn:li:sponsoredCampaign:123456789', '2020-10-6'): {'pivot': 'CAMPAIGNS', 'f': 6, 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'dateRange': {'start': {'month': 10, 'day': 6, 'year': 2020}}, 'pivot_value': 'urn:li:sponsoredCampaign:123456789'}, ('urn:li:sponsoredCampaign:123456789', '2020-10-5'): {'pivot': 'CAMPAIGNS', 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'dateRange': {'start': {'month': 10, 'day': 5, 'year': 2020}}, 'pivot_value': 'urn:li:sponsoredCampaign:123456789', 'e': 5}, ('urn:li:sponsoredCampaign:123456789', '2020-10-2'): {'pivot': 'CAMPAIGNS', 'pivotValues': ['urn:li:sponsoredCampaign:123456789'], 'dateRange': {'start': {'month': 10, 'day': 2, 'year': 2020}}, 'pivot_value': 'urn:li:sponsoredCampaign:123456789', 'b': 2}} + mock_pivot = "CAMPAIGNS" + actual_output = merge_responses(mock_pivot, data) # Verify that merge_responses function merge records by primary with same date range value. self.assertEqual(expected_output, actual_output) @@ -282,11 +265,11 @@ def test_get_bookmark(self, name, state, expected_output): Case 3: Return default value if stream_name is not found in the bookmarks Cas 4: Return actual bookmark value if it is found in the state """ - + actual_output = ACCOUNT_OBJ.get_bookmark(state, "default") - + self.assertEqual(expected_output, actual_output) - + @parameterized.expand([ ['test_zero_records', ACCOUNT_OBJ, [], "last_modified_time", "2021-07-20T08:50:30.169000Z", 0], ['test_zero_latest_records', ACCOUNT_OBJ, [{'id': 1, 'last_modified_time': '2021-06-26T08:50:30.169000Z'}], "last_modified_time", "2021-07-20T08:50:30.169000Z", 0], @@ -300,7 +283,7 @@ def test_process_records(self, name, stream_obj, records, replication_key, expec """ max_bookmark_value = last_datetime = "2021-07-20T08:50:30.169000Z" actual_max_bookmark, actual_record_count = stream_obj.process_records(CATALOG, records, utils.now(), replication_key, max_bookmark_value, last_datetime) - + # Verify maximum bookmark and total records. 
self.assertEqual(expected_max_bookmark, actual_max_bookmark) self.assertEqual(expected_record_count, actual_record_count) @@ -323,7 +306,7 @@ def test_process_records(self, name, stream_obj, records, replication_key, expec ['test_only_parent_selcted_stream', ['campaigns'], CAMPAIGN_OBJ, [{'paging': {'start': 0, 'count': 100, 'links': [], 'total': 1},'elements': [{'changeAuditStamps': {'created': {'time': 1564585620000}, 'lastModified': {'time': 1564585620000}}, 'id': 1}]}], 0, 1 - ] + ] ]) @mock.patch("tap_linkedin_ads.streams.LinkedInAds.sync_ad_analytics", return_value=(1, "2019-07-31T15:07:00.000000Z")) @mock.patch("tap_linkedin_ads.streams.LinkedInAds.get_bookmark", return_value = "2019-07-31T15:07:00.000000Z") @@ -340,11 +323,12 @@ def test_sync_endpoint(self, name, selected_streams, stream_obj, mock_response, start_date='2019-06-01T00:00:00Z' page_size = 100 date_window_size = 7 + account_list = ["12345"] mock_client.side_effect = mock_response mock_process_records.return_value = "2019-07-31T15:07:00.000000Z",1 - actual_total_record, actual_max_bookmark = stream_obj.sync_endpoint(client, CATALOG, state, page_size, start_date, selected_streams, date_window_size) - + actual_total_record, actual_max_bookmark = stream_obj.sync_endpoint(client, CATALOG, state, page_size, start_date, selected_streams, date_window_size, account_list=account_list) + # Verify total no of records self.assertEqual(actual_total_record, mock_record_count) # Verify maximum bookmark @@ -352,28 +336,6 @@ def test_sync_endpoint(self, name, selected_streams, stream_obj, mock_response, # Verify total no of write_schema function call. sync_endpoint calls write_schema single time for each child. self.assertEqual(mock_write_schema.call_count, expected_write_schema_count) - @mock.patch("tap_linkedin_ads.sync.LOGGER.warning") - @mock.patch("tap_linkedin_ads.streams.LinkedInAds.get_bookmark", return_value = "2019-07-31T15:07:00.000000Z") - @mock.patch("tap_linkedin_ads.client.LinkedinClient.request") - @mock.patch("tap_linkedin_ads.streams.LinkedInAds.process_records") - @mock.patch("tap_linkedin_ads.streams.LinkedInAds.write_schema") - def test_sync_endpoint_for_reference_organization_id_is_None(self, mock_write_schema,mock_process_records,mock_client,mock_get_bookmark, - mock_warning): - """ - Verify that tap skips API call for video_ads stream if owner_id in the parent's record is None. 
- """ - client = LinkedinClient('client_id', 'client_secret', 'refresh_token', 'access_token', 'config_path') - state={'currently_syncing': 'accounts'} - start_date='2019-06-01T00:00:00Z' - page_size = 100 - date_window_size = 7 - selected_streams = ['accounts', 'video_ads'] - - mock_client.side_effect = [{'paging': {'start': 0, 'count': 100, 'links': [], 'total': 1},'elements': [{'changeAuditStamps': {'created': {'time': 1564585620000}, 'lastModified': {'time': 1564585620000}}, 'id': 1}]}] - mock_process_records.return_value = "2019-07-31T15:07:00.000000Z",1 - ACCOUNT_OBJ.sync_endpoint(client, CATALOG, state, page_size, start_date, selected_streams, date_window_size) - - mock_warning.assert_called_with('Skipping video_ads call for %s account as reference_organization_id is not found.', 'urn:li:sponsoredAccount:1') @parameterized.expand([ @@ -385,7 +347,7 @@ def test_sync_endpoint_for_reference_organization_id_is_None(self, mock_write_sc @mock.patch("tap_linkedin_ads.streams.transform_json") @mock.patch("tap_linkedin_ads.streams.sync_analytics_endpoint") @mock.patch("tap_linkedin_ads.streams.merge_responses") - def test_sync_ad_analytics(self, name, expected_record_count, expected_max_bookmark, mock_tranform_data, + def test_sync_ad_analytics(self, name, expected_record_count, expected_max_bookmark, mock_tranform_data, mock_merge_response, mock_endpoint, mock_transform, mock_shift_windows, mock_process_record): """ Test that `sync_ad_analytics` function work properly for zero records as well as multiple records. @@ -398,7 +360,7 @@ def test_sync_ad_analytics(self, name, expected_record_count, expected_max_bookm mock_transform.return_value = mock_tranform_data mock_process_record.return_value = (expected_max_bookmark, expected_record_count) actual_record_count, actual_max_bookmark = AD_ANALYTICS_BY_CAMPAIGN.sync_ad_analytics(client, CATALOG, bookmark, date_window_size) - + # Verify maximum bookmark self.assertEqual(actual_max_bookmark, expected_max_bookmark) # Verify total no of records @@ -423,5 +385,5 @@ def test_write_record(self, mock_logger, mock_write_record): """ with self.assertRaises(OSError) as e: ACCOUNT_OBJ.write_record([], '') - + mock_logger.assert_called_with('record: %s', []) diff --git a/tests/unittests/test_sync.py b/tests/unittests/test_sync.py index 7cc0137..d0fa1c1 100644 --- a/tests/unittests/test_sync.py +++ b/tests/unittests/test_sync.py @@ -162,4 +162,4 @@ def test_sync(self, name, config, expected_date_window, mock_sync_endpoint): page_size=100, start_date="2019-06-01T00:00:00Z", selected_streams=['accounts', 'video_ads', 'account_users', 'campaigns', 'ad_analytics_by_campaign'], - date_window_size=expected_date_window) \ No newline at end of file + date_window_size=expected_date_window, account_list=[config['accounts']])