diff --git a/tests/sfbase.py b/tests/sfbase.py index d1e8c634..a4019ca4 100644 --- a/tests/sfbase.py +++ b/tests/sfbase.py @@ -5,9 +5,9 @@ """ import unittest import os +import math from datetime import timedelta from datetime import datetime as dt - from tap_tester import connections, menagerie, runner, LOGGER from tap_tester.base_suite_tests.base_case import BaseCase @@ -22,7 +22,8 @@ class SFBaseTest(BaseCase): total_quota = '95' per_run_quota = None # default start date which can be overridden in the tests - start_date = '2020-11-23T00:00:00Z' + start_date = '2000-11-23T00:00:00Z' + partitioned_streams = {} @staticmethod def tap_name(): @@ -847,15 +848,6 @@ def expected_metadata(): } - def expected_streams(self): - """A set of expected stream names""" - streams = set(self.expected_metadata().keys()) - - if self.salesforce_api == 'BULK': - return streams.difference(self.rest_only_streams()) - return streams - - @staticmethod def rest_only_streams(): """A group of streams that is only discovered when the REST API is in use.""" @@ -873,6 +865,14 @@ def rest_only_streams(): 'UndecidedEventRelation', } + def expected_streams(self): + """A set of expected stream names""" + streams = set(self.expected_metadata().keys()) + + if self.salesforce_api == 'BULK': + return streams.difference(self.rest_only_streams()) + return streams + def set_replication_methods(self, conn_id, catalogs, replication_methods): replication_keys = self.expected_replication_keys() @@ -883,9 +883,13 @@ def set_replication_methods(self, conn_id, catalogs, replication_methods): if replication_method == self.INCREMENTAL: replication_key = list(replication_keys.get(catalog['stream_name']))[0] - replication_md = [{ "breadcrumb": [], "metadata": {'replication-key': replication_key, "replication-method" : replication_method, "selected" : True}}] + replication_md = [{ "breadcrumb": [], + "metadata": {'replication-key': replication_key, + "replication-method" : replication_method, "selected" : True}}] else: - replication_md = [{ "breadcrumb": [], "metadata": {'replication-key': None, "replication-method" : "FULL_TABLE", "selected" : True}}] + replication_md = [{ "breadcrumb": [], + "metadata": {'replication-key': None, + "replication-method" : "FULL_TABLE", "selected" : True}}] connections.set_non_discoverable_metadata( conn_id, catalog, menagerie.get_annotated_schema(conn_id, catalog['stream_id']), replication_md) @@ -900,3 +904,253 @@ def setUpClass(cls): if missing_envs: raise Exception("set environment variables") + + def get_custom_fields(self, found_catalogs, conn_id): + """ List all the custom_fields for each stream""" + custom_fields = {} + for stream in self.streams_to_test(): + + catalog = [catalog for catalog in found_catalogs + if catalog["stream_name"] == stream][0] + schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])["annotated-schema"] + custom_fields[stream] = {key for key in schema['properties'].keys() + if key.endswith("__c")} + return custom_fields + + def get_non_custom_fields(self, found_catalogs, conn_id): + """ List all the non_custom_fields for each stream""" + non_custom_fields = {} + for stream in self.streams_to_test(): + catalog = [catalog for catalog in found_catalogs + if catalog["stream_name"] == stream][0] + schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])["annotated-schema"] + non_custom_fields[stream] = {key for key in schema['properties'].keys() + if not key.endswith("__c") + and schema['properties'][key]['inclusion'] != "unsupported"} + return non_custom_fields + + @staticmethod + def count_custom_non_custom_fields(fields): + custom = 0 + non_custom =0 + for field in fields: + if not field.endswith("__c"): + non_custom += 1 + else: + custom += 1 + return (custom, non_custom) + + @staticmethod + def get_streams_with_data(): + #the streams listed here are the streams that have data currently + streams_with_data = { + 'ActiveFeatureLicenseMetric', + 'AppMenuItem', + 'AuthSession', + 'Account', + 'Calendar', + 'AppointmentSchedulingPolicy', + 'Campaign', + 'AssignmentRule', + 'ActivePermSetLicenseMetric', + 'AppDefinition', + 'BusinessHours', + 'ActiveProfileMetric', + 'Calendar', + 'ContentWorkspacePermission', + 'CampaignMemberStatus', + 'Community', + 'Contact', + 'Case', + 'ClientBrowser', + 'BusinessHours', + 'ContentWorkspace', + 'Campaign', + 'FieldPermissions', + 'Group', + 'EventLogFile', + 'FiscalYearSettings', + 'ListView', + 'LoginHistory', + 'LeadStatus', + 'Lead', + 'LightningUsageByFlexiPageMetrics', + 'FormulaFunctionAllowedType', + 'LoginIp', + 'LightningUsageByAppTypeMetrics', + 'FileSearchActivity', + 'FormulaFunctionCategory', + 'LightningUsageByBrowserMetrics', + 'Folder', + 'FormulaFunction', + 'MatchingRule', + 'LightningUsageByPageMetrics', + 'LoginGeo', + 'FlowDefinitionView', + 'LightningToggleMetrics', + 'LightningExitByPageMetrics', + 'PermissionSetTabSetting', + 'MilestoneType', + 'Period', + 'MatchingRule', + 'OpportunityStage', + 'PlatformEventUsageMetric', + 'Organization', + 'OpportunityHistory', + 'Pricebook2', + 'PermissionSetLicense', + 'ObjectPermissions', + 'Opportunity', + 'PermissionSetAssignment', + 'OauthToken', + 'PricebookEntry', + 'Profile', + 'PermissionSet', + 'Product2', + 'PromptAction', + 'SetupEntityAccess', + 'Profile', + 'Publisher', + 'ServiceSetupProvisioning', + 'Report', + 'Solution', + 'PromptActionShare', + 'SlaProcess', + 'SetupAuditTrail', + 'UiFormulaRule', + 'WebLink', + 'UserPermissionAccess', + 'UserRole', + 'TabDefinition', + 'UserLogin', + 'UserAppMenuItem', + 'TenantUsageEntitlement', + 'UserLicense', + 'User', + 'TapTester__c', + 'SlaProcess', + 'UserAppInfo', + 'UiFormulaCriterion', + 'Solution', + 'FieldPermissions', + 'EntityDefinition', + 'ContentWorkspace', + 'DuplicateRule', + 'CronTrigger', + 'Domain', + 'ContentWorkspacePermission', + 'EmailTemplate', + 'EventLogFile', + 'CronJobDetail', + 'Entitlement', + } + return streams_with_data + + @staticmethod + def get_unsupported_by_rest_api(): + """The streams listed here are not supported by the REST API""" + unsupported_streams = { + 'Announcement', + 'CollaborationGroupRecord', + 'ContentDocumentLink', + 'ContentFolderMember', + 'DataStatistics', + 'EntityParticle', + 'FieldDefinition', + 'FlexQueueItem', + 'IdeaComment', + 'OwnerChangeOptionInfo', + 'PicklistValueInfo', + 'PlatformAction', + 'RelationshipDomain', + 'RelationshipInfo', + 'SearchLayout', + 'SiteDetail', + 'UserEntityAccess', + 'UserFieldAccess', + 'Vote', + 'RecordActionHistory', + 'FlowVersionView', + 'FlowVariableView', + 'AppTabMember', + 'ColorDefinition', + 'IconDefinition', + } + + return unsupported_streams + + def get_unsupported_by_bulk_api(self): + unsupported_streams_rest = self.get_unsupported_by_rest_api() + unsupported_streams_bulk_only= { + 'AcceptedEventRelation', + 'AssetTokenEvent', + 'AttachedContentNote', + 'CaseStatus', + 'ContentFolderItem', + 'ContractStatus', + 'DeclinedEventRelation', + 'EventWhoRelation', + 'PartnerRole', + 'QuoteTemplateRichTextData', + 'RecentlyViewed', + 'SolutionStatus', + 'TaskPriority', + 'TaskWhoRelation', + 'TaskStatus', + 'UndecidedEventRelation', + 'OrderStatus', + 'WorkOrderStatus', + 'WorkOrderLineItemStatus', + 'ServiceAppointmentStatus', + 'ServiceAppointmentStatus', + 'FieldSecurityClassification', + # BUG_TODO | the following streams are undocumented + 'WorkStepStatus', + 'ShiftStatus', + } + + return unsupported_streams_bulk_only | unsupported_streams_rest + + def is_unsupported_by_rest_api(self, stream): + """returns True if stream is unsupported by REST API""" + + return stream in self.get_unsupported_by_rest_api() + + def is_unsupported_by_bulk_api(self, stream): + """ + returns True if stream is unsupported by BULK API + + BULK API does not support any streams that are unsupported by the REST API and + in addition does not support the streams listed below. + """ + return stream in self.get_unsupported_by_bulk_api() + + def partition_streams(self,list_of_streams): + + weekday = dt.weekday(dt.now()) # weekdays 0-6, Mon-Sun + partition_size = math.ceil(len(list_of_streams)/7) + + # if partition_size increases in a given week the start of subsequent slices will be pushed + # forward allowing for skipped streams, buffer start by 15 to help prevent this + start_of_slice = max(partition_size * weekday - 15, 0) + end_of_slice = min(partition_size * (weekday + 1), len(list_of_streams)) + sorted_streams = sorted(list_of_streams) + + LOGGER.info("Using weekday based subset of found_catalogs, weekday = %s", weekday) + + # select certain... catalogs + if self.salesforce_api == 'BULK': + self.partitioned_streams = {stream + for stream in sorted_streams[start_of_slice:end_of_slice] + if not self.is_unsupported_by_bulk_api(stream)} + else: + self.partitioned_streams = {stream + for stream in sorted_streams[start_of_slice:end_of_slice] + if not self.is_unsupported_by_rest_api(stream)} + + return self.partitioned_streams + + def streams_to_test(self): + if self.partitioned_streams: + return self.partitioned_streams + return self.partition_streams(self.get_streams_with_data()) diff --git a/tests/test_salesforce_all_fields_custom.py b/tests/test_salesforce_all_fields_custom.py new file mode 100644 index 00000000..c0c48d61 --- /dev/null +++ b/tests/test_salesforce_all_fields_custom.py @@ -0,0 +1,42 @@ +""" +Test that with only custom fields selected for a stream automatic fields and custom fields are still replicated +""" +from tap_tester.base_suite_tests.all_fields_test import AllFieldsTest +from sfbase import SFBaseTest + +class SFCustomFieldsTest(AllFieldsTest, SFBaseTest): + + salesforce_api = 'BULK' + + @staticmethod + def name(): + return "tt_sf_all_fields_custom" + + streams_to_test = SFBaseTest.streams_to_test + + def streams_to_selected_fields(self): + found_catalogs = AllFieldsTest.found_catalogs + conn_id = AllFieldsTest.conn_id + custom_fields = self.get_custom_fields(found_catalogs, conn_id) + return custom_fields + + def test_all_fields_for_streams_are_replicated(self): + for stream in self.streams_to_test(): + with self.subTest(stream=stream): + automatic_fields = self.expected_automatic_fields(stream) + expected_custom_fields = self.selected_fields.get(stream, set()).union(automatic_fields) + replicated_custom_fields = self.actual_fields.get(stream, set()) + + #Verify that custom and automatic fields are replicated + self.assertSetEqual(expected_custom_fields, replicated_custom_fields, + msg = f"verify all fields are replicated for stream {stream}") + + #Verify at least one custom field is replicated if exists + if len(expected_custom_fields) > len(automatic_fields): + self.assertGreater(len(replicated_custom_fields.difference(automatic_fields)),0, + msg = f"Replication didn't return any custom fields for stream {stream}") + + #Verify that only custom fields are replicated besides automatic fields + _, num_non_custom = self.count_custom_non_custom_fields(replicated_custom_fields) + self.assertEqual(num_non_custom, len(automatic_fields), + msg = f"Replicated some fields that are not custom fields for stream {stream}") diff --git a/tests/test_salesforce_all_fields_custom_rest.py b/tests/test_salesforce_all_fields_custom_rest.py new file mode 100644 index 00000000..74f017ef --- /dev/null +++ b/tests/test_salesforce_all_fields_custom_rest.py @@ -0,0 +1,42 @@ +""" +Test that with only custom fields selected for a stream automatic fields and custom fields are still replicated +""" +from tap_tester.base_suite_tests.all_fields_test import AllFieldsTest +from sfbase import SFBaseTest + +class SFCustomFieldsTestRest(AllFieldsTest, SFBaseTest): + + salesforce_api = 'REST' + + @staticmethod + def name(): + return "tt_sf_all_fields_custom_rest" + + streams_to_test = SFBaseTest.streams_to_test + + def streams_to_selected_fields(self): + found_catalogs = AllFieldsTest.found_catalogs + conn_id = AllFieldsTest.conn_id + custom_fields = self.get_custom_fields(found_catalogs, conn_id) + return custom_fields + + def test_all_fields_for_streams_are_replicated(self): + for stream in self.streams_to_test(): + with self.subTest(stream=stream): + automatic_fields = self.expected_automatic_fields(stream) + expected_custom_fields = self.selected_fields.get(stream, set()).union(automatic_fields) + replicated_custom_fields = self.actual_fields.get(stream, set()) + + #Verify that custom and automatic fields are replicated + self.assertSetEqual(expected_custom_fields, replicated_custom_fields, + msg = f"verify all fields are replicated for stream {stream}") + + #Verify at least one custom field is replicated + if len(expected_custom_fields) > len(automatic_fields): + self.assertGreater(len(replicated_custom_fields.difference(automatic_fields)),0, + msg = f"Replication didn't return any custom fields for stream {stream}") + + #Verify that only custom fields are replicated besides automatic fields + _, num_non_custom = self.count_custom_non_custom_fields(replicated_custom_fields) + self.assertEqual(num_non_custom, len(automatic_fields), + msg = f"Replicated some fields that are not custom fields for stream {stream}") diff --git a/tests/test_salesforce_all_fields_non_custom.py b/tests/test_salesforce_all_fields_non_custom.py new file mode 100644 index 00000000..51f6ce6b --- /dev/null +++ b/tests/test_salesforce_all_fields_non_custom.py @@ -0,0 +1,45 @@ +""" +Test that with only non-custom fields selected for a stream automatic fields and non custom fields are still replicated +""" +from tap_tester.base_suite_tests.all_fields_test import AllFieldsTest +from sfbase import SFBaseTest + + +class SFNonCustomFieldsTest(AllFieldsTest, SFBaseTest): + + salesforce_api = 'BULK' + + @staticmethod + def name(): + return "tt_sf_all_fields_non_custom" + + streams_to_test = SFBaseTest.streams_to_test + + def streams_to_selected_fields(self): + found_catalogs = AllFieldsTest.found_catalogs + conn_id = AllFieldsTest.conn_id + non_custom_fields = self.get_non_custom_fields(found_catalogs, conn_id) + return non_custom_fields + + def test_non_custom_fields(self): + for stream in self.streams_to_selected_fields(): + with self.subTest(stream=stream): + expected_non_custom_fields = self.selected_fields.get(stream,set()) + replicated_non_custom_fields = self.actual_fields.get(stream, set()) + + #Verify at least one non-custom field is replicated + self.assertGreater(len(replicated_non_custom_fields),0, + msg = f"Replication didn't return any non-custom fields for stream {stream}") + + #verify that all the non_custom fields are replicated + self.assertEqual(replicated_non_custom_fields, expected_non_custom_fields, + msg = f"All non_custom fields are not no replicated for stream {stream}") + + #verify that automatic fields are also replicated along with non_custom_fields + self.assertTrue(self.expected_automatic_fields(stream).issubset(replicated_non_custom_fields), + msg = f"Automatic fields are not replicated for stream {stream}") + + #Verify custom fields are not replicated by checking the field name + num_custom, _ = self.count_custom_non_custom_fields(replicated_non_custom_fields) + self.assertEqual(num_custom, 0, + msg = f"replicated some fields that are custom for stream {stream}") diff --git a/tests/test_salesforce_all_fields_non_custom_rest.py b/tests/test_salesforce_all_fields_non_custom_rest.py new file mode 100644 index 00000000..aa9aad0e --- /dev/null +++ b/tests/test_salesforce_all_fields_non_custom_rest.py @@ -0,0 +1,46 @@ +""" +Test that with only non-custom fields selected for a stream automatic fields and non custom fields are still replicated +""" + +from tap_tester.base_suite_tests.all_fields_test import AllFieldsTest +from sfbase import SFBaseTest + + +class SFNonCustomFieldsTestRest(AllFieldsTest, SFBaseTest): + + salesforce_api = 'REST' + + @staticmethod + def name(): + return "tt_sf_all_fields_non_custom_rest" + + streams_to_test = SFBaseTest.streams_to_test + + def streams_to_selected_fields(self): + found_catalogs = AllFieldsTest.found_catalogs + conn_id = AllFieldsTest.conn_id + non_custom_fields = self.get_non_custom_fields(found_catalogs, conn_id) + return non_custom_fields + + def test_non_custom_fields(self): + for stream in self.streams_to_selected_fields(): + with self.subTest(stream=stream): + expected_non_custom_fields = self.selected_fields.get(stream,set()) + replicated_non_custom_fields = self.actual_fields.get(stream, set()) + #Verify at least one non-custom field is replicated + self.assertGreater(len(replicated_non_custom_fields),0, + msg = f"Replication didn't return any non-custom fields for stream {stream}") + + #verify that all the non_custom fields are replicated + self.assertEqual(replicated_non_custom_fields, expected_non_custom_fields, + msg = f"All non_custom fields are not no replicated for stream {stream}") + + #verify that automatic fields are also replicated along with non_custom_fields + self.assertTrue(self.expected_automatic_fields(stream).issubset(replicated_non_custom_fields), + msg = f"Automatic fields are not replicated for stream {stream}") + + #Verify custom fields are not replicated by checking the field name + num_custom, _ = self.count_custom_non_custom_fields(replicated_non_custom_fields) + self.assertEqual(num_custom, 0, + msg = f"Replicated some fields that are custom fields for stream {stream}") + diff --git a/tests/test_salesforce_sync_canary.py b/tests/test_salesforce_sync_canary.py index 3b79c16d..8aea0a41 100644 --- a/tests/test_salesforce_sync_canary.py +++ b/tests/test_salesforce_sync_canary.py @@ -1,14 +1,12 @@ -import math -import unittest + from datetime import datetime, timedelta -from operator import itemgetter from tap_tester import menagerie, connections, LOGGER -from base import SalesforceBaseTest +from sfbase import SFBaseTest -class SalesforceSyncCanary(SalesforceBaseTest): +class SalesforceSyncCanary(SFBaseTest): """ Run the tap in discovery mode, select all tables/fields, and run a short timespan sync of all objects to root out any potential issues syncing some objects. @@ -16,7 +14,7 @@ class SalesforceSyncCanary(SalesforceBaseTest): @staticmethod def name(): - return "tap_tester_salesforce_unsupported_objects" + return "tt_sf_unsupported_objects" @staticmethod def get_properties(): # pylint: disable=arguments-differ @@ -46,30 +44,16 @@ def test_run(self): # run in check mode found_catalogs = self.run_and_verify_check_mode(conn_id) - # partition the found catalogs into 7 groups, 1 for each day of the week to save on both - # time and API quota - weekday = datetime.weekday(datetime.now()) # weekdays 0-6, Mon-Sun - partition_size = math.ceil(len(found_catalogs)/7) - - # if partition_size increases in a given week the start of subsequent slices will be pushed - # forward allowing for skipped streams, buffer start by 15 to help prevent this - start_of_slice = max(partition_size * weekday - 15, 0) - end_of_slice = min(partition_size * (weekday + 1), len(found_catalogs)) - - # sort catalogs to help mitigate problems from drastic changes in stream ordering - sorted_catalogs = sorted(found_catalogs, key=itemgetter('tap_stream_id')) - - LOGGER.info("Using weekday based subset of found_catalogs, weekday = %s", weekday) - # select certain... catalogs - expected_streams = self.expected_sync_streams() - allowed_catalogs = [catalog - for catalog in sorted_catalogs[start_of_slice:end_of_slice] - if not self.is_unsupported_by_bulk_api(catalog['stream_name']) and - catalog['stream_name'] in expected_streams] + expected_streams = self.partition_streams(self.expected_sync_streams()) + allowed_catalogs = [catalog for catalog in found_catalogs + if catalog['stream_name'] in expected_streams] self.select_all_streams_and_fields(conn_id, allowed_catalogs) - # Run sync menagerie.set_state(conn_id, {}) - _ = self.run_and_verify_sync(conn_id) + record_count_by_stream = self.run_and_verify_sync_mode(conn_id) + actual_streams_with_data ={stream for stream in record_count_by_stream + if record_count_by_stream[stream] > 0} + self.assertTrue(actual_streams_with_data.issubset(self.get_streams_with_data()), + msg = f"New streams with data are synced {self.get_streams_with_data().difference(actual_streams_with_data)}")