Skip to content

Commit

Permalink
black
Browse files Browse the repository at this point in the history
  • Loading branch information
AstrakhantsevaAA committed Aug 3, 2023
1 parent cf79af5 commit 1aa28ad
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 17 deletions.
4 changes: 2 additions & 2 deletions sources/unstructured_data/google_drive/helpers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import io
from typing import Any

from googleapiclient.errors import HttpError # type: ignore
from googleapiclient.http import MediaIoBaseDownload # type: ignore
from googleapiclient.errors import HttpError # type: ignore
from googleapiclient.http import MediaIoBaseDownload # type: ignore


def download_file_from_google_drive(service: Any, file_id: str, file_path: str) -> None:
Expand Down
7 changes: 5 additions & 2 deletions sources/unstructured_data/inbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ def inbox_source(
start_date=start_date,
)
if attachments:
return uids | get_attachments_by_uid(storage_folder_path=storage_folder_path, filter_by_mime_type=filter_by_mime_type)
return uids | get_attachments_by_uid(
storage_folder_path=storage_folder_path,
filter_by_mime_type=filter_by_mime_type,
)
else:
return uids | read_messages

Expand Down Expand Up @@ -163,7 +166,7 @@ def get_attachments_by_uid(
email_account: str = dlt.secrets.value,
password: str = dlt.secrets.value,
include_body: bool = False,
filter_by_mime_type: Sequence[str] = ()
filter_by_mime_type: Sequence[str] = (),
) -> TDataItem:
"""Downloads attachments from email messages based on the provided message UIDs.
Expand Down
2 changes: 1 addition & 1 deletion sources/unstructured_data/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
"invoice_number": "What is the invoice number? Just return the number. If you don't know, then return None",
"service_description": "What is the description of the service that this invoice is for? Just return the description. If you don't know, then return None",
"phone_number": "What is the company phone number? Just return the phone number. If you don't know, then return None",
}
}
8 changes: 6 additions & 2 deletions sources/unstructured_data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ def from_google_drive_to_structured() -> None:
full_refresh=True,
)

data_source = google_drive_source(download=True, filter_by_mime_type=("application/pdf", ))
data_source = google_drive_source(
download=True, filter_by_mime_type=("application/pdf",)
)
data_resource = data_source.resources["attachments"]

# run the pipeline with your parameters
Expand All @@ -65,7 +67,9 @@ def from_inbox_to_structured() -> None:
full_refresh=True,
)

data_source = inbox_source(attachments=True, filter_by_mime_type=("application/pdf",))
data_source = inbox_source(
attachments=True, filter_by_mime_type=("application/pdf",)
)
data_resource = data_source.resources["attachments"]
# run the pipeline with your parameters
load_info = pipeline.run(
Expand Down
33 changes: 23 additions & 10 deletions tests/unstructured_data/test_unstructured_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from dlt.extract.source import DltResource

from sources.unstructured_data import unstructured_to_structured_resource
from sources.unstructured_data.local_folder import local_folder_resource
from sources.unstructured_data.google_drive import google_drive_source
from sources.unstructured_data.inbox import inbox_source
from sources.unstructured_data.local_folder import local_folder_resource

from tests.utils import ALL_DESTINATIONS, assert_load_info

Expand Down Expand Up @@ -38,7 +38,9 @@ class TestUnstructuredFromLocalFolder:
@pytest.fixture
def data_resource(self, data_dir: str) -> DltResource:
resource = local_folder_resource(data_dir=data_dir)
filtered_data_resource = resource.add_filter(lambda item: item["content_type"] == "application/pdf")
filtered_data_resource = resource.add_filter(
lambda item: item["content_type"] == "application/pdf"
)
return filtered_data_resource

@pytest.mark.parametrize("run_async", (False, True))
Expand Down Expand Up @@ -75,7 +77,9 @@ def test_content(
queries: dict,
data_resource: DltResource,
) -> None:
filtered_data_resource = data_resource.add_filter(lambda item: item["content_type"] == "application/pdf")
filtered_data_resource = data_resource.add_filter(
lambda item: item["content_type"] == "application/pdf"
)
pipeline, _ = run_pipeline(destination_name, queries, filtered_data_resource)
with pipeline.sql_client() as c:
# you can use parametrized queries as well, see python dbapi
Expand All @@ -90,7 +94,12 @@ def test_content(
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
class TestUnstructuredFromGoogleDrive:
@pytest.fixture(scope="session")
def data_resource(self, tmpdir_factory, gd_folders: Sequence[str], filter_by_mime_type: Sequence[str] = ()) -> DltResource:
def data_resource(
self,
tmpdir_factory,
gd_folders: Sequence[str],
filter_by_mime_type: Sequence[str] = (),
) -> DltResource:
tmp_path = tmpdir_factory.mktemp("temp_data")
source = google_drive_source(
download=True,
Expand All @@ -99,7 +108,9 @@ def data_resource(self, tmpdir_factory, gd_folders: Sequence[str], filter_by_mim
filter_by_mime_type=filter_by_mime_type,
)
resource = source.resources["attachments"]
filtered_data_resource = resource.add_filter(lambda item: item["content_type"] == "application/pdf")
filtered_data_resource = resource.add_filter(
lambda item: item["content_type"] == "application/pdf"
)
return filtered_data_resource

@pytest.mark.parametrize("run_async", (False, True))
Expand Down Expand Up @@ -157,7 +168,9 @@ def data_resource(self, tmpdir_factory) -> DltResource:
storage_folder_path=tmp_path,
)
resource = source.resources["attachments"]
filtered_data_resource = resource.add_filter(lambda item: item["content_type"] == "application/pdf")
filtered_data_resource = resource.add_filter(
lambda item: item["content_type"] == "application/pdf"
)
return filtered_data_resource

@pytest.mark.parametrize("run_async", (False, True))
Expand Down Expand Up @@ -202,17 +215,17 @@ def test_content(
"SELECT file_path FROM unstructured_from_attachments"
) as cur:
rows = list(cur.fetchall())
assert len(rows) == 4 # 4 files were processed, other content types were skipped except pdf
assert (
len(rows) == 4
) # 4 files were processed, other content types were skipped except pdf

def test_incremental_loading(
self,
destination_name: str,
queries: dict,
data_resource: DltResource,
) -> None:
pipeline, load_info = run_pipeline(
destination_name, queries, data_resource
)
pipeline, load_info = run_pipeline(destination_name, queries, data_resource)
# make sure all data were loaded
assert_load_info(load_info)
print(load_info)
Expand Down

0 comments on commit 1aa28ad

Please sign in to comment.