-
Notifications
You must be signed in to change notification settings - Fork 71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement all_fields test using new framework-tdl-7499 #159
Changes from 30 commits
2f55a8e
c2366e5
86bed57
be7de62
f73479c
410d972
8d48a2d
b576c02
e6ed1a0
3b928f8
8370c1d
683f332
33ca793
5e1bf83
ab06d29
a5481e3
409f816
34c2458
9776995
23a0816
ab4f841
8e6b8d4
89f629a
390660b
cac3542
9815beb
8d4caad
724f1d5
ae0f9a9
0c5127b
b662005
4a903b0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,9 +5,10 @@ | |
""" | ||
import unittest | ||
import os | ||
import math | ||
from datetime import timedelta | ||
from datetime import datetime as dt | ||
|
||
from operator import itemgetter | ||
from tap_tester import connections, menagerie, runner, LOGGER | ||
from tap_tester.base_suite_tests.base_case import BaseCase | ||
|
||
|
@@ -22,7 +23,8 @@ class SFBaseTest(BaseCase): | |
total_quota = '95' | ||
per_run_quota = None | ||
# default start date which can be overridden in the tests | ||
start_date = '2020-11-23T00:00:00Z' | ||
start_date = '2000-11-23T00:00:00Z' | ||
partitioned_streams = [] | ||
|
||
@staticmethod | ||
def tap_name(): | ||
|
@@ -34,6 +36,19 @@ def get_type(): | |
"""the expected url route ending""" | ||
return "platform.salesforce" | ||
|
||
|
||
@staticmethod | ||
def count_custom_non_custom_fields(fields): | ||
custom = 0 | ||
non_custom =0 | ||
for field in fields: | ||
if not field.endswith("__c"): | ||
non_custom += 1 | ||
else: | ||
custom += 1 | ||
return (custom, non_custom) | ||
|
||
|
||
def get_properties(self): | ||
"""Configuration properties required for the tap.""" | ||
|
||
|
@@ -847,15 +862,6 @@ def expected_metadata(): | |
} | ||
|
||
|
||
def expected_streams(self): | ||
"""A set of expected stream names""" | ||
streams = set(self.expected_metadata().keys()) | ||
|
||
if self.salesforce_api == 'BULK': | ||
return streams.difference(self.rest_only_streams()) | ||
return streams | ||
|
||
|
||
@staticmethod | ||
def rest_only_streams(): | ||
"""A group of streams that is only discovered when the REST API is in use.""" | ||
|
@@ -873,6 +879,14 @@ def rest_only_streams(): | |
'UndecidedEventRelation', | ||
} | ||
|
||
def expected_streams(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some of these self references may have to change to cls references for some of the methods according to recent changes in sfbase. |
||
"""A set of expected stream names""" | ||
streams = set(self.expected_metadata().keys()) | ||
|
||
if self.salesforce_api == 'BULK': | ||
return streams.difference(self.rest_only_streams()) | ||
return streams | ||
|
||
def set_replication_methods(self, conn_id, catalogs, replication_methods): | ||
|
||
replication_keys = self.expected_replication_keys() | ||
|
@@ -883,9 +897,13 @@ def set_replication_methods(self, conn_id, catalogs, replication_methods): | |
|
||
if replication_method == self.INCREMENTAL: | ||
replication_key = list(replication_keys.get(catalog['stream_name']))[0] | ||
replication_md = [{ "breadcrumb": [], "metadata": {'replication-key': replication_key, "replication-method" : replication_method, "selected" : True}}] | ||
replication_md = [{ "breadcrumb": [], | ||
"metadata": {'replication-key': replication_key, | ||
"replication-method" : replication_method, "selected" : True}}] | ||
else: | ||
replication_md = [{ "breadcrumb": [], "metadata": {'replication-key': None, "replication-method" : "FULL_TABLE", "selected" : True}}] | ||
replication_md = [{ "breadcrumb": [], | ||
"metadata": {'replication-key': None, | ||
"replication-method" : "FULL_TABLE", "selected" : True}}] | ||
|
||
connections.set_non_discoverable_metadata( | ||
conn_id, catalog, menagerie.get_annotated_schema(conn_id, catalog['stream_id']), replication_md) | ||
|
@@ -900,3 +918,241 @@ def setUpClass(cls): | |
|
||
if missing_envs: | ||
raise Exception("set environment variables") | ||
|
||
def get_custom_fields(self, found_catalogs, conn_id): | ||
""" List all the custom_fields for each stream""" | ||
custom_fields = {} | ||
for stream in self.streams_to_test(): | ||
with self.subTest(stream=stream): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think there's any assertions in this function, so we can remove this subTest line There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, will do that |
||
|
||
catalog = [catalog for catalog in found_catalogs if catalog["stream_name"] == stream][0] | ||
schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])["annotated-schema"] | ||
custom_fields[stream] = {key for key in schema['properties'].keys() if key.endswith("__c")} | ||
return custom_fields | ||
|
||
def get_non_custom_fields(self, found_catalogs, conn_id): | ||
""" List all the non_custom_fields for each stream""" | ||
non_custom_fields = {} | ||
for stream in self.streams_to_test(): | ||
with self.subTest(stream=stream): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think there's any assertions in this function, so we can remove this subTest line |
||
catalog = [catalog for catalog in found_catalogs if catalog["stream_name"] == stream][0] | ||
schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])["annotated-schema"] | ||
non_custom_fields[stream] = {key for key in schema['properties'].keys() if not key.endswith("__c") | ||
and schema['properties'][key]['inclusion']!="unsupported"} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Optional: I personally think if there's an non_custom_fields[stream] = {key
for key in schema['properties'].keys()
if not key.endswith("__c")
and schema['properties'][key]['inclusion'] != "unsupported"} |
||
return non_custom_fields | ||
|
||
@staticmethod | ||
def count_custom_non_custom_fields(fields): | ||
custom = 0 | ||
non_custom =0 | ||
for field in fields: | ||
if not field.endswith("__c"): | ||
non_custom += 1 | ||
else: | ||
custom += 1 | ||
return (custom, non_custom) | ||
JYOTHINARAYANSETTY marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@staticmethod | ||
def get_streams_with_data(): | ||
#the streams listed here are the streams that have data currently | ||
streams_with_data = { | ||
'ActiveFeatureLicenseMetric', | ||
'AppMenuItem', | ||
'AuthSession', | ||
'Account', | ||
'Calendar', | ||
'AppointmentSchedulingPolicy', | ||
'Campaign', | ||
'AssignmentRule', | ||
'ActivePermSetLicenseMetric', | ||
'AppDefinition', | ||
'BusinessHours', | ||
'ActiveProfileMetric','Calendar', | ||
'ContentWorkspacePermission', | ||
'CampaignMemberStatus', | ||
'Community', | ||
'Contact', | ||
'Case', | ||
'ClientBrowser', | ||
'BusinessHours', | ||
'ContentWorkspace', | ||
'Campaign','FieldPermissions', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason these are on the same line? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that was a miss , will fix it |
||
'Group', | ||
'EventLogFile', | ||
'FiscalYearSettings', | ||
'ListView', | ||
'LoginHistory', | ||
'LeadStatus', | ||
'Lead', | ||
'LightningUsageByFlexiPageMetrics', | ||
'FormulaFunctionAllowedType', | ||
'LoginIp', | ||
'LightningUsageByAppTypeMetrics', | ||
'FileSearchActivity', | ||
'FormulaFunctionCategory', | ||
'LightningUsageByBrowserMetrics', | ||
'Folder', | ||
'FormulaFunction', | ||
'MatchingRule', | ||
'LightningUsageByPageMetrics', | ||
'LoginGeo', | ||
'FlowDefinitionView', | ||
'LightningToggleMetrics', | ||
'LightningExitByPageMetrics','PermissionSetTabSetting', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason these are on the same line? |
||
'MilestoneType', | ||
'Period', | ||
'MatchingRule', | ||
'OpportunityStage', | ||
'PlatformEventUsageMetric', | ||
'Organization', | ||
'OpportunityHistory', | ||
'Pricebook2', | ||
'PermissionSetLicense', | ||
'ObjectPermissions', | ||
'Opportunity', | ||
'PermissionSetAssignment', | ||
'OauthToken', | ||
'PricebookEntry', | ||
'Profile', | ||
'PermissionSet', | ||
'Product2','PromptAction', | ||
'SetupEntityAccess', | ||
'Profile', | ||
'Publisher', | ||
'ServiceSetupProvisioning', | ||
'Report', | ||
'Solution', | ||
'PromptActionShare', | ||
'SlaProcess', | ||
'SetupAuditTrail','UiFormulaRule', | ||
'WebLink', | ||
'UserPermissionAccess', | ||
'UserRole', | ||
'TabDefinition', | ||
'UserLogin', | ||
'UserAppMenuItem', | ||
'TenantUsageEntitlement', | ||
'UserLicense', | ||
'User', | ||
'TapTester__c', | ||
'SlaProcess', | ||
'UserAppInfo', | ||
'UiFormulaCriterion', | ||
'Solution', | ||
'FieldPermissions', | ||
'EntityDefinition', | ||
'ContentWorkspace', | ||
'DuplicateRule', | ||
'CronTrigger', | ||
'Domain', | ||
'ContentWorkspacePermission', | ||
'EmailTemplate', | ||
'EventLogFile', | ||
'CronJobDetail', | ||
'Entitlement', | ||
} | ||
return streams_with_data | ||
|
||
@staticmethod | ||
def get_unsupported_by_rest_api(): | ||
"""The streams listed here are not supported by the REST API""" | ||
unsupported_streams = { | ||
'Announcement', | ||
'CollaborationGroupRecord', | ||
'ContentDocumentLink', | ||
'ContentFolderMember', | ||
'DataStatistics', | ||
'EntityParticle', | ||
'FieldDefinition', | ||
'FlexQueueItem', | ||
'IdeaComment', | ||
'OwnerChangeOptionInfo', | ||
'PicklistValueInfo', | ||
'PlatformAction', | ||
'RelationshipDomain', | ||
'RelationshipInfo', | ||
'SearchLayout', | ||
'SiteDetail', | ||
'UserEntityAccess', | ||
'UserFieldAccess', | ||
'Vote', | ||
'RecordActionHistory', | ||
'FlowVersionView', | ||
'FlowVariableView', | ||
'AppTabMember', | ||
'ColorDefinition', | ||
'IconDefinition', | ||
} | ||
|
||
return unsupported_streams | ||
|
||
def get_unsupported_by_bulk_api(self): | ||
unsupported_streams_rest = self.get_unsupported_by_rest_api() | ||
unsupported_streams_bulk_only= { | ||
'AcceptedEventRelation', | ||
'AssetTokenEvent', | ||
'AttachedContentNote', | ||
'CaseStatus', | ||
'ContentFolderItem', | ||
'ContractStatus', | ||
'DeclinedEventRelation', | ||
'EventWhoRelation', | ||
'PartnerRole', | ||
'QuoteTemplateRichTextData', | ||
'RecentlyViewed', | ||
'SolutionStatus', | ||
'TaskPriority', | ||
'TaskWhoRelation', | ||
'TaskStatus', | ||
'UndecidedEventRelation', | ||
'OrderStatus', | ||
'WorkOrderStatus', | ||
'WorkOrderLineItemStatus', | ||
'ServiceAppointmentStatus', | ||
'ServiceAppointmentStatus', | ||
'FieldSecurityClassification', | ||
# BUG_TODO | the following streams are undocumented | ||
'WorkStepStatus', | ||
'ShiftStatus', | ||
} | ||
|
||
return unsupported_streams_bulk_only | unsupported_streams_rest | ||
|
||
def is_unsupported_by_rest_api(self, stream): | ||
"""returns True if stream is unsupported by REST API""" | ||
|
||
return stream in self.get_unsupported_by_rest_api() | ||
|
||
def is_unsupported_by_bulk_api(self, stream): | ||
""" | ||
returns True if stream is unsupported by BULK API | ||
|
||
BULK API does not support any streams that are unsupported by the REST API and | ||
in addition does not support the streams listed below. | ||
""" | ||
return stream in self.get_unsupported_by_bulk_api() | ||
|
||
def partition_streams(self,list_of_streams): | ||
|
||
weekday = dt.weekday(dt.now()) # weekdays 0-6, Mon-Sun | ||
luandy64 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
partition_size = math.ceil(len(list_of_streams)/7) | ||
|
||
# if partition_size increases in a given week the start of subsequent slices will be pushed | ||
# forward allowing for skipped streams, buffer start by 15 to help prevent this | ||
start_of_slice = max(partition_size * weekday - 15, 0) | ||
end_of_slice = min(partition_size * (weekday + 1), len(list_of_streams)) | ||
sorted_streams = sorted(list_of_streams) | ||
|
||
LOGGER.info("Using weekday based subset of found_catalogs, weekday = %s", weekday) | ||
|
||
# select certain... catalogs | ||
if self.salesforce_api == 'BULK': | ||
self.partitioned_streams = [stream | ||
for stream in sorted_streams[start_of_slice:end_of_slice] | ||
if not self.is_unsupported_by_bulk_api(stream)] | ||
else: | ||
self.partitioned_streams = [stream | ||
for stream in sorted_streams[start_of_slice:end_of_slice] | ||
if not self.is_unsupported_by_rest_api(stream)] | ||
|
||
return self.partitioned_streams |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,49 @@ | ||||||
""" | ||||||
Test that with only custom fields selected for a stream automatic fields and custom fields are still replicated | ||||||
""" | ||||||
import copy | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is unused |
||||||
from tap_tester.base_suite_tests.all_fields_test import AllFieldsTest | ||||||
from sfbase import SFBaseTest | ||||||
from tap_tester.logger import LOGGER | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is unused |
||||||
|
||||||
|
||||||
class SFCustomFieldsTest(AllFieldsTest, SFBaseTest): | ||||||
|
||||||
salesforce_api = 'BULK' | ||||||
|
||||||
@staticmethod | ||||||
def name(): | ||||||
return "tt_sf_all_fields_custom" | ||||||
|
||||||
|
||||||
def streams_to_test(self): | ||||||
if self.partitioned_streams: | ||||||
return self.partitioned_streams | ||||||
return self.partition_streams(self.get_streams_with_data()) | ||||||
|
||||||
|
||||||
def streams_to_selected_fields(self): | ||||||
found_catalogs = AllFieldsTest.found_catalogs | ||||||
conn_id = AllFieldsTest.conn_id | ||||||
custom_fields = self.get_custom_fields(found_catalogs, conn_id) | ||||||
return custom_fields | ||||||
|
||||||
def test_all_fields_for_streams_are_replicated(self): | ||||||
for stream in self.streams_to_test(): | ||||||
|
||||||
expected_custom_fields = self.selected_fields.get(stream, set()).union( | ||||||
self.expected_automatic_fields(stream)) | ||||||
replicated_custom_fields = self.actual_fields.get(stream, set()) | ||||||
|
||||||
#Verify at least one custom field is replicated | ||||||
self.assertIsNotNone(replicated_custom_fields.difference(self.expected_automatic_fields(stream)), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This assert is always true because x is not None And here's what I get when I test this assert >>> set() is not None
True |
||||||
msg = f"Replication didn't return any custom fields for stream {stream}") | ||||||
|
||||||
#Verify that custom and automatic fields are replicated | ||||||
self.assertSetEqual(expected_custom_fields, replicated_custom_fields, | ||||||
logging=f"verify all fields are replicated for stream {stream}") | ||||||
|
||||||
#Verify that only custome fields are replicated besides automatic fields | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
num_custom, num_non_custom = self.count_custom_non_custom_fields(replicated_custom_fields) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Typically in python, when a value is not used, we assign it to
Suggested change
|
||||||
self.assertEqual(num_non_custom, len(self.expected_automatic_fields(stream)), | ||||||
"Replicated some fields that are not custom fields for stream {stream}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is unused