
Commit

Merge branch 'devel' into feat/improves-run-context
rudolfix committed Oct 12, 2024
2 parents 7d73296 + b717627 commit 01987e3
Showing 6 changed files with 12,649 additions and 6 deletions.
13 changes: 10 additions & 3 deletions dlt/common/libs/pyarrow.py
@@ -245,7 +245,12 @@ def should_normalize_arrow_schema(
naming: NamingConvention,
add_load_id: bool = False,
) -> Tuple[bool, Mapping[str, str], Dict[str, str], Dict[str, bool], bool, TTableSchemaColumns]:
"""Figure out if any of the normalization steps must be executed. This prevents
from rewriting arrow tables when no changes are needed. Refer to `normalize_py_arrow_item`
for a list of normalizations. Note that `column` must be already normalized.
"""
rename_mapping = get_normalized_arrow_fields_mapping(schema, naming)
# no clashes in rename ensured above
rev_mapping = {v: k for k, v in rename_mapping.items()}
nullable_mapping = {k: is_nullable_column(v) for k, v in columns.items()}
# All fields from arrow schema that have nullable set to different value than in columns
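To make the intent of the check concrete, here is a minimal, self-contained sketch of the kind of comparison `should_normalize_arrow_schema` performs; the column layout, rename mapping, and decision logic below are illustrative assumptions, not the function's actual internals.

```python
# Illustrative sketch only: compare an Arrow schema against already-normalized
# dlt columns to decide whether the table would need to be rewritten at all.
import pyarrow as pa

schema = pa.schema([pa.field("Order ID", pa.int64(), nullable=False)])
# dlt columns, already normalized (assumed snake_case naming)
columns = {"order_id": {"name": "order_id", "data_type": "bigint", "nullable": True}}

# assumed rename mapping, as get_normalized_arrow_fields_mapping would produce it
rename_mapping = {"Order ID": "order_id"}
rev_mapping = {v: k for k, v in rename_mapping.items()}
nullable_mapping = {k: v["nullable"] for k, v in columns.items()}

needs_rename = any(old != new for old, new in rename_mapping.items())
# fields whose nullability differs between the Arrow schema and the dlt columns
nullable_updates = {
    f.name: nullable_mapping[rename_mapping[f.name]]
    for f in schema
    if f.nullable != nullable_mapping.get(rename_mapping[f.name], f.nullable)
}
should_normalize = needs_rename or bool(nullable_updates)
print(should_normalize, nullable_updates)  # True {'Order ID': True}
```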
@@ -301,7 +306,8 @@ def normalize_py_arrow_item(
caps: DestinationCapabilitiesContext,
load_id: Optional[str] = None,
) -> TAnyArrowItem:
"""Normalize arrow `item` schema according to the `columns`.
"""Normalize arrow `item` schema according to the `columns`. Note that
columns must be already normalized.
1. arrow schema field names will be normalized according to `naming`
2. arrows columns will be reordered according to `columns`
@@ -366,13 +372,14 @@ def normalize_py_arrow_item(

def get_normalized_arrow_fields_mapping(schema: pyarrow.Schema, naming: NamingConvention) -> StrStr:
"""Normalizes schema field names and returns mapping from original to normalized name. Raises on name collisions"""
norm_f = naming.normalize_identifier
# use normalize_path to be compatible with how regular columns are normalized in dlt.Schema
norm_f = naming.normalize_path
name_mapping = {n.name: norm_f(n.name) for n in schema}
# verify if names uniquely normalize
normalized_names = set(name_mapping.values())
if len(name_mapping) != len(normalized_names):
raise NameNormalizationCollision(
f"Arrow schema fields normalized from {list(name_mapping.keys())} to"
f"Arrow schema fields normalized from:\n{list(name_mapping.keys())}:\nto:\n"
f" {list(normalized_names)}"
)
return name_mapping
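A short sketch of how the collision check surfaces in practice: assuming the snake_case naming convention (which lower-cases identifiers), two Arrow field names that differ only by case normalize to the same column name and trigger the error above.

```python
# Sketch only: the pipeline normally performs this check for you.
import pyarrow as pa
from dlt.common.libs.pyarrow import get_normalized_arrow_fields_mapping
from dlt.common.normalizers.naming.snake_case import NamingConvention

schema = pa.schema([pa.field("Value", pa.int64()), pa.field("value", pa.string())])
# both field names normalize to "value", so this is expected to raise
# NameNormalizationCollision with the multi-line message introduced above
mapping = get_normalized_arrow_fields_mapping(schema, NamingConvention())
```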
@@ -198,12 +198,15 @@ def _double_as_decimal_adapter(table: sa.Table) -> None:
sql_alchemy_source = sql_database(
"mysql+pymysql://[email protected]:4497/Rfam?&binary_prefix=true",
backend="pyarrow",
backend_kwargs={"tz": "UTC"},
table_adapter_callback=_double_as_decimal_adapter
).with_resources("family", "genome")

info = pipeline.run(sql_alchemy_source)
print(info)
```
For more information on the `tz` argument passed through `backend_kwargs` to PyArrow, refer to the
[official documentation](https://arrow.apache.org/docs/python/generated/pyarrow.timestamp.html).
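As a quick illustration of what that `tz` value corresponds to on the PyArrow side (a sketch, not part of the source change), timestamp columns get a timezone-aware Arrow type instead of a naive one:

```python
import pyarrow as pa

naive = pa.timestamp("us")            # timestamp[us] - no timezone attached
aware = pa.timestamp("us", tz="UTC")  # timestamp[us, tz=UTC] - what backend_kwargs={"tz": "UTC"} requests
print(naive, aware)
```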

### Pandas

4 changes: 2 additions & 2 deletions poetry.lock


1 change: 1 addition & 0 deletions pyproject.toml
@@ -194,6 +194,7 @@ pandas = [
{version = ">2.1", markers = "python_version >= '3.12'"},
{version = "<2.1", markers = "python_version < '3.12'"}
]
sqlglot = {version = ">=20.0.0"}

[tool.poetry.group.airflow]
optional = true

0 comments on commit 01987e3
