Skip to content

Commit

Permalink
remove custom_ prefix from the table name
Browse files Browse the repository at this point in the history
  • Loading branch information
sgandhi1311 committed Nov 28, 2023
1 parent 62121c3 commit d0c3110
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 22 deletions.
34 changes: 19 additions & 15 deletions tap_hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,17 @@ def get_field_schema(field_type, extras=False):
}
}

def parse_custom_schema(entity_name, data):
if entity_name == "tickets":
def parse_custom_schema(entity_name, data, isCustomObject=False):

if isCustomObject:
return {
field['name']: get_field_type_schema(field['type'])
for field in data["results"]
for field in data
}
elif entity_name.startswith("custom_"):
if entity_name == "tickets":
return {
field['name']: get_field_type_schema(field['type'])
for field in data
for field in data["results"]
}

return {
Expand Down Expand Up @@ -1134,13 +1135,13 @@ def gen_request_custom_objects(tap_stream_id, url, params, path, more_key):
if params['after'] is None:
break

def sync_records(stream_id, primary_key, bookmark_key, catalog, STATE, params):
def sync_records(stream_id, primary_key, bookmark_key, catalog, STATE, params, isCustomObject=False):
"""
Synchronize records from a data source
"""
mdata = metadata.to_map(catalog.get('metadata'))
if stream_id.startswith("custom_"):
url = get_url("custom_objects", object_name=stream_id.split("custom_")[1])
if isCustomObject:
url = get_url("custom_objects", object_name=stream_id)
else:
url = get_url(stream_id)
max_bk_value = bookmark_value = utils.strptime_with_tz(
Expand Down Expand Up @@ -1190,7 +1191,7 @@ def sync_custom_object_records(STATE, ctx, stream_id):
'properties': get_selected_property_fields(catalog, mdata),
'archived': False
}
return sync_records(stream_id, primary_key, bookmark_key, catalog, STATE, params)
return sync_records(stream_id, primary_key, bookmark_key, catalog, STATE, params, isCustomObject=True)


@attr.s
Expand Down Expand Up @@ -1222,16 +1223,16 @@ class Stream:


def add_custom_streams(mode, catalog=None):
custom_objects_schema_url = get_url("custom_objects_schema")
if mode == "DISCOVER":
custom_objects_schema_url = get_url("custom_objects_schema")
# Load Hubspot's shared schemas
refs = load_shared_schema_refs()
try:
for custom_object in gen_request_custom_objects("custom_objects_schema", custom_objects_schema_url, {}, 'results', "paging"):
stream_id = "custom_" + custom_object["name"]
stream_id = custom_object["name"]
STREAMS.append(Stream(stream_id, sync_custom_object_records, ['id'], 'updatedAt', 'INCREMENTAL'))
schema = utils.load_json(get_abs_path('schemas/shared/custom_objects.json'))
custom_schema = parse_custom_schema(stream_id, custom_object["properties"])
custom_schema = parse_custom_schema(stream_id, custom_object["properties"], isCustomObject=True)
schema["properties"]["properties"] = {
"type": "object",
"properties": custom_schema,
Expand All @@ -1253,8 +1254,11 @@ def add_custom_streams(mode, catalog=None):
LOGGER.warning(warning_message)

elif mode == "SYNC":
custom_objects = [custom_object["name"] for custom_object in gen_request_custom_objects("custom_objects_schema", custom_objects_schema_url, {}, 'results', "paging")]
for stream in catalog["streams"]:
STREAMS.append(Stream(stream["tap_stream_id"], sync_custom_object_records, ['id'], 'updatedAt', 'INCREMENTAL'))
if stream["tap_stream_id"] in custom_objects:
STREAMS.append(Stream(stream["tap_stream_id"], sync_custom_object_records, ['id'], 'updatedAt', 'INCREMENTAL'))
return custom_objects

def load_shared_schema_refs():
shared_schemas_path = get_abs_path('schemas/shared')
Expand Down Expand Up @@ -1290,7 +1294,7 @@ def get_selected_streams(remaining_streams, ctx):
return selected_streams

def do_sync(STATE, catalog):
add_custom_streams(mode="SYNC", catalog=catalog)
custom_objects = add_custom_streams(mode="SYNC", catalog=catalog)
# Clear out keys that are no longer used
clean_state(STATE)

Expand All @@ -1307,7 +1311,7 @@ def do_sync(STATE, catalog):
singer.write_state(STATE)

try:
if stream.tap_stream_id.startswith("custom_"):
if stream.tap_stream_id in custom_objects:
stream.sync(STATE, ctx, stream.tap_stream_id)
else:
STATE = stream.sync(STATE, ctx) # pylint: disable=not-callable
Expand Down
14 changes: 7 additions & 7 deletions tap_hubspot/tests/unittests/test_custom_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
MOCK_CATALOG = {
"streams": [
{
"stream": "custom_cars",
"tap_stream_id": "custom_cars",
"stream": "cars",
"tap_stream_id": "cars",
"schema": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -105,7 +105,7 @@ def test_add_custom_streams(
mock_load_shared_schema_refs.assert_called_once()
mock_get_url.assert_called_once_with("custom_objects_schema")
mock_parse_custom_schema.assert_called_once_with(
"custom_fake_object", {"prop1": "type1", "prop2": "type2"}
"fake_object", {"prop1": "type1", "prop2": "type2"}, isCustomObject=True
)
mock_resolve_schema.assert_called_once_with(
{
Expand Down Expand Up @@ -135,9 +135,9 @@ def test_sync_custom_objects(
"""

# Set up mocks and fake data
STATE = {"currently_syncing": "custom_cars"}
STATE = {"currently_syncing": "cars"}
ctx = Context(MOCK_CATALOG)
stream_id = "custom_cars"
stream_id = "cars"
mock_custom_objects.return_value = [
{
"id": "11111",
Expand All @@ -146,8 +146,8 @@ def test_sync_custom_objects(
}
]
expected_output = {
"currently_syncing": "custom_cars",
"bookmarks": {"custom_cars": {"updatedAt": "2023-11-09T13:14:22.956000Z"}},
"currently_syncing": "cars",
"bookmarks": {"cars": {"updatedAt": "2023-11-09T13:14:22.956000Z"}},
}

# Call the function
Expand Down

0 comments on commit d0c3110

Please sign in to comment.