changes airtable IDs to dlthub's CI account, adds schema hints to silence warnings and improve documentation

refactoring according to latest recommendations of pyairtable

simplify airtable filters by following the paradigm of the pyairtable API which treats table names and table IDs equally

updates pyairtable dependency
willi-mueller committed Aug 29, 2023
1 parent def5819 commit 79685c0
Showing 6 changed files with 91 additions and 45 deletions.
5 changes: 4 additions & 1 deletion pyproject.toml
@@ -70,9 +70,12 @@ tiktoken = "^0.4.0"
[tool.poetry.group.mongodb.dependencies]
pymongo = "^4.3.3"

[tool.poetry.group.airtable.dependencies]
pyairtable = "^2.1.0.post1"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.black]
include = '.*py$'
40 changes: 31 additions & 9 deletions sources/airtable/README.md
@@ -18,18 +18,40 @@ This [dlt source](https://dlthub.com/docs/general-usage/source) creates a [dlt r
1. identify the *base* you want to load from. The ID starts with "app". See [how to find airtable IDs](https://support.airtable.com/docs/finding-airtable-ids)
2. identify the *airtables* you want to load. You can identify them in three ways:

| filter | Description |
| ---------------- | -----------------------------------------------------------|
| table_names | retrieves *airtables* from a given *base* for a list of user-defined mutable names of tables |
| table_ids | retrieves *airtables* from a given *base* for a list of immutable IDs defined by airtable.com at the creation time of the *airtable*. IDs start with "tbl". See [how to find airtable IDs](https://support.airtable.com/docs/finding-airtable-ids) |
| empty filter | retrieves all *airtables* from a given *base* |
1. retrieve *airtables* from a given *base* for a list of user-defined mutable names of tables
2. retrieve *airtables* from a given *base* for a list of immutable IDs defined by airtable.com at the creation time of the *airtable*. IDs start with "tbl". See [how to find airtable IDs](https://support.airtable.com/docs/finding-airtable-ids)
3. empty filter: retrieve all *airtables* from a given *base*
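The first two filters are equivalent in how they are passed: `table_names` accepts IDs and names interchangeably. A minimal illustration of that matching rule (plain Python, no Airtable access; the ID and name are the example values used elsewhere in this README):

```python
def matches(table, wanted):
    # A table is selected when `wanted` contains either its immutable
    # ID (starts with "tbl") or its mutable, user-defined name.
    return table["id"] in wanted or table["name"] in wanted


budget = {"id": "tblKHM5s3AujfSbAH", "name": "💰 Budget"}
assert matches(budget, ["💰 Budget"])           # select by name
assert matches(budget, ["tblKHM5s3AujfSbAH"])  # select by ID
```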


## Supported write dispositions
This connector supports the write disposition `replace`, i.e. it does a [full load](https://dlthub.com/docs/general-usage/full-loading) on every invocation.
We do not support `append`, i.e. [incremental loading](https://dlthub.com/docs/general-usage/incremental-loading) because a *base* can contain only [up to 50k records in the most expensive plan](https://support.airtable.com/docs/airtable-plans).

If resource consumption for data loading becomes a concern in practice, [request](https://github.com/dlt-hub/verified-sources/issues/new/choose) the `append` loading method.
To support `append`, i.e. [incremental loading](https://dlthub.com/docs/general-usage/incremental-loading), there are two possibilities:

### Parametrize the `pipeline.run` method

```python
event_base = airtable_source(
    base_id="app7RlqvdoOmJm9XR",
    table_names=["💰 Budget"],
)
load_info = pipeline.run(event_base, write_disposition="replace")
```

### Customize the resource using the `apply_hints` method

This approach additionally lets you [adjust the schema](https://dlthub.com/docs/general-usage/resource#adjust-schema):
```python
event_base = airtable_source(
    base_id="app7RlqvdoOmJm9XR",
    table_names=["💰 Budget"],
)
event_base.resources["💰 Budget"].apply_hints(
    write_disposition="merge",
    columns={"Item": {"name": "Item", "data_type": "text"}},
)
load_info = pipeline.run(event_base)
```


## Initialize the pipeline
@@ -38,8 +60,8 @@ If resource consumption for data loading becomes a concern in practice [request]
dlt init airtable duckdb
```

Here, we chose duckdb as the destination. Alternatively, you can also choose redshift, bigquery, or any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).


## Add credentials
28 changes: 13 additions & 15 deletions sources/airtable/__init__.py
@@ -14,55 +14,53 @@
@dlt.source
def airtable_source(
    base_id: str,
    table_ids: Optional[List[str]] = None,
    table_names: Optional[List[str]] = None,
    access_token: str = dlt.secrets.value,
) -> Iterable[DltResource]:
    """
    Represents tables for a single Airtable base.
    Args:
        base_id (str): The id of the base. Obtain it e.g. from the URL in your web browser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
        table_ids (Optional[List[str]]): A list of table ids to load. By default, all tables in the schema are loaded. Starts with "tbl". See https://support.airtable.com/docs/finding-airtable-ids
        table_names (Optional[List[str]]): A list of table names to load. By default, all tables in the schema are loaded. Table names can change and thus filtering on names is less reliable than on ids.
        table_names (Optional[List[str]]): A list of table IDs or table names to load. Unless specified otherwise, all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl". See https://support.airtable.com/docs/finding-airtable-ids
        access_token (str): The personal access token. See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions
    """
    api = pyairtable.Api(access_token)
    all_tables_url = api.build_url(f"meta/bases/{base_id}/tables")
    tables = api.request(method="GET", url=all_tables_url).get("tables")
    for t in tables:
        if table_ids:
            if t.get("id") in table_ids:
                yield airtable_resource(access_token, base_id, t)
        elif table_names:
            if t.get("name") in table_names:
                yield airtable_resource(access_token, base_id, t)
        if table_names:
            if t.get("id") in table_names or t.get("name") in table_names:
                yield airtable_resource(api, base_id, t)
        else:
            yield airtable_resource(access_token, base_id, t)
            yield airtable_resource(api, base_id, t)
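The new branch above follows pyairtable's paradigm of treating table IDs and table names interchangeably in a single `table_names` filter, with an empty filter selecting every table. A minimal, self-contained sketch of that selection logic (not the actual source; the metadata dicts only mimic the shape of the Airtable Meta API response, and the IDs are example values):

```python
def select_tables(tables, table_names=None):
    # Empty filter: select all airtables in the base.
    if not table_names:
        return list(tables)
    # Otherwise match each table on either its immutable ID or its name.
    return [
        t
        for t in tables
        if t.get("id") in table_names or t.get("name") in table_names
    ]


tables = [
    {"id": "tblKHM5s3AujfSbAH", "name": "💰 Budget"},
    {"id": "tblJCTXfjwOETmvy2", "name": "🎤 Speakers"},
]

# Filtering by ID and by name selects the same table.
assert select_tables(tables, ["tblKHM5s3AujfSbAH"]) == select_tables(tables, ["💰 Budget"])
```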


def airtable_resource(
    access_token: str,
    api: pyairtable.Api,
    base_id: str,
    table: Dict[str, Any],
) -> DltResource:
    """
    Represents a single airtable.
    Args:
        api (pyairtable.Api): The API connection object
        base_id (str): The id of the base. Obtain it e.g. from the URL in your web browser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
        access_token (str): The personal access token. See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions
        table (TDataItem): The iterable created by pyairtable representing the data of an airtable
        table (Dict[str, Any]): Metadata about an airtable; it does not contain the actual records
    """
    primary_key_id = table["primaryFieldId"]
    primary_key_field = [
        field for field in table["fields"] if field["id"] == primary_key_id
    ][0]
    table_name: str = table["name"]
    primary_key: List[str] = [primary_key_field["name"]]
    air_table = pyairtable.Table(access_token, base_id, table["id"])
    air_table = api.table(base_id, table["id"])

    # Table.iterate() supports rich customization options, such as chunk size, fields, cell format, timezone, locale, and view
    air_table_generator: Iterator[List[Any]] = air_table.iterate()

    return dlt.resource(
        air_table_generator,
        name=table_name,
        primary_key=primary_key,
        write_disposition="replace",  # using a typed parameter crashes the typing
        write_disposition="replace",
    )
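The primary-key lookup in `airtable_resource` resolves `primaryFieldId` against the table's field list. A small sketch of just that step, run on an illustrative metadata dict shaped like a Meta API "tables" entry (the `fld…` IDs are made up for the example):

```python
# Hypothetical table metadata, mirroring the shape used by airtable_resource.
table = {
    "name": "💰 Budget",
    "primaryFieldId": "fldITEM0000",
    "fields": [
        {"id": "fldNOTES000", "name": "Notes"},
        {"id": "fldITEM0000", "name": "Item"},
    ],
}

# Find the field whose id matches primaryFieldId; its name becomes the key.
primary_key_field = [
    field for field in table["fields"] if field["id"] == table["primaryFieldId"]
][0]
primary_key = [primary_key_field["name"]]
assert primary_key == ["Item"]
```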
2 changes: 1 addition & 1 deletion sources/airtable/requirements.txt
@@ -1,2 +1,2 @@
pyairtable~=2.0
pyairtable~=2.1
dlt>=0.3.8
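The bumped specifier `pyairtable~=2.1` is a "compatible release" constraint, equivalent to `>=2.1, <3.0`. A rough, hand-rolled sketch of that equivalence on `(major, minor)` tuples (an illustration only, not a real version resolver; pre-release and post-release segments are ignored):

```python
def compatible_with_2_1(version: str) -> bool:
    # ~=2.1 pins the major version to 2 and requires at least 2.1.
    major, minor, *_ = (int(part) for part in version.split("."))
    return major == 2 and (major, minor) >= (2, 1)


assert compatible_with_2_1("2.1.0")
assert compatible_with_2_1("2.9.3")
assert not compatible_with_2_1("3.0.0")
assert not compatible_with_2_1("2.0.5")
```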
57 changes: 40 additions & 17 deletions sources/airtable_pipeline.py
@@ -5,41 +5,64 @@
def load_entire_base(pipeline: dlt.Pipeline) -> None:
    # Loads all tables inside a given base.
    # Find the base ID starting with "app". See https://support.airtable.com/docs/finding-airtable-ids
    all_event_planning_tables = airtable_source(base_id="appctwIznRf5lqe62")
    all_event_planning_tables = airtable_source(base_id="app7RlqvdoOmJm9XR")

    # typing columns to silence warnings
    all_event_planning_tables.resources["📆 Schedule"].apply_hints(
        columns={"Activity": {"name": "Activity", "data_type": "text"}}
    )
    all_event_planning_tables.resources["🎤 Speakers"].apply_hints(
        columns={"Name": {"name": "Name", "data_type": "text"}}
    )
    all_event_planning_tables.resources["🪑 Attendees"].apply_hints(
        columns={"Name": {"name": "Name", "data_type": "text"}}
    )
    all_event_planning_tables.resources["💰 Budget"].apply_hints(
        columns={"Item": {"name": "Item", "data_type": "text"}}
    )

    load_info = pipeline.run(all_event_planning_tables, write_disposition="replace")
    print(load_info)

def load_select_tables_from_base_by_id(pipeline: dlt.Pipeline) -> None:
    # Loads specific table IDs.
    # Starts with "tbl". See https://support.airtable.com/docs/finding-airtable-ids
    # See example: https://airtable.com/appctwIznRf5lqe62/tblPjXnwd3V2RWgJS/
    # See example: https://airtable.com/app7RlqvdoOmJm9XR/tblKHM5s3AujfSbAH
    airtables = airtable_source(
        base_id="appctwIznRf5lqe62",
        table_ids=["tblPjXnwd3V2RWgJS", "tbltdCacZQPxI7fV0"],
        base_id="app7RlqvdoOmJm9XR",
        table_names=["tblKHM5s3AujfSbAH", "tbloBrS8PnoO63aMP"],
    )

    load_info = pipeline.run(airtables, write_disposition="replace")
    print(load_info)


def load_select_tables_from_base_by_name(pipeline: dlt.Pipeline) -> None:
    # Loads specific table names.
    # Filtering by names is less reliable than filtering on IDs because names can be changed by Airtable users.
    # See example: https://airtable.com/appctwIznRf5lqe62/tblOe4fjtZfnvqAHd/
    # See example: https://airtable.com/app7RlqvdoOmJm9XR/tblJCTXfjwOETmvy2/
    budget_table = airtable_source(
        base_id="appctwIznRf5lqe62",
    event_base = airtable_source(
        base_id="app7RlqvdoOmJm9XR",
        table_names=["💰 Budget"],
    )
    load_info = pipeline.run(budget_table, write_disposition="replace")
    event_base.resources["💰 Budget"].apply_hints(
        primary_key="Item", columns={"Item": {"name": "Item", "data_type": "text"}}
    )
    load_info = pipeline.run(event_base, write_disposition="replace")
    print(load_info)


def load_table_for_ci(pipeline: dlt.Pipeline) -> None:
    # Setup for CI of dlt hub
    questionnaire_table = airtable_source(
        base_id="appcChDyP0pZeC76v", table_ids=["tbl1sN4CpPv8pBll4"]
def load_and_customize_write_disposition(pipeline: dlt.Pipeline) -> None:
    questionnaire = airtable_source(
        base_id="appcChDyP0pZeC76v", table_names=["tbl1sN4CpPv8pBll4"]
    )
    questionnaire.resources["Sheet1"].apply_hints(
        primary_key="Name",
        columns={"Name": {"name": "Name", "data_type": "text"}},
        write_disposition="merge",
    )
    load_info = pipeline.run(questionnaire_table, write_disposition="replace")
    load_info = pipeline.run(questionnaire)
    print(load_info)


@@ -49,7 +72,7 @@ def load_table_for_ci(pipeline: dlt.Pipeline) -> None:
        pipeline_name="airtable", destination="duckdb", dataset_name="airtable_data"
    )

    load_table_for_ci(pipeline)
    # load_select_tables_from_base_by_id(pipeline)
    # load_select_tables_from_base_by_name(pipeline)
    # load_entire_base(pipeline)
    load_entire_base(pipeline)
    load_select_tables_from_base_by_id(pipeline)
    load_select_tables_from_base_by_name(pipeline)
    load_and_customize_write_disposition(pipeline)
4 changes: 2 additions & 2 deletions tests/airtable/test_airtable_source.py
@@ -22,7 +22,7 @@ def make_pipeline(destination_name: str) -> dlt.Pipeline:
def test_load_table_by_id(destination_name: str) -> None:
    pipeline = make_pipeline(destination_name)
    questionnaire_table = airtable_source(
        base_id="appcChDyP0pZeC76v", table_ids=["tbl1sN4CpPv8pBll4"]
        base_id="appcChDyP0pZeC76v", table_names=["tbl1sN4CpPv8pBll4"]
    )
    run_single_table_assertions(pipeline, questionnaire_table)

@@ -59,7 +59,7 @@ def run_single_table_assertions(pipeline, questionnaire_table):

@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_load_all_tables_in_base(destination_name: str) -> None:
    all_event_planning_tables = airtable_source(base_id="appctwIznRf5lqe62")
    all_event_planning_tables = airtable_source(base_id="app7RlqvdoOmJm9XR")
    pipeline = make_pipeline(destination_name)
    load_info = pipeline.run(all_event_planning_tables, write_disposition="replace")
    assert_load_info(load_info)
