Implement all_fields test using new framework-tdl-7499 (#159)
* Implement all_fields test using new framework-tdl-7499

* Added a couple of methods for the table_reset test

* Resolved the merge conflicts

* Added comments for future work

* Test that custom fields are replicated correctly

* Test that non-custom fields are replicated correctly

* Added custom_fields and non_custom_fields helpers and a method to select streams based on the API type

* Renamed the test files for the custom and non-custom fields tests

* Added comments for the TODO tests

* Renamed to different filename

* Removed, as the custom and non-custom fields tests are split into separate test files

* Added comment for future work for custom and non-custom fields

* Override test_all_fields_for_streams_are_replicated to exclude automatic fields

* Use automatic fields instead of hardcoding

* Implemented PR review changes to remove the overridden method and add an assertion for the custom field name

* New method to count custom and non-custom fields from replicated fields

* Get the count of custom and non-custom fields and assert

* Removed blank spaces around parentheses

* Changed the assertions

* Fixed assertions

* tdl-23781,23563,23654

* Add new tests

* Review changes

* Move streams_to_test method to base class

---------

Co-authored-by: bhtowles <[email protected]>
Co-authored-by: JYOTHINARAYANSETTY <[email protected]>
Co-authored-by: Jyothi Narayanasetty <[email protected]>
4 people authored Oct 9, 2023
1 parent 4c92b4d commit 5ae6b93
Showing 6 changed files with 454 additions and 41 deletions.
280 changes: 267 additions & 13 deletions tests/sfbase.py
@@ -5,9 +5,9 @@
"""
import unittest
import os
import math
from datetime import timedelta
from datetime import datetime as dt

from tap_tester import connections, menagerie, runner, LOGGER
from tap_tester.base_suite_tests.base_case import BaseCase

@@ -22,7 +22,8 @@ class SFBaseTest(BaseCase):
total_quota = '95'
per_run_quota = None
# default start date which can be overridden in the tests
start_date = '2020-11-23T00:00:00Z'
start_date = '2000-11-23T00:00:00Z'
    partitioned_streams = set()

@staticmethod
def tap_name():
@@ -847,15 +848,6 @@ def expected_metadata():
}


def expected_streams(self):
"""A set of expected stream names"""
streams = set(self.expected_metadata().keys())

if self.salesforce_api == 'BULK':
return streams.difference(self.rest_only_streams())
return streams


@staticmethod
def rest_only_streams():
"""A group of streams that is only discovered when the REST API is in use."""
@@ -873,6 +865,14 @@ def rest_only_streams():
'UndecidedEventRelation',
}

def expected_streams(self):
"""A set of expected stream names"""
streams = set(self.expected_metadata().keys())

if self.salesforce_api == 'BULK':
return streams.difference(self.rest_only_streams())
return streams

def set_replication_methods(self, conn_id, catalogs, replication_methods):

replication_keys = self.expected_replication_keys()
@@ -883,9 +883,13 @@ def set_replication_methods(self, conn_id, catalogs, replication_methods):

if replication_method == self.INCREMENTAL:
replication_key = list(replication_keys.get(catalog['stream_name']))[0]
replication_md = [{ "breadcrumb": [], "metadata": {'replication-key': replication_key, "replication-method" : replication_method, "selected" : True}}]
replication_md = [{ "breadcrumb": [],
"metadata": {'replication-key': replication_key,
"replication-method" : replication_method, "selected" : True}}]
else:
replication_md = [{ "breadcrumb": [], "metadata": {'replication-key': None, "replication-method" : "FULL_TABLE", "selected" : True}}]
replication_md = [{ "breadcrumb": [],
"metadata": {'replication-key': None,
"replication-method" : "FULL_TABLE", "selected" : True}}]

connections.set_non_discoverable_metadata(
conn_id, catalog, menagerie.get_annotated_schema(conn_id, catalog['stream_id']), replication_md)
@@ -900,3 +904,253 @@ def setUpClass(cls):

if missing_envs:
raise Exception("set environment variables")

def get_custom_fields(self, found_catalogs, conn_id):
""" List all the custom_fields for each stream"""
custom_fields = {}
for stream in self.streams_to_test():

catalog = [catalog for catalog in found_catalogs
if catalog["stream_name"] == stream][0]
schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])["annotated-schema"]
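            # Salesforce appends "__c" to the API names of custom fields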
custom_fields[stream] = {key for key in schema['properties'].keys()
if key.endswith("__c")}
return custom_fields

def get_non_custom_fields(self, found_catalogs, conn_id):
""" List all the non_custom_fields for each stream"""
non_custom_fields = {}
for stream in self.streams_to_test():
catalog = [catalog for catalog in found_catalogs
if catalog["stream_name"] == stream][0]
schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])["annotated-schema"]
non_custom_fields[stream] = {key for key in schema['properties'].keys()
if not key.endswith("__c")
and schema['properties'][key]['inclusion'] != "unsupported"}
return non_custom_fields

    @staticmethod
    def count_custom_non_custom_fields(fields):
        """Return a (custom, non_custom) tuple, classifying fields by the "__c" suffix."""
        custom = 0
        non_custom = 0
        for field in fields:
            if field.endswith("__c"):
                custom += 1
            else:
                non_custom += 1
        return (custom, non_custom)
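    # Illustrative usage (not part of the commit; the field names are hypothetical):
    #     count_custom_non_custom_fields({"Id", "Name", "Revenue__c"}) -> (1, 2)
    #     one custom field ("Revenue__c") and two non-custom fields ("Id", "Name")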

@staticmethod
def get_streams_with_data():
        # The streams listed here are the streams that currently have data
        streams_with_data = {
            'Account',
            'ActiveFeatureLicenseMetric',
            'ActivePermSetLicenseMetric',
            'ActiveProfileMetric',
            'AppDefinition',
            'AppMenuItem',
            'AppointmentSchedulingPolicy',
            'AssignmentRule',
            'AuthSession',
            'BusinessHours',
            'Calendar',
            'Campaign',
            'CampaignMemberStatus',
            'Case',
            'ClientBrowser',
            'Community',
            'Contact',
            'ContentWorkspace',
            'ContentWorkspacePermission',
            'CronJobDetail',
            'CronTrigger',
            'Domain',
            'DuplicateRule',
            'EmailTemplate',
            'EntityDefinition',
            'Entitlement',
            'EventLogFile',
            'FieldPermissions',
            'FileSearchActivity',
            'FiscalYearSettings',
            'FlowDefinitionView',
            'Folder',
            'FormulaFunction',
            'FormulaFunctionAllowedType',
            'FormulaFunctionCategory',
            'Group',
            'Lead',
            'LeadStatus',
            'LightningExitByPageMetrics',
            'LightningToggleMetrics',
            'LightningUsageByAppTypeMetrics',
            'LightningUsageByBrowserMetrics',
            'LightningUsageByFlexiPageMetrics',
            'LightningUsageByPageMetrics',
            'ListView',
            'LoginGeo',
            'LoginHistory',
            'LoginIp',
            'MatchingRule',
            'MilestoneType',
            'OauthToken',
            'ObjectPermissions',
            'Opportunity',
            'OpportunityHistory',
            'OpportunityStage',
            'Organization',
            'Period',
            'PermissionSet',
            'PermissionSetAssignment',
            'PermissionSetLicense',
            'PermissionSetTabSetting',
            'PlatformEventUsageMetric',
            'Pricebook2',
            'PricebookEntry',
            'Product2',
            'Profile',
            'PromptAction',
            'PromptActionShare',
            'Publisher',
            'Report',
            'ServiceSetupProvisioning',
            'SetupAuditTrail',
            'SetupEntityAccess',
            'SlaProcess',
            'Solution',
            'TabDefinition',
            'TapTester__c',
            'TenantUsageEntitlement',
            'UiFormulaCriterion',
            'UiFormulaRule',
            'User',
            'UserAppInfo',
            'UserAppMenuItem',
            'UserLicense',
            'UserLogin',
            'UserPermissionAccess',
            'UserRole',
            'WebLink',
        }
return streams_with_data

@staticmethod
def get_unsupported_by_rest_api():
"""The streams listed here are not supported by the REST API"""
unsupported_streams = {
'Announcement',
'CollaborationGroupRecord',
'ContentDocumentLink',
'ContentFolderMember',
'DataStatistics',
'EntityParticle',
'FieldDefinition',
'FlexQueueItem',
'IdeaComment',
'OwnerChangeOptionInfo',
'PicklistValueInfo',
'PlatformAction',
'RelationshipDomain',
'RelationshipInfo',
'SearchLayout',
'SiteDetail',
'UserEntityAccess',
'UserFieldAccess',
'Vote',
'RecordActionHistory',
'FlowVersionView',
'FlowVariableView',
'AppTabMember',
'ColorDefinition',
'IconDefinition',
}

return unsupported_streams

    def get_unsupported_by_bulk_api(self):
        """The streams listed here are not supported by the Bulk API, in addition
        to every stream that is unsupported by the REST API."""
        unsupported_streams_rest = self.get_unsupported_by_rest_api()
        unsupported_streams_bulk_only = {
'AcceptedEventRelation',
'AssetTokenEvent',
'AttachedContentNote',
'CaseStatus',
'ContentFolderItem',
'ContractStatus',
'DeclinedEventRelation',
'EventWhoRelation',
'PartnerRole',
'QuoteTemplateRichTextData',
'RecentlyViewed',
'SolutionStatus',
'TaskPriority',
'TaskWhoRelation',
'TaskStatus',
'UndecidedEventRelation',
'OrderStatus',
'WorkOrderStatus',
'WorkOrderLineItemStatus',
            'ServiceAppointmentStatus',
'FieldSecurityClassification',
# BUG_TODO | the following streams are undocumented
'WorkStepStatus',
'ShiftStatus',
}

return unsupported_streams_bulk_only | unsupported_streams_rest

    def is_unsupported_by_rest_api(self, stream):
        """Return True if the stream is unsupported by the REST API."""
        return stream in self.get_unsupported_by_rest_api()

    def is_unsupported_by_bulk_api(self, stream):
        """
        Return True if the stream is unsupported by the Bulk API.
        The Bulk API does not support any stream that is unsupported by the REST API
        and, in addition, does not support the streams listed in get_unsupported_by_bulk_api.
        """
        return stream in self.get_unsupported_by_bulk_api()

    def partition_streams(self, list_of_streams):
        """Return this weekday's slice of streams, filtered to those the active API supports."""
        weekday = dt.now().weekday()  # weekdays 0-6, Mon-Sun
        partition_size = math.ceil(len(list_of_streams) / 7)

        # if partition_size increases in a given week, the start of subsequent slices is
        # pushed forward, which could skip streams; buffer the start by 15 to prevent this
start_of_slice = max(partition_size * weekday - 15, 0)
end_of_slice = min(partition_size * (weekday + 1), len(list_of_streams))
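        # Worked example (illustrative): with 70 streams, partition_size = 10, so on
        # Wednesday (weekday = 2) the slice is streams[max(20 - 15, 0):30] = streams[5:30]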
sorted_streams = sorted(list_of_streams)

LOGGER.info("Using weekday based subset of found_catalogs, weekday = %s", weekday)

        # select only the streams in this weekday's slice that the active API supports
if self.salesforce_api == 'BULK':
self.partitioned_streams = {stream
for stream in sorted_streams[start_of_slice:end_of_slice]
if not self.is_unsupported_by_bulk_api(stream)}
else:
self.partitioned_streams = {stream
for stream in sorted_streams[start_of_slice:end_of_slice]
if not self.is_unsupported_by_rest_api(stream)}

return self.partitioned_streams

def streams_to_test(self):
if self.partitioned_streams:
return self.partitioned_streams
return self.partition_streams(self.get_streams_with_data())
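
For intuition, here is a small standalone sketch (not part of the commit; the stream names are synthetic) that replays the slice arithmetic from partition_streams for each weekday, showing that the 15-stream buffer makes consecutive slices overlap rather than skip streams:

    import math

    streams = sorted(f"Stream{i:02d}" for i in range(70))  # synthetic stream names
    partition_size = math.ceil(len(streams) / 7)           # 10 streams per weekday

    covered = set()
    for weekday in range(7):  # 0-6, Mon-Sun
        start = max(partition_size * weekday - 15, 0)
        end = min(partition_size * (weekday + 1), len(streams))
        covered.update(streams[start:end])

    # every stream is picked up at least once over the course of a week
    assert covered == set(streams)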
42 changes: 42 additions & 0 deletions tests/test_salesforce_all_fields_custom.py
@@ -0,0 +1,42 @@
"""
Test that when only custom fields are selected for a stream, automatic fields and custom fields are still replicated
"""
from tap_tester.base_suite_tests.all_fields_test import AllFieldsTest
from sfbase import SFBaseTest

class SFCustomFieldsTest(AllFieldsTest, SFBaseTest):

salesforce_api = 'BULK'

@staticmethod
def name():
return "tt_sf_all_fields_custom"

streams_to_test = SFBaseTest.streams_to_test

def streams_to_selected_fields(self):
found_catalogs = AllFieldsTest.found_catalogs
conn_id = AllFieldsTest.conn_id
custom_fields = self.get_custom_fields(found_catalogs, conn_id)
return custom_fields

def test_all_fields_for_streams_are_replicated(self):
for stream in self.streams_to_test():
with self.subTest(stream=stream):
automatic_fields = self.expected_automatic_fields(stream)
expected_custom_fields = self.selected_fields.get(stream, set()).union(automatic_fields)
replicated_custom_fields = self.actual_fields.get(stream, set())

                # Verify that custom and automatic fields are replicated
                self.assertSetEqual(expected_custom_fields, replicated_custom_fields,
                                    msg=f"All selected and automatic fields should be replicated for stream {stream}")

                # Verify at least one custom field is replicated, if any exist
                if len(expected_custom_fields) > len(automatic_fields):
                    self.assertGreater(len(replicated_custom_fields.difference(automatic_fields)), 0,
                                       msg=f"Replication didn't return any custom fields for stream {stream}")

                # Verify that no fields besides custom and automatic fields are replicated
                _, num_non_custom = self.count_custom_non_custom_fields(replicated_custom_fields)
                self.assertEqual(num_non_custom, len(automatic_fields),
                                 msg=f"Replicated some fields that are neither custom nor automatic for stream {stream}")
