Commit 8e9eba2
Implement incremental leads resource from /leads endpoint
steinitzu committed Jul 3, 2023
1 parent 43555ca commit 8e9eba2
Showing 3 changed files with 64 additions and 7 deletions.
33 changes: 32 additions & 1 deletion sources/pipedrive/__init__.py
@@ -11,7 +11,6 @@

from typing import Any, Dict, Iterator, List, Optional, Union, Iterable, Tuple

-
import dlt

from .helpers.custom_fields_munger import update_fields_mapping, rename_fields
@@ -92,6 +91,8 @@ def pipedrive_source(
name="deals_flow", write_disposition="merge", primary_key="id"
)(_get_deals_flow)(pipedrive_api_key)

yield leads(pipedrive_api_key, update_time=since_timestamp)


def _get_deals_flow(
deals_page: TDataPage, pipedrive_api_key: str
@@ -167,3 +168,33 @@ def parsed_mapping(
        }
        for hash_string, names in data_item_mapping.items()
    ]
+
+
+@dlt.resource(primary_key="id", write_disposition="merge")
+def leads(
+    pipedrive_api_key: str = dlt.secrets.value,
+    update_time: Optional[dlt.sources.incremental[str]] = dlt.sources.incremental(
+        "update_time", "1970-01-01 00:00:00"
+    ),
+) -> Iterator[TDataPage]:
+    """Resource to incrementally load pipedrive leads by update_time"""
+    # Leads inherit custom fields from deals
+    fields_mapping = (
+        dlt.current.source_state().get("custom_fields_mapping", {}).get("deals", {})
+    )
+    # Load leads pages sorted from newest to oldest and stop loading when
+    # the last incremental value is reached
+    last_value = update_time.last_value
+    pages = get_pages(
+        "leads",
+        pipedrive_api_key,
+        extra_params={"sort": "update_time DESC"},
+    )
+    for page in pages:
+        if last_value:
+            # Just check whether the first item is lower; worst case we load 1 redundant page before breaking
+            first_item = page[0] if page else None
+            if first_item and first_item["update_time"] < last_value:
+                return
+        yield rename_fields(page, fields_mapping)
+    return
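
A minimal way to exercise the new resource on its own (a sketch, not part of this commit; the pipeline name, dataset name, and the duckdb destination are illustrative assumptions):

import dlt

from sources.pipedrive import leads

pipeline = dlt.pipeline(
    pipeline_name="pipedrive_leads",
    destination="duckdb",
    dataset_name="pipedrive_data",
)

# The first run starts from the initial value "1970-01-01 00:00:00"; later runs
# resume from the highest `update_time` kept in pipeline state, so only new or
# updated leads are fetched and merged on the `id` primary key.
info = pipeline.run(leads())
print(info)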
4 changes: 1 addition & 3 deletions sources/pipedrive/settings.py
@@ -9,7 +9,6 @@
("pipeline", None, None),
("stage", None, None),
("user", None, None),
("lead", None, None),
]

RECENTS_ENTITIES = {
@@ -25,5 +24,4 @@
"product": "products",
"stage": "stages",
"user": "users",
"lead": "leads",
}
}
34 changes: 31 additions & 3 deletions tests/pipedrive/test_pipedrive_source.py
@@ -11,8 +11,13 @@
from dlt.common.schema import Schema
from dlt.sources.helpers import requests

-from sources.pipedrive import pipedrive_source
-from tests.utils import ALL_DESTINATIONS, assert_load_info, assert_query_data
+from sources.pipedrive import pipedrive_source, leads
+from tests.utils import (
+    ALL_DESTINATIONS,
+    assert_load_info,
+    assert_query_data,
+    load_table_counts,
+)
from sources.pipedrive.helpers.custom_fields_munger import (
    update_fields_mapping,
    rename_fields,
@@ -47,7 +52,6 @@
"files",
"activityTypes",
"notes",
"leads",
}
)

@@ -73,6 +77,30 @@ def test_all_resources(destination_name: str) -> None:
assert "deals_flow_deal_change" in schema_tables


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_leads_resource_incremental(destination_name: str) -> None:
pipeline = dlt.pipeline(
pipeline_name="pipedrive",
destination=destination_name,
dataset_name="pipedrive_data",
full_refresh=True,
)
# Load all leads from beginning
data = leads()
info = pipeline.run(data, write_disposition="append")
assert_load_info(info)

counts = load_table_counts(pipeline, "leads")
assert counts["leads"] > 1

# Run again and assert no new data is added
data = leads()
info = pipeline.run(data, write_disposition="append")
assert_load_info(info, expected_load_packages=0)

assert load_table_counts(pipeline, "leads") == counts


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_custom_fields_munger(destination_name: str) -> None:
pipeline = dlt.pipeline(
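
The new test relies on load_table_counts from tests.utils. As a rough sketch of what such a helper does (an assumed implementation for illustration, not code from this commit):

from typing import Any, Dict

def load_table_counts(pipeline: Any, *table_names: str) -> Dict[str, int]:
    # Count rows in each destination table via the pipeline's SQL client
    # (assumed to mirror tests.utils.load_table_counts).
    with pipeline.sql_client() as client:
        return {
            name: client.execute_sql(f"SELECT COUNT(1) FROM {name}")[0][0]
            for name in table_names
        }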
