diff --git a/tap_hubspot/__init__.py b/tap_hubspot/__init__.py index d2e026b4..9a7ad37b 100644 --- a/tap_hubspot/__init__.py +++ b/tap_hubspot/__init__.py @@ -19,6 +19,7 @@ UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING, Transformer, _transform_datetime) + LOGGER = singer.get_logger() SESSION = requests.Session() @@ -1143,8 +1144,7 @@ def sync_records(stream_id, primary_key, bookmark_key, catalog, STATE, params): get_start(STATE, stream_id, bookmark_key)) LOGGER.info(f"Sync record for {stream_id} from {bookmark_value}") - - schema = load_schema(stream_id) + schema = catalog.get('schema') singer.write_schema(stream_id, schema, [primary_key], [bookmark_key], catalog.get('stream_alias')) @@ -1177,7 +1177,7 @@ def sync_records(stream_id, primary_key, bookmark_key, catalog, STATE, params): def sync_custom_objects(STATE, ctx): """ - Function to sync `custom_objects` schema records + Function to sync all the `custom_objects` schema """ catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE)) mdata = metadata.to_map(catalog.get('metadata')) @@ -1194,7 +1194,7 @@ def sync_custom_objects(STATE, ctx): def sync_custom_object_records(STATE, ctx, stream_id): """ - Function to sync `custom_'object'` stream records + Function to sync record for each `custom_object` stream """ catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE)) mdata = metadata.to_map(catalog.get('metadata')) @@ -1241,36 +1241,14 @@ class Stream: def add_custom_streams(mode): custom_objects_url = get_url("custom_objects") data = request(custom_objects_url).json() + # Load Hubspot's shared schemas + refs = load_shared_schema_refs() if "results" in data.keys(): for custom_object in data["results"]: stream_id = "custom_" + custom_object["name"] STREAMS.append(Stream(stream_id, sync_custom_object_records, ['id'], 'updatedAt', 'INCREMENTAL')) if mode == "DISCOVER": - schema = { - "type": "object", - "properties": { - "id": { - "type": [ - "null", - "string" - ] - }, - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "updatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "archived": { - "type": [ - "null", - "boolean" - ] - } - } - } + schema = utils.load_json(get_abs_path('schemas/shared/custom_object_record.json')) custom_schema = parse_custom_schema(stream_id, custom_object["properties"]) schema["properties"]["properties"] = { "type": "object", @@ -1281,12 +1259,25 @@ def add_custom_streams(mode): custom_schema_top_level = {'property_{}'.format(k): v for k, v in custom_schema.items()} schema['properties'].update(custom_schema_top_level) + final_schema = singer.resolve_schema_references(schema, refs) custom_schema_path = get_abs_path('schemas/{}.json'.format(stream_id)) # Write data to the JSON file with open(custom_schema_path, 'w') as json_file: - json.dump(schema, json_file) + json.dump(final_schema, json_file) + +def load_shared_schema_refs(): + shared_schemas_path = get_abs_path('schemas/shared') + + shared_file_names = [f for f in os.listdir(shared_schemas_path) + if os.path.isfile(os.path.join(shared_schemas_path, f))] + + shared_schema_refs = {} + for shared_file in shared_file_names: + with open(os.path.join(shared_schemas_path, shared_file)) as data_file: + shared_schema_refs[shared_file] = json.load(data_file) + return shared_schema_refs def get_streams_to_sync(streams, state): target_stream = singer.get_currently_syncing(state) diff --git a/tap_hubspot/schemas/shared/associations_schema.json b/tap_hubspot/schemas/shared/associations_schema.json new file mode 100644 index 00000000..87f71ff5 --- /dev/null +++ b/tap_hubspot/schemas/shared/associations_schema.json @@ -0,0 +1,34 @@ +{ + "type": [ + "null", + "object" + ], + "properties": { + "results": { + "type": [ + "null", + "array" + ], + "items": { + "type": [ + "null", + "object" + ], + "properties": { + "id": { + "type": [ + "null", + "string" + ] + }, + "type": { + "type": [ + "null", + "string" + ] + } + } + } + } + } +} diff --git a/tap_hubspot/schemas/shared/custom_object_record.json b/tap_hubspot/schemas/shared/custom_object_record.json new file mode 100644 index 00000000..7cf6395b --- /dev/null +++ b/tap_hubspot/schemas/shared/custom_object_record.json @@ -0,0 +1,44 @@ +{ + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "string" + ] + }, + "createdAt": { + "type": ["null", "string"], + "format": "date-time" + }, + "updatedAt": { + "type": ["null", "string"], + "format": "date-time" + }, + "archived": { + "type": [ + "null", + "boolean" + ] + }, + "associations": { + "type": [ + "null", + "object" + ], + "properties": { + "emails": {"$ref": "associations_schema.json"}, + "meetings": {"$ref": "associations_schema.json"}, + "notes": {"$ref": "associations_schema.json"}, + "tasks": {"$ref": "associations_schema.json"}, + "calls": {"$ref": "associations_schema.json"}, + "conversation_session": {"$ref": "associations_schema.json"}, + "companies": {"$ref": "associations_schema.json"}, + "contacts": {"$ref": "associations_schema.json"}, + "deals": {"$ref": "associations_schema.json"}, + "products": {"$ref": "associations_schema.json"}, + "tickets": {"$ref": "associations_schema.json"} + } + } + } +}