Shopify: Incremental and other improvements #222

Merged · 14 commits · Jul 24, 2023
Changes from all commits
31 changes: 1 addition & 30 deletions poetry.lock


10 changes: 6 additions & 4 deletions pyproject.toml
@@ -29,39 +29,41 @@ requests-mock = "^1.10.0"
include = '.*py$'



[tool.poetry.group.sql_database.dependencies]
sqlalchemy = ">=1.4"
pymysql = "^1.0.3"



[tool.poetry.group.google_sheets.dependencies]
google-api-python-client = "^2.78.0"



[tool.poetry.group.google_analytics.dependencies]
google-analytics-data = "^0.16.2"
google-api-python-client = "^2.86.0"
google-auth-oauthlib = "^1.0.0"
requests-oauthlib = "^1.3.1"



[tool.poetry.group.stripe_analytics.dependencies]
pandas = "^2.0.0"
stripe = "^5.0.0"



[tool.poetry.group.asana_dlt.dependencies]
asana = "^3.2.1"


[tool.poetry.group.shopify_dlt.dependencies]
shopifyapi = "^12.3.0"


[tool.poetry.group.facebook_ads.dependencies]
facebook-business = "^16.0.2"



[tool.poetry.group.salesforce.dependencies]
simple-salesforce = "^1.12.4"

116 changes: 84 additions & 32 deletions sources/shopify_dlt/__init__.py
@@ -2,100 +2,152 @@

from typing import Any, Dict, Iterator, Optional, Iterable

import shopify

import dlt

from dlt.extract.source import DltResource
from dlt.common.typing import TDataItem
from dlt.common import pendulum

from .settings import DEFAULT_API_VERSION, FIRST_DAY_OF_MILLENNIUM
from .helpers import iterate_page
from .settings import (
DEFAULT_API_VERSION,
FIRST_DAY_OF_MILLENNIUM,
DEFAULT_ITEMS_PER_PAGE,
)
from .helpers import ShopifyApi, TOrderStatus
from .date_helper import TAnyDateTime, ensure_pendulum_datetime


@dlt.source(name="shopify")
def shopify_source(
private_app_password: str = dlt.secrets.value,
api_version: str = DEFAULT_API_VERSION,
shop_url: str = dlt.config.value,
start_date: Optional[str] = FIRST_DAY_OF_MILLENNIUM,
start_date: TAnyDateTime = FIRST_DAY_OF_MILLENNIUM,
end_date: Optional[TAnyDateTime] = None,
created_at_min: TAnyDateTime = FIRST_DAY_OF_MILLENNIUM,
items_per_page: int = DEFAULT_ITEMS_PER_PAGE,
order_status: TOrderStatus = "any",
) -> Iterable[DltResource]:
"""
The source for the Shopify pipeline. Available resources are products, orders, and customers.

The `start_date` argument can be used on its own or together with `end_date`. When both are provided,
data is limited to items updated in that time range.
The range is "half-open": elements updated on or after `start_date` and before `end_date` are included.

Args:
private_app_password (str): The app password to the app on your shop.
api_version (str): The API version to use.
shop_url (str): The URL of your shop.
start_date (Optional[str]): The date from which to import items. It will import items initially created on or after this date.
private_app_password: The app password to the app on your shop.
api_version: The API version to use (e.g. 2023-01).
shop_url: The URL of your shop (e.g. https://my-shop.myshopify.com).
items_per_page: The max number of items to fetch per page. Defaults to 250.
start_date: Items updated on or after this date are imported. Defaults to 2000-01-01.
If no end date is provided, this is used as the initial value for incremental loading, and after the initial run only new data will be retrieved.
Accepts any `date`/`datetime` object or a date/datetime string in ISO 8601 format.
end_date: The end of the range for which to load data.
Should be used together with `start_date` to limit the data to items updated in that time range.
If no end date is provided, incremental loading is enabled and, after the initial run, only new data will be retrieved.
created_at_min: The minimum creation date of items to import. Items created on or after this date are loaded. Defaults to 2000-01-01.
order_status: The order status to filter by. Can be 'open', 'closed', 'cancelled', or 'any'. Defaults to 'any'.

Returns:
Iterable[DltResource]: A list of DltResource objects representing the data resources.
"""

# build client
session = shopify.Session(shop_url, api_version, private_app_password)
shopify.ShopifyResource.activate_session(session)
client = ShopifyApi(shop_url, private_app_password, api_version)

start_date_obj = ensure_pendulum_datetime(start_date)
end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None
created_at_min_obj = ensure_pendulum_datetime(created_at_min)

# define resources
@dlt.resource(primary_key="id", write_disposition="merge")
def products(
updated_at: dlt.sources.incremental[str] = dlt.sources.incremental(
"updated_at", initial_value=FIRST_DAY_OF_MILLENNIUM
)
updated_at: dlt.sources.incremental[
pendulum.DateTime
] = dlt.sources.incremental(
"updated_at", initial_value=start_date_obj, end_value=end_date_obj
),
created_at_min: pendulum.DateTime = created_at_min_obj,
items_per_page: int = items_per_page,
) -> Iterable[TDataItem]:
"""
The resource for products on your shop, supports incremental loading and pagination.

Args:
updated_at (dlt.sources.incremental[str]): The saved state of the last 'updated_at' value.
updated_at: The saved state of the last 'updated_at' value.

Returns:
Iterable[TDataItem]: A generator of products.
"""
page = shopify.Product.find(
updated_at_min=updated_at.last_value, created_at_min=start_date
params = dict(

Contributor:
yeah, interesting question whether we should set created_at_min=start_date as we did in the previous version.

  • if we set it, items that were created before start_date will never be extracted, even if they were updated after that date
  • if we do not set it, any updated item may be extracted

@adrianbr any opinions? if not, I'd set it

Collaborator (author):
Better if we use a separate min_created_at argument in the source, I think.
Using the same start date is a little problematic with end-value loading, because an item's created and updated timestamps can each fall in a different chunk of the range, so the item would never get loaded even though it should be within the total range (e.g. an item created in a January chunk but updated in a March chunk matches neither chunk's filters).

Contributor:
yes, you are right. we need to handle backloading differently.

  • when an end date is specified, we use only created_at_min and created_at_max to select the range
  • when we have incremental load, we specify created_at_min (set to start_date, which is also initial_value) and updated_at_min set to last_value

there's a grey area here: should we also track updates of records from the backloaded range? with the above it is not the case. we'd need another parameter to set created_at and updated_at separately

Collaborator (author):
Hmm, yeah, I'd say it makes sense to load updates from the backloaded range too.
What is the idea behind the created_at cutoff originally?
I'm struggling to see its usefulness beyond just filtering by updated_at. I'd think the old orders you want to ignore usually aren't updated anyway.

Contributor (@rudolfix, Jul 24, 2023):
I think you are always getting the most recent state of the entity, and update/create just filter which entities you want to get. so there's no way, IMO, to get its previous state. anyway, I kind of agree with what you say.

  1. let's have our start/end date settings work only with updated_at_min and updated_at_max (this one you need to add)
  2. I'd add another argument, created_at_min, to cut off the entities created before that date. the argument is optional and defaults to FIRST_DAY_OF_MILLENNIUM

the typical usage would be like the incremental_load_with_backloading example: backloading and then incremental load, all based on updated_at.
On top of that we give the possibility to filter out all records that were created before the initial value of current_start_date in the demo, so the created_at_min argument is always set to it, during backloading and on incremental load.

WDYT?
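
For concreteness, a rough sketch of the parameter logic proposed above (build_params is a hypothetical helper; last_value/end_value stand in for dlt's incremental state; this is an illustration, not code from the PR):

def build_params(last_value, end_value, created_at_min):
    # Always filter out entities created before the fixed cutoff (point 2).
    params = {
        "created_at_min": created_at_min.isoformat(),
        # Incremental state: resume from the last seen updated_at.
        "updated_at_min": last_value.isoformat(),
    }
    if end_value is not None:
        # Backloading a fixed range: cap it with updated_at_max (point 1).
        params["updated_at_max"] = end_value.isoformat()
    return params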

Collaborator (author):
That makes a lot of sense, and I think it's clearer with the separate argument.

Collaborator (author):
I've added the created_at_min arg now, with a test for it.
updated_at_max is used now too.

updated_at_min=updated_at.last_value.isoformat(),
limit=items_per_page,
order="updated_at asc",
created_at_min=created_at_min.isoformat(),
)
yield iterate_page(page)
if updated_at.end_value is not None:
params["updated_at_max"] = updated_at.end_value.isoformat()
yield from client.get_pages("products", params)
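
An aside for orientation: the ShopifyApi.get_pages helper referenced here lives in sources/shopify_dlt/helpers.py and is not shown in this diff. A minimal sketch of what such a client might look like, assuming Shopify's REST cursor pagination via the Link response header — only the class and method names come from the diff, the body is an assumption:

import requests
from typing import Any, Dict, Iterator, List


class ShopifyApi:
    def __init__(self, shop_url: str, private_app_password: str, api_version: str) -> None:
        self.base_url = f"{shop_url.rstrip('/')}/admin/api/{api_version}"
        self.session = requests.Session()
        # Private apps authenticate with this header.
        self.session.headers["X-Shopify-Access-Token"] = private_app_password

    def get_pages(self, resource: str, params: Dict[str, Any]) -> Iterator[List[Dict[str, Any]]]:
        """Yield pages of `resource` items, following cursor pagination."""
        url = f"{self.base_url}/{resource}.json"
        while url:
            response = self.session.get(url, params=params)
            response.raise_for_status()
            yield response.json()[resource]
            # The rel="next" Link header carries the cursor URL, which
            # already encodes the original filters.
            url = response.links.get("next", {}).get("url")
            params = None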

@dlt.resource(primary_key="id", write_disposition="merge")
def orders(
updated_at: dlt.sources.incremental[str] = dlt.sources.incremental(
"updated_at", initial_value=FIRST_DAY_OF_MILLENNIUM
)
updated_at: dlt.sources.incremental[
pendulum.DateTime
] = dlt.sources.incremental(
"updated_at", initial_value=start_date_obj, end_value=end_date_obj
),
created_at_min: pendulum.DateTime = created_at_min_obj,
items_per_page: int = items_per_page,
status: TOrderStatus = order_status,
) -> Iterable[TDataItem]:
"""
The resource for orders on your shop, supports incremental loading and pagination.

Args:
updated_at (dlt.sources.incremental[str]): The saved state of the last 'updated_at' value.
updated_at: The saved state of the last 'updated_at' value.

Returns:
Iterable[TDataItem]: A generator of orders.
"""
page = shopify.Order.find(
updated_at_min=updated_at.last_value, created_at_min=start_date
params = dict(
updated_at_min=updated_at.last_value.isoformat(),
limit=items_per_page,
status=status,
order="updated_at asc",
created_at_min=created_at_min.isoformat(),
)
yield iterate_page(page)
if updated_at.end_value is not None:
params["updated_at_max"] = updated_at.end_value.isoformat()
yield from client.get_pages("orders", params)

@dlt.resource(primary_key="id", write_disposition="merge")
def customers(
updated_at: dlt.sources.incremental[str] = dlt.sources.incremental(
"updated_at", initial_value=FIRST_DAY_OF_MILLENNIUM
)
updated_at: dlt.sources.incremental[
pendulum.DateTime
] = dlt.sources.incremental(
"updated_at", initial_value=start_date_obj, end_value=end_date_obj
),
created_at_min: pendulum.DateTime = created_at_min_obj,
items_per_page: int = items_per_page,
) -> Iterable[TDataItem]:
"""
The resource for customers on your shop, supports incremental loading and pagination.

Args:
updated_at (dlt.sources.incremental[str]): The saved state of the last 'updated_at' value.
updated_at: The saved state of the last 'updated_at' value.

Returns:
Iterable[TDataItem]: A generator of customers.
"""
page = shopify.Customer.find(
updated_at_min=updated_at.last_value, created_at_min=start_date
params = dict(
updated_at_min=updated_at.last_value.isoformat(),
limit=items_per_page,
order="updated_at asc",
created_at_min=created_at_min.isoformat(),
)
yield iterate_page(page)
if updated_at.end_value is not None:
params["updated_at_max"] = updated_at.end_value.isoformat()
yield from client.get_pages("customers", params)

return (products, orders, customers)
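
As a usage note (not part of the diff): a minimal sketch of how the new arguments combine in a pipeline run. The pipeline name, destination, and import path are placeholders assumed from the repo layout:

import dlt
from shopify_dlt import shopify_source

pipeline = dlt.pipeline(
    pipeline_name="shopify", destination="duckdb", dataset_name="shopify_data"
)

# Backfill a fixed, half-open range: items updated in [2023-01-01, 2023-06-01).
pipeline.run(shopify_source(start_date="2023-01-01", end_date="2023-06-01"))

# Incremental: with no end_date, only items updated since the last run are
# fetched on subsequent runs; created_at_min drops entities created earlier.
load_info = pipeline.run(
    shopify_source(start_date="2023-06-01", created_at_min="2023-01-01")
)
print(load_info)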
35 changes: 35 additions & 0 deletions sources/shopify_dlt/date_helper.py
@@ -0,0 +1,35 @@
from typing import Union, Optional
from datetime import datetime, date # noqa: I251

from dlt.common import pendulum
from dlt.common.time import parse_iso_like_datetime


TAnyDateTime = Union[pendulum.DateTime, pendulum.Date, datetime, date, str]


def ensure_pendulum_datetime(value: TAnyDateTime) -> pendulum.DateTime:
    """Coerce a date/time value to a `pendulum.DateTime` object.

    UTC is assumed if the value is not timezone aware.

    Args:
        value: The value to coerce. Can be a pendulum.DateTime, pendulum.Date, datetime, date or iso date/time str.

    Returns:
        A timezone aware pendulum.DateTime object.
    """
    if isinstance(value, datetime):
        # both py datetime and pendulum datetime are handled here
        ret = pendulum.instance(value)
        if ret.tz is None:
            return ret.in_tz("UTC")
        return ret
    elif isinstance(value, date):
        return pendulum.datetime(value.year, value.month, value.day)
    elif isinstance(value, str):
        result = parse_iso_like_datetime(value)
        if not isinstance(result, datetime):  # TODO: iso date parses to date object
            return pendulum.datetime(result.year, result.month, result.day)
        return result
    raise TypeError(f"Cannot coerce {value} to a pendulum.DateTime object.")
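
A quick usage sketch of the coercion behavior above (the import path is assumed from the diff; expected outputs shown as comments):

from datetime import date, datetime

from sources.shopify_dlt.date_helper import ensure_pendulum_datetime

# ISO strings, dates, and naive datetimes all coerce to tz-aware pendulum.DateTime:
print(ensure_pendulum_datetime("2023-07-24"))           # 2023-07-24T00:00:00+00:00
print(ensure_pendulum_datetime(date(2023, 7, 24)))      # 2023-07-24T00:00:00+00:00
print(ensure_pendulum_datetime(datetime(2023, 7, 24)))  # naive datetime, UTC assumed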