diff --git a/poetry.lock b/poetry.lock
index c24ce3876..b64fc5cf6 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2383,19 +2383,6 @@ files = [
 cffi = ">=1.0"
 six = "*"
 
-[[package]]
-name = "pyactiveresource"
-version = "2.2.2"
-description = "ActiveResource for Python"
-optional = false
-python-versions = "*"
-files = [
-    {file = "pyactiveresource-2.2.2.tar.gz", hash = "sha256:2f03844652dc206d9a086b0b15564d78dcec55786fa5fe0055dd2119e0dffdd8"},
-]
-
-[package.dependencies]
-six = "*"
-
 [[package]]
 name = "pyarrow"
 version = "12.0.0"
@@ -2874,22 +2861,6 @@ docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "pygments-g
 testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8 (<5)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pip-run (>=8.8)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
 testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"]
 
-[[package]]
-name = "shopifyapi"
-version = "12.3.0"
-description = "Shopify API for Python"
-optional = false
-python-versions = "*"
-files = [
-    {file = "ShopifyAPI-12.3.0.tar.gz", hash = "sha256:416a2ee7c05b0d22182fbe8edfec4c8c3b37961aaae0e4c096738a82a0c543a4"},
-]
-
-[package.dependencies]
-pyactiveresource = ">=2.2.2"
-PyJWT = ">=2.0.0"
-PyYAML = "*"
-six = "*"
-
 [[package]]
 name = "simple-salesforce"
 version = "1.12.4"
@@ -3479,4 +3450,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.8.1"
-content-hash = "28252ac76e7dafc7224b3150144153194dd3bc0dd8687477d167683d849a1988"
+content-hash = "5666d06b2ada20ecd72d7bb270cd693fb2a966144ee5702956439d18c8bebe5b"
diff --git a/pyproject.toml b/pyproject.toml
index cb5cdcc74..4d11ada09 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -29,15 +29,18 @@ requests-mock = "^1.10.0"
 include = '.*py$'
 
+
 [tool.poetry.group.sql_database.dependencies]
 sqlalchemy = ">=1.4"
 pymysql = "^1.0.3"
 
+
 [tool.poetry.group.google_sheets.dependencies]
 google-api-python-client = "^2.78.0"
 
+
 [tool.poetry.group.google_analytics.dependencies]
 google-analytics-data = "^0.16.2"
 google-api-python-client = "^2.86.0"
@@ -45,23 +48,22 @@ google-auth-oauthlib = "^1.0.0"
 requests-oauthlib = "^1.3.1"
 
+
 [tool.poetry.group.stripe_analytics.dependencies]
 pandas = "^2.0.0"
 stripe = "^5.0.0"
 
+
 [tool.poetry.group.asana_dlt.dependencies]
 asana = "^3.2.1"
 
-[tool.poetry.group.shopify_dlt.dependencies]
-shopifyapi = "^12.3.0"
-
-
 [tool.poetry.group.facebook_ads.dependencies]
 facebook-business = "^16.0.2"
 
+
 [tool.poetry.group.salesforce.dependencies]
 simple-salesforce = "^1.12.4"
diff --git a/sources/shopify_dlt/__init__.py b/sources/shopify_dlt/__init__.py
index c24116e5a..080375a75 100644
--- a/sources/shopify_dlt/__init__.py
+++ b/sources/shopify_dlt/__init__.py
@@ -2,15 +2,19 @@
 from typing import Any, Dict, Iterator, Iterator, Optional, Iterable
 
-import shopify
-
 import dlt
 from dlt.extract.source import DltResource
 from dlt.common.typing import TDataItem
+from dlt.common import pendulum
 
-from .settings import DEFAULT_API_VERSION, FIRST_DAY_OF_MILLENNIUM
-from .helpers import iterate_page
+from .settings import (
+    DEFAULT_API_VERSION,
+    FIRST_DAY_OF_MILLENNIUM,
+    DEFAULT_ITEMS_PER_PAGE,
+)
+from .helpers import ShopifyApi, TOrderStatus
+from .date_helper import TAnyDateTime, ensure_pendulum_datetime
 
 
 @dlt.source(name="shopify")
@@ -18,84 +22,132 @@ def shopify_source(
     private_app_password: str = dlt.secrets.value,
     api_version: str = DEFAULT_API_VERSION,
     shop_url: str = dlt.config.value,
-    start_date: Optional[str] = FIRST_DAY_OF_MILLENNIUM,
+    start_date: TAnyDateTime = FIRST_DAY_OF_MILLENNIUM,
+    end_date: Optional[TAnyDateTime] = None,
+    created_at_min: TAnyDateTime = FIRST_DAY_OF_MILLENNIUM,
+    items_per_page: int = DEFAULT_ITEMS_PER_PAGE,
+    order_status: TOrderStatus = "any",
 ) -> Iterable[DltResource]:
     """
     The source for the Shopify pipeline. Available resources are products, orders, and customers.
 
+    The `start_date` argument can be used on its own or together with `end_date`. When both are provided,
+    data is limited to items updated in that time range.
+    The range is "half-open": items updated at or after `start_date` and before `end_date` are included.
+
     Args:
-        private_app_password (str): The app password to the app on your shop.
-        api_version (str): The API version to use.
-        shop_url (str): The URL of your shop.
-        start_date (Optional[str]): The date from which to import items. It will import items initially created on or after this date.
+        private_app_password: The app password to the app on your shop.
+        api_version: The API version to use (e.g. 2023-01).
+        shop_url: The URL of your shop (e.g. https://my-shop.myshopify.com).
+        items_per_page: The max number of items to fetch per page. Defaults to 250.
+        start_date: Items updated on or after this date are imported. Defaults to 2000-01-01.
+            If `end_date` is not provided, this is used as the initial value for incremental loading; after the initial run, only new data is retrieved.
+            Accepts any `date`/`datetime` object or a date/datetime string in ISO 8601 format.
+        end_date: The end of the range for which to load data.
+            Should be used together with `start_date` to limit the data to items updated in that time range.
+            If not provided, incremental loading is enabled and, after the initial run, only new data is retrieved.
+        created_at_min: The minimum creation date of items to import. Items created on or after this date are loaded. Defaults to 2000-01-01.
+        order_status: The order status to filter by. Can be 'open', 'closed', 'cancelled', or 'any'. Defaults to 'any'.
 
     Returns:
         Iterable[DltResource]: A list of DltResource objects representing the data resources.
     """
     # build client
-    session = shopify.Session(shop_url, api_version, private_app_password)
-    shopify.ShopifyResource.activate_session(session)
+    client = ShopifyApi(shop_url, private_app_password, api_version)
+
+    start_date_obj = ensure_pendulum_datetime(start_date)
+    end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None
+    created_at_min_obj = ensure_pendulum_datetime(created_at_min)
 
     # define resources
     @dlt.resource(primary_key="id", write_disposition="merge")
     def products(
-        updated_at: dlt.sources.incremental[str] = dlt.sources.incremental(
-            "updated_at", initial_value=FIRST_DAY_OF_MILLENNIUM
-        )
+        updated_at: dlt.sources.incremental[
+            pendulum.DateTime
+        ] = dlt.sources.incremental(
+            "updated_at", initial_value=start_date_obj, end_value=end_date_obj
+        ),
+        created_at_min: pendulum.DateTime = created_at_min_obj,
+        items_per_page: int = items_per_page,
     ) -> Iterable[TDataItem]:
         """
        The resource for products on your shop, supports incremental loading and pagination.

         Args:
-            updated_at (dlt.sources.incremental[str]): The saved state of the last 'updated_at' value.
+            updated_at: The saved state of the last 'updated_at' value.
 
         Returns:
             Iterable[TDataItem]: A generator of products.
         """
-        page = shopify.Product.find(
-            updated_at_min=updated_at.last_value, created_at_min=start_date
+        params = dict(
+            updated_at_min=updated_at.last_value.isoformat(),
+            limit=items_per_page,
+            order="updated_at asc",
+            created_at_min=created_at_min.isoformat(),
         )
-        yield iterate_page(page)
+        if updated_at.end_value is not None:
+            params["updated_at_max"] = updated_at.end_value.isoformat()
+        yield from client.get_pages("products", params)
""" # build client - session = shopify.Session(shop_url, api_version, private_app_password) - shopify.ShopifyResource.activate_session(session) + client = ShopifyApi(shop_url, private_app_password, api_version) + + start_date_obj = ensure_pendulum_datetime(start_date) + end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None + created_at_min_obj = ensure_pendulum_datetime(created_at_min) # define resources @dlt.resource(primary_key="id", write_disposition="merge") def products( - updated_at: dlt.sources.incremental[str] = dlt.sources.incremental( - "updated_at", initial_value=FIRST_DAY_OF_MILLENNIUM - ) + updated_at: dlt.sources.incremental[ + pendulum.DateTime + ] = dlt.sources.incremental( + "updated_at", initial_value=start_date_obj, end_value=end_date_obj + ), + created_at_min: pendulum.DateTime = created_at_min_obj, + items_per_page: int = items_per_page, ) -> Iterable[TDataItem]: """ The resource for products on your shop, supports incremental loading and pagination. Args: - updated_at (dlt.sources.incremental[str]): The saved state of the last 'updated_at' value. + updated_at: The saved state of the last 'updated_at' value. Returns: Iterable[TDataItem]: A generator of products. """ - page = shopify.Product.find( - updated_at_min=updated_at.last_value, created_at_min=start_date + params = dict( + updated_at_min=updated_at.last_value.isoformat(), + limit=items_per_page, + order="updated_at asc", + created_at_min=created_at_min.isoformat(), ) - yield iterate_page(page) + if updated_at.end_value is not None: + params["updated_at_max"] = updated_at.end_value.isoformat() + yield from client.get_pages("products", params) @dlt.resource(primary_key="id", write_disposition="merge") def orders( - updated_at: dlt.sources.incremental[str] = dlt.sources.incremental( - "updated_at", initial_value=FIRST_DAY_OF_MILLENNIUM - ) + updated_at: dlt.sources.incremental[ + pendulum.DateTime + ] = dlt.sources.incremental( + "updated_at", initial_value=start_date_obj, end_value=end_date_obj + ), + created_at_min: pendulum.DateTime = created_at_min_obj, + items_per_page: int = items_per_page, + status: TOrderStatus = order_status, ) -> Iterable[TDataItem]: """ The resource for orders on your shop, supports incremental loading and pagination. Args: - updated_at (dlt.sources.incremental[str]): The saved state of the last 'updated_at' value. + updated_at: The saved state of the last 'updated_at' value. Returns: Iterable[TDataItem]: A generator of orders. """ - page = shopify.Order.find( - updated_at_min=updated_at.last_value, created_at_min=start_date + params = dict( + updated_at_min=updated_at.last_value.isoformat(), + limit=items_per_page, + status=status, + order="updated_at asc", + created_at_min=created_at_min.isoformat(), ) - yield iterate_page(page) + if updated_at.end_value is not None: + params["updated_at_max"] = updated_at.end_value.isoformat() + yield from client.get_pages("orders", params) @dlt.resource(primary_key="id", write_disposition="merge") def customers( - updated_at: dlt.sources.incremental[str] = dlt.sources.incremental( - "updated_at", initial_value=FIRST_DAY_OF_MILLENNIUM - ) + updated_at: dlt.sources.incremental[ + pendulum.DateTime + ] = dlt.sources.incremental( + "updated_at", initial_value=start_date_obj, end_value=end_date_obj + ), + created_at_min: pendulum.DateTime = created_at_min_obj, + items_per_page: int = items_per_page, ) -> Iterable[TDataItem]: """ The resource for customers on your shop, supports incremental loading and pagination. 
diff --git a/sources/shopify_dlt/date_helper.py b/sources/shopify_dlt/date_helper.py
new file mode 100644
index 000000000..ed0e85ba6
--- /dev/null
+++ b/sources/shopify_dlt/date_helper.py
@@ -0,0 +1,35 @@
+from typing import Union, Optional
+from datetime import datetime, date  # noqa: I251
+
+from dlt.common import pendulum
+from dlt.common.time import parse_iso_like_datetime
+
+
+TAnyDateTime = Union[pendulum.DateTime, pendulum.Date, datetime, date, str]
+
+
+def ensure_pendulum_datetime(value: TAnyDateTime) -> pendulum.DateTime:
+    """Coerce a date/time value to a `pendulum.DateTime` object.
+
+    UTC is assumed if the value is not timezone aware.
+
+    Args:
+        value: The value to coerce. Can be a pendulum.DateTime, pendulum.Date, datetime, date or iso date/time str.
+
+    Returns:
+        A timezone aware pendulum.DateTime object.
+    """
+    if isinstance(value, datetime):
+        # both py datetime and pendulum datetime are handled here
+        ret = pendulum.instance(value)
+        if ret.tz is None:
+            return ret.in_tz("UTC")
+        return ret
+    elif isinstance(value, date):
+        return pendulum.datetime(value.year, value.month, value.day)
+    elif isinstance(value, str):
+        result = parse_iso_like_datetime(value)
+        if not isinstance(result, datetime):  # TODO: iso date parses to date object
+            return pendulum.datetime(result.year, result.month, result.day)
+        return result
+    raise TypeError(f"Cannot coerce {value} to a pendulum.DateTime object.")
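To illustrate the coercion rules, the example values below follow the test cases further down and are not part of the diff:

    from datetime import date, datetime
    from sources.shopify_dlt.date_helper import ensure_pendulum_datetime

    # naive datetimes are assumed to be UTC
    ensure_pendulum_datetime(datetime(2023, 5, 4, 13, 39, 37))  # -> 2023-05-04T13:39:37+00:00
    # plain dates become midnight UTC
    ensure_pendulum_datetime(date(2023, 5, 4))  # -> 2023-05-04T00:00:00+00:00
    # ISO strings keep an explicit offset
    ensure_pendulum_datetime("2023-05-04T13:39:37+05:00")  # -> 2023-05-04T13:39:37+05:00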
diff --git a/sources/shopify_dlt/helpers.py b/sources/shopify_dlt/helpers.py
index 1064bcb5f..00c6106c5 100644
--- a/sources/shopify_dlt/helpers.py
+++ b/sources/shopify_dlt/helpers.py
@@ -1,23 +1,76 @@
 """Shopify source helpers"""
+from urllib.parse import urljoin
 
-from dlt.common.typing import TDataItem
-from typing import Any, Iterable
+from dlt.sources.helpers import requests
+from dlt.common.typing import TDataItem, TDataItems, Dict
+from typing import Any, Iterable, Optional, Literal
+
+from .settings import DEFAULT_API_VERSION
+from .date_helper import ensure_pendulum_datetime
 
 
-def iterate_page(page: Any) -> Iterable[TDataItem]:
+TOrderStatus = Literal["open", "closed", "cancelled", "any"]
+
+
+class ShopifyApi:
+    """
+    A Shopify API client that can be used to get pages of data from Shopify.
     """
-    Iterates over all pages yielding all items
 
-    Args:
-        page (Any): The starting page object.
+    def __init__(
+        self,
+        shop_url: str,
+        private_app_password: str,
+        api_version: str = DEFAULT_API_VERSION,
+    ) -> None:
+        """
+        Args:
+            shop_url: The URL of your shop (e.g. https://my-shop.myshopify.com).
+            private_app_password: The private app password to the app on your shop.
+            api_version: The API version to use (e.g. 2023-01)
+        """
+        self.shop_url = shop_url
+        self.private_app_password = private_app_password
+        self.api_version = api_version
 
-    Yields:
-        TDataItem: Each item from the pages.
+    def get_pages(
+        self, resource: str, params: Optional[Dict[str, Any]] = None
+    ) -> Iterable[TDataItems]:
+        """Get all pages from shopify using requests.
+        Iterates through all pages and yields each page's items.
 
-    Returns:
-        Iterable[TDataItem]: An iterable of items from the pages.
-    """
-    while page:
-        for item in page:
-            yield item.to_dict()
-        page = page.next_page() if page.has_next_page() else None
+        Args:
+            resource: The resource to get pages for (e.g. products, orders, customers).
+            params: Query params to include in the request.
+
+        Yields:
+            List of data items from the page
+        """
+        url = urljoin(self.shop_url, f"/admin/api/{self.api_version}/{resource}.json")
+
+        headers = {"X-Shopify-Access-Token": self.private_app_password}
+        while url:
+            response = requests.get(url, params=params, headers=headers)
+            response.raise_for_status()
+            json = response.json()
+            # Get item list from the page
+            yield [self._convert_datetime_fields(item) for item in json[resource]]
+            url = response.links.get("next", {}).get("url")
+            # Query params are included in subsequent page URLs
+            params = None
+
+    def _convert_datetime_fields(self, item: Dict[str, Any]) -> Dict[str, Any]:
+        """Convert timestamp fields in the item to pendulum datetime objects
+
+        The item is modified in place.
+
+        Args:
+            item: The item to convert
+
+        Returns:
+            The same data item (for convenience)
+        """
+        fields = ["created_at", "updated_at"]
+        for field in fields:
+            if field in item:
+                item[field] = ensure_pendulum_datetime(item[field])
+        return item
diff --git a/sources/shopify_dlt/settings.py b/sources/shopify_dlt/settings.py
index 81a485502..43a2f3939 100644
--- a/sources/shopify_dlt/settings.py
+++ b/sources/shopify_dlt/settings.py
@@ -1,2 +1,3 @@
 FIRST_DAY_OF_MILLENNIUM = "2000-01-01"
 DEFAULT_API_VERSION = "2023-01"
+DEFAULT_ITEMS_PER_PAGE = 250
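The client pages through results by following the REST Admin API's `Link: <...>; rel="next"` header until it is absent. A standalone usage sketch (shop URL and token below are placeholders, not part of the diff):

    from sources.shopify_dlt.helpers import ShopifyApi

    api = ShopifyApi(
        shop_url="https://my-shop.myshopify.com",
        private_app_password="shpat_...",  # placeholder access token
        api_version="2023-01",
    )
    # Each iteration yields one page (a list of dicts) until no rel="next" link remains
    for page in api.get_pages("orders", {"limit": 250, "status": "any"}):
        for order in page:
            print(order["id"], order["updated_at"])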
+ """ + + pipeline = dlt.pipeline( + pipeline_name="shopify", destination="duckdb", dataset_name="shopify_data" + ) + + # Load all orders from 2023-01-01 to now + min_start_date = current_start_date = pendulum.datetime(2023, 1, 1) + max_end_date = pendulum.now() + + # Create a list of time ranges of 1 week each, we'll use this to load the data in chunks + ranges: List[Tuple[pendulum.DateTime, pendulum.DateTime]] = [] + while current_start_date < max_end_date: + end_date = min(current_start_date.add(weeks=1), max_end_date) + ranges.append((current_start_date, end_date)) + current_start_date = end_date + + # Run the pipeline for each time range created above + for start_date, end_date in ranges: + print(f"Load orders between {start_date} and {end_date}") + # Create the source with start and end date set according to the current time range to filter + # created_at_min lets us set a cutoff to exclude orders created before the initial date of (2023-01-01) + # even if they were updated after that date + data = shopify_source( + start_date=start_date, end_date=end_date, created_at_min=min_start_date + ).with_resources("orders") + + load_info = pipeline.run(data) + print(load_info) + + # Continue loading new data incrementally starting at the end of the last range + # created_at_min still filters out items created before 2023-01-01 + load_info = pipeline.run( + shopify_source( + start_date=max_end_date, created_at_min=min_start_date + ).with_resources("orders") ) print(load_info) @@ -21,4 +68,6 @@ def load(resources: List[str], start_date: str) -> None: if __name__ == "__main__": # Add your desired resources to the list... resources = ["products", "orders", "customers"] - load(resources, start_date="2000-01-01") + load_all_resources(resources, start_date="2000-01-01") + + # incremental_load_with_backloading() diff --git a/tests/shopify_dlt/test_date_helper.py b/tests/shopify_dlt/test_date_helper.py new file mode 100644 index 000000000..37de9b5eb --- /dev/null +++ b/tests/shopify_dlt/test_date_helper.py @@ -0,0 +1,52 @@ +import pytest +from datetime import datetime, date, timezone # noqa: I251 +from dlt.common import pendulum + +from sources.shopify_dlt.date_helper import ensure_pendulum_datetime, TAnyDateTime + + +test_params = [ + # python datetime without tz + ( + datetime(2021, 1, 1, 0, 0, 0), + pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC"), + ), + # python datetime with tz + ( + datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc), + pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC"), + ), + # python date object + (date(2021, 1, 1), pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC")), + # pendulum datetime with tz + ( + pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC"), + pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC"), + ), + # pendulum datetime without tz + ( + pendulum.datetime(2021, 1, 1, 0, 0, 0), + pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC"), + ), + # iso datetime in UTC + ("2021-01-01T00:00:00+00:00", pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC")), + # iso datetime with non utc tz + ( + "2021-01-01T00:00:00+05:00", + pendulum.datetime(2021, 1, 1, 0, 0, 0, tz=5), + ), + # iso datetime without tz + ( + "2021-01-01T05:02:32", + pendulum.datetime(2021, 1, 1, 5, 2, 32).in_tz("UTC"), + ), + # iso date + ("2021-01-01", pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC")), +] + + +@pytest.mark.parametrize("date_value, expected", test_params) +def test_ensure_pendulum_datetime( + date_value: TAnyDateTime, expected: pendulum.DateTime +) -> None: + assert 
diff --git a/tests/shopify_dlt/test_date_helper.py b/tests/shopify_dlt/test_date_helper.py
new file mode 100644
index 000000000..37de9b5eb
--- /dev/null
+++ b/tests/shopify_dlt/test_date_helper.py
@@ -0,0 +1,52 @@
+import pytest
+from datetime import datetime, date, timezone  # noqa: I251
+from dlt.common import pendulum
+
+from sources.shopify_dlt.date_helper import ensure_pendulum_datetime, TAnyDateTime
+
+
+test_params = [
+    # python datetime without tz
+    (
+        datetime(2021, 1, 1, 0, 0, 0),
+        pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC"),
+    ),
+    # python datetime with tz
+    (
+        datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
+        pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC"),
+    ),
+    # python date object
+    (date(2021, 1, 1), pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC")),
+    # pendulum datetime with tz
+    (
+        pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC"),
+        pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC"),
+    ),
+    # pendulum datetime without tz
+    (
+        pendulum.datetime(2021, 1, 1, 0, 0, 0),
+        pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC"),
+    ),
+    # iso datetime in UTC
+    ("2021-01-01T00:00:00+00:00", pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC")),
+    # iso datetime with non utc tz
+    (
+        "2021-01-01T00:00:00+05:00",
+        pendulum.datetime(2021, 1, 1, 0, 0, 0, tz=5),
+    ),
+    # iso datetime without tz
+    (
+        "2021-01-01T05:02:32",
+        pendulum.datetime(2021, 1, 1, 5, 2, 32).in_tz("UTC"),
+    ),
+    # iso date
+    ("2021-01-01", pendulum.datetime(2021, 1, 1, 0, 0, 0).in_tz("UTC")),
+]
+
+
+@pytest.mark.parametrize("date_value, expected", test_params)
+def test_ensure_pendulum_datetime(
+    date_value: TAnyDateTime, expected: pendulum.DateTime
+) -> None:
+    assert ensure_pendulum_datetime(date_value) == expected
diff --git a/tests/shopify_dlt/test_shopify_source.py b/tests/shopify_dlt/test_shopify_source.py
index c76c23d34..02bd820b4 100644
--- a/tests/shopify_dlt/test_shopify_source.py
+++ b/tests/shopify_dlt/test_shopify_source.py
@@ -1,7 +1,15 @@
-from tests.utils import ALL_DESTINATIONS, assert_load_info, load_table_counts
+import re
+from urllib.parse import parse_qs, urlparse
+
 import pytest
+from requests_mock import Mocker
 import dlt
+from dlt.common import pendulum
+from dlt.sources.helpers import requests
+
+from tests.utils import ALL_DESTINATIONS, assert_load_info, load_table_counts
 from sources.shopify_dlt import shopify_source
+from sources.shopify_dlt.date_helper import ensure_pendulum_datetime
 
 
 @pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
@@ -12,7 +20,8 @@ def test_all_resources(destination_name: str) -> None:
         dataset_name="shopify_data",
         full_refresh=True,
     )
-    load_info = pipeline.run(shopify_source())
+    # Set per page limit to ensure we use pagination
+    load_info = pipeline.run(shopify_source(items_per_page=5))
     print(load_info)
     assert_load_info(load_info)
     table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
@@ -22,16 +31,16 @@ def test_all_resources(destination_name: str) -> None:
     expected_tables = ["products", "orders", "customers"]
     assert set(table_counts.keys()) > set(expected_tables)
     assert table_counts["products"] == 17
-    assert table_counts["orders"] == 11
+    assert table_counts["orders"] == 13
    assert table_counts["customers"] == 3
 
     # load again to check there are no dupicates
-    load_info = pipeline.run(shopify_source())
+    load_info = pipeline.run(shopify_source(items_per_page=5))
     table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
     table_counts = load_table_counts(pipeline, *table_names)
     assert set(table_counts.keys()) > set(expected_tables)
     assert table_counts["products"] == 17
-    assert table_counts["orders"] == 11
+    assert table_counts["orders"] == 13
     assert table_counts["customers"] == 3
 
 
@@ -43,9 +52,179 @@ def test_start_date() -> None:
         full_refresh=True,
     )
 
-    # we only load objects created on 05.05. or after which is only one at this point
+    # we only load objects updated on 05.05. or after
     load_info = pipeline.run(shopify_source(start_date="2023-05-05"))
     assert_load_info(load_info)
     table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
     table_counts = load_table_counts(pipeline, *table_names)
-    assert table_counts["orders"] == 1
+    assert table_counts["orders"] == 4
+
+
+@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
+def test_end_date_incremental(destination_name: str) -> None:
+    """Load chunk with start/end date and incrementally with start date after"""
+    pipeline = dlt.pipeline(
+        pipeline_name="shopify",
+        destination="duckdb",
+        dataset_name="shopify_data",
+        full_refresh=True,
+    )
+
+    # Set start date to the exact timestamp of the first order in the test account
+    start_date = pendulum.DateTime(2023, 5, 4, 13, 39, 37)
+    # End date is the exact timestamp of an order in the middle
+    end_date = pendulum.DateTime(2023, 5, 5, 12, 7, 15).in_tz("UTC")
+
+    # Load orders between start/end date
+    # Use two different date formats and timezones to ensure they're handled correctly
+    data = shopify_source(
+        start_date=start_date,
+        end_date=end_date.in_timezone("EST").isoformat(),
+        items_per_page=5,
+    ).with_resources("orders")
+
+    info = pipeline.run(data, write_disposition="append")
+    assert_load_info(info)
+
+    with pipeline.sql_client() as client:
+        rows = [
+            (row[0], pendulum.instance(row[1]))
+            for row in client.execute_sql(
+                "SELECT id, updated_at FROM orders ORDER BY updated_at"
+            )
+        ]
+
+    start_date_utc = start_date.in_tz("UTC")
+    dest_dates = [row[1] for row in rows]
+
+    # Loaded range falls correctly between start/end date
+    assert min(dest_dates) == start_date_utc
+    assert start_date_utc < max(dest_dates) < end_date
+
+    # Load again with incremental, starting at end_date
+    data = shopify_source(start_date=end_date, items_per_page=5).with_resources(
+        "orders"
+    )
+
+    info = pipeline.run(data, write_disposition="append")
+    assert_load_info(info)
+
+    with pipeline.sql_client() as client:
+        rows2 = [
+            (row[0], pendulum.instance(row[1]))
+            for row in client.execute_sql(
+                "SELECT id, updated_at FROM orders ORDER BY updated_at"
+            )
+        ]
+
+    dest_dates2 = [row[1] for row in rows2]
+    assert len(rows2) > len(rows)
+    assert end_date in dest_dates2
+    assert max(dest_dates2) > end_date
+
+    # No duplicates
+    assert len(rows2) == len(set(rows2))
+
+
+@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
+def test_order_status(destination_name: str) -> None:
+    pipeline = dlt.pipeline(
+        pipeline_name="shopify",
+        destination=destination_name,
+        dataset_name="shopify_data",
+        full_refresh=True,
+    )
+
+    data = shopify_source(
+        order_status="closed",
+    ).with_resources("orders")
+
+    info = pipeline.run(data)
+    assert_load_info(info)
+
+    # Check that all loaded orders are closed
+    with pipeline.sql_client() as client:
+        rows = [row[0] for row in client.execute_sql("SELECT closed_at FROM orders")]
+
+    assert all(rows)
+ # 3 | 2023-05-04 13:39:36+00 | 2023-07-24 18:20:09+00 + + # Set the min to 1s after the test order was created + created_at_min = ensure_pendulum_datetime("2023-05-04T13:39:37Z") + + data = shopify_source( + start_date="2020-01-01", + created_at_min=created_at_min, + ).with_resources("orders") + + info = pipeline.run(data) + + assert_load_info(info) + + with pipeline.sql_client() as client: + rows = [ + (ensure_pendulum_datetime(row[0]), ensure_pendulum_datetime(row[1])) + for row in client.execute_sql( + "SELECT updated_at, created_at FROM orders ORDER BY updated_at" + ) + ] + update_dates, create_dates = list(zip(*rows)) + + # All loaded create/update dates are higher than created_at_min + assert min(create_dates) >= created_at_min + assert min(update_dates) >= created_at_min + + +@pytest.mark.parametrize("resource_name", ["orders", "customers", "products"]) +def test_request_params(resource_name: str) -> None: + """Test source arguments are passed to the request query params""" + pipeline = dlt.pipeline( + pipeline_name="shopify", + dataset_name="shopify_data", + full_refresh=True, + ) + + data = shopify_source( + order_status="closed", + start_date="2023-05-05", + end_date="2023-05-06", + items_per_page=100, + created_at_min="2020-01-01", + ).with_resources(resource_name) + + with Mocker(session=requests.client.session) as m: + m.get( + re.compile(r"/{}.json".format(resource_name)), + json={resource_name: []}, + ) + + pipeline.extract(data) + + # verify the last request query params + params = parse_qs(urlparse(m.last_request.url).query) + + assert ensure_pendulum_datetime( + params["updated_at_min"][0] + ) == ensure_pendulum_datetime("2023-05-05") + assert ensure_pendulum_datetime( + params["updated_at_max"][0] + ) == ensure_pendulum_datetime("2023-05-06") + assert ensure_pendulum_datetime( + params["created_at_min"][0] + ) == ensure_pendulum_datetime("2020-01-01") + assert params["limit"] == ["100"] + if resource_name == "orders": + assert params["status"] == ["closed"]