[FEAT] Public Delta Lake writer (#2329)
Follow-up on #2304 with additional cleanup, parameters, and testing of
the Delta Lake writing functionality, now set to be a public API with
this PR!

This PR also renames `read_delta_lake` to `read_deltalake`, providing a
deprecation warning for the old name.
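
For illustration, here is a minimal usage sketch of the renamed reader; the table URI is a placeholder, and the argument is assumed to match the existing `read_delta_lake` table/URI parameter rather than being spelled out in this diff:

```python
import daft

# New public name introduced by this PR (placeholder table URI).
df = daft.read_deltalake("s3://my-bucket/my-delta-table")

# The old name still works for now, but is expected to emit a
# DeprecationWarning pointing at `read_deltalake`.
df_legacy = daft.read_delta_lake("s3://my-bucket/my-delta-table")
```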

Todo before merging:
- [x] Test on S3/AWS Glue
- [x] Add GitHub issues for future work
  - #2332
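
For reference, a rough end-to-end sketch of the new public writer, based on the `write_deltalake` signature and docstring added to `daft/dataframe/dataframe.py` in this PR. The bucket, table URI, and DynamoDB table name are placeholders, and the `IOConfig`/`S3Config` construction is assumed from Daft's existing IO configuration API:

```python
import daft
from daft.io import IOConfig, S3Config

# Placeholder region; fill in credentials/config as appropriate.
io_config = IOConfig(s3=S3Config(region_name="us-west-2"))

df = daft.from_pydict({"id": [1, 2, 3], "value": ["a", "b", "c"]})

# Append to a Delta Lake table at an S3 URI. Passing dynamo_table_name uses
# DynamoDB as the locking provider; without it the writer warns and falls
# back to unsafe renames (see the S3 handling in write_deltalake).
result = df.write_deltalake(
    "s3://my-bucket/my-delta-table",
    mode="append",
    dynamo_table_name="my-delta-lock-table",
    io_config=io_config,
)

# The returned DataFrame describes the write: operation, rows, file_size, file_name.
result.show()
```

Requires `deltalake>=0.14.0`, per the version check in `write_deltalake`.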
kevinzwang authored Jun 4, 2024
1 parent b7295da commit 00eb8e1
Showing 30 changed files with 447 additions and 326 deletions.
2 changes: 2 additions & 0 deletions daft/__init__.py
@@ -78,6 +78,7 @@ def refresh_logger() -> None:
DataCatalogType,
from_glob_path,
read_csv,
read_deltalake,
read_delta_lake,
read_hudi,
read_iceberg,
@@ -103,6 +104,7 @@ def refresh_logger() -> None:
"read_parquet",
"read_hudi",
"read_iceberg",
"read_deltalake",
"read_delta_lake",
"read_sql",
"read_lance",
9 changes: 7 additions & 2 deletions daft/api_annotations.py
@@ -3,7 +3,7 @@
import functools
import inspect
import sys
from typing import Any, Callable, ForwardRef, TypeVar, Union, get_args, get_origin
from typing import Any, Callable, ForwardRef, Literal, TypeVar, Union, get_args, get_origin

if sys.version_info < (3, 10):
from typing_extensions import ParamSpec
@@ -70,11 +70,16 @@ def isinstance_helper(value: Any, T: Any) -> bool:
if isinstance(origin_T, type):
return isinstance(value, origin_T)

# T is a higher order type, like `typing.Union`
# T is a `typing.Union`
if origin_T is Union:
union_types = get_args(T)
return any(isinstance_helper(value, union_type) for union_type in union_types)

# T is a `typing.Literal`
if origin_T is Literal:
literal_values = get_args(T)
return value in literal_values

raise NotImplementedError(
f"Unexpected error: Type checking is not implemented for type {T}. Sorry! Please file an issue."
)
1 change: 0 additions & 1 deletion daft/context.py
@@ -293,7 +293,6 @@ def set_execution_config(
with ctx._lock:
old_daft_execution_config = ctx._daft_execution_config if config is None else config

# TODO: Re-addd Parquet configs when we are ready to support Delta Lake writes
new_daft_execution_config = old_daft_execution_config.with_config_values(
scan_tasks_min_size_bytes=scan_tasks_min_size_bytes,
scan_tasks_max_size_bytes=scan_tasks_max_size_bytes,
14 changes: 1 addition & 13 deletions daft/daft.pyi
@@ -1449,7 +1449,7 @@ class LogicalPlanBuilder:
path: str,
columns_name: list[str],
mode: str,
current_version: int,
version: int,
large_dtypes: bool,
io_config: IOConfig | None = None,
) -> LogicalPlanBuilder: ...
@@ -1473,10 +1473,6 @@ class PyDaftExecutionConfig:
num_preview_rows: int | None = None,
parquet_target_filesize: int | None = None,
parquet_target_row_group_size: int | None = None,
parquet_max_open_files: int | None = None,
parquet_max_rows_per_file: int | None = None,
parquet_min_rows_per_group: int | None = None,
parquet_max_rows_per_group: int | None = None,
parquet_inflation_factor: float | None = None,
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
@@ -1501,14 +1497,6 @@
@property
def parquet_target_row_group_size(self) -> int: ...
@property
def parquet_max_open_files(self) -> int: ...
@property
def parquet_max_rows_per_file(self) -> int: ...
@property
def parquet_min_rows_per_group(self) -> int: ...
@property
def parquet_max_rows_per_group(self) -> int: ...
@property
def parquet_inflation_factor(self) -> float: ...
@property
def csv_target_filesize(self) -> int: ...
176 changes: 128 additions & 48 deletions daft/dataframe/dataframe.py
@@ -18,12 +18,15 @@
Iterable,
Iterator,
List,
Literal,
Mapping,
Optional,
Set,
Tuple,
TypeVar,
Union,
)
from urllib.parse import urlparse

from daft.api_annotations import DataframePublicAPI
from daft.context import get_context
@@ -33,15 +36,10 @@
IOConfig,
JoinStrategy,
JoinType,
NativeStorageConfig,
ResourceRequest,
StorageConfig,
)
from daft.dataframe.preview import DataFramePreview
from daft.datatype import DataType
from daft.delta_lake.delta_lake_storage_function import (
_storage_config_to_storage_options,
)
from daft.errors import ExpressionTypeError
from daft.expressions import Expression, ExpressionsProjection, col, lit
from daft.logical.builder import LogicalPlanBuilder
@@ -52,12 +50,15 @@

if TYPE_CHECKING:
import dask
import deltalake
import pandas
import pyarrow
import pyiceberg
import ray
import torch

from daft.io import DataCatalogTable

from daft.logical.schema import Schema

UDFReturnType = TypeVar("UDFReturnType", covariant=True)
@@ -549,62 +550,121 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
# This is due to the fact that the logical plan of the write_iceberg returns datafiles but we want to return the above data
return with_operations

def write_delta(
@DataframePublicAPI
def write_deltalake(
self,
path: str,
mode: str = "append",
table: Union[str, pathlib.Path, "DataCatalogTable", "deltalake.DeltaTable"],
mode: Literal["append", "overwrite", "error", "ignore"] = "append",
schema_mode: Optional[Literal["merge", "overwrite"]] = None,
name: Optional[str] = None,
description: Optional[str] = None,
configuration: Optional[Mapping[str, Optional[str]]] = None,
custom_metadata: Optional[Dict[str, str]] = None,
dynamo_table_name: Optional[str] = None,
io_config: Optional[IOConfig] = None,
) -> None:
) -> "DataFrame":
"""Writes the DataFrame to a `Delta Lake <https://docs.delta.io/latest/index.html>`__ table, returning a new DataFrame with the operations that occurred.
.. NOTE::
This call is **blocking** and will execute the DataFrame when called
Args:
table (Union[str, pathlib.Path, DataCatalogTable, deltalake.DeltaTable]): Destination `Delta Lake Table <https://delta-io.github.io/delta-rs/api/delta_table/>`__ or table URI to write dataframe to.
mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data, `error` will raise an error if table already exists, and `ignore` will not write anything if table already exists. Defaults to "append".
schema_mode (str, optional): Schema mode of the write. If set to `overwrite`, allows replacing the schema of the table when doing `mode=overwrite`. Schema mode `merge` is currently not supported.
name (str, optional): User-provided identifier for this table.
description (str, optional): User-provided description for this table.
configuration (Mapping[str, Optional[str]], optional): A map containing configuration options for the metadata action.
custom_metadata (Dict[str, str], optional): Custom metadata to add to the commit info.
dynamo_table_name (str, optional): Name of the DynamoDB table to be used as the locking provider if writing to S3.
io_config (IOConfig, optional): configurations to use when interacting with remote storage.
Returns:
DataFrame: The operations that occurred with this write.
"""

import json

import deltalake
import pyarrow as pa
from deltalake.schema import _convert_pa_schema_to_delta
from deltalake.writer import (
try_get_table_and_table_uri,
try_get_deltatable,
write_deltalake_pyarrow,
)
from packaging.version import parse

if mode not in ["append"]:
raise ValueError(f"Mode {mode} is not supported. Only 'append' mode is supported")
from daft import from_pydict
from daft.io import DataCatalogTable
from daft.io.object_store_options import io_config_to_storage_options

if schema_mode == "merge":
raise ValueError("Schema mode' merge' is not currently supported for write_deltalake.")

if parse(deltalake.__version__) < parse("0.14.0"):
raise ValueError(f"Write delta lake is only supported on deltalake>=0.14.0, found {deltalake.__version__}")

io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config
storage_config = StorageConfig.native(NativeStorageConfig(False, io_config))
storage_options = _storage_config_to_storage_options(storage_config, path)
table, table_uri = try_get_table_and_table_uri(path, storage_options)
if table is not None:

if isinstance(table, (str, pathlib.Path, DataCatalogTable)):
if isinstance(table, str):
table_uri = table
elif isinstance(table, pathlib.Path):
table_uri = str(table)
else:
table_uri = table.table_uri(io_config)

storage_options = io_config_to_storage_options(io_config, table_uri) or {}
table = try_get_deltatable(table_uri, storage_options=storage_options)
elif isinstance(table, deltalake.DeltaTable):
table_uri = table.table_uri
storage_options = table._storage_options or {}
storage_options.update(storage_options or {})
new_storage_options = io_config_to_storage_options(io_config, table_uri)
storage_options.update(new_storage_options or {})
else:
raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}")

# see: https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/
scheme = urlparse(table_uri).scheme
if scheme == "s3" or scheme == "s3a":
if dynamo_table_name is not None:
storage_options["AWS_S3_LOCKING_PROVIDER"] = "dynamodb"
storage_options["DELTA_DYNAMO_TABLE_NAME"] = dynamo_table_name
else:
storage_options["AWS_S3_ALLOW_UNSAFE_RENAME"] = "true"
warnings.warn("No DynamoDB table specified for Delta Lake locking. Defaulting to unsafe writes.")

pyarrow_schema = pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in self.schema())
delta_schema = _convert_pa_schema_to_delta(pyarrow_schema, large_dtypes=True)

if table:
table.update_incremental()

fields = [f for f in self.schema()]
pyarrow_fields = [pa.field(f.name, f.dtype.to_arrow_dtype()) for f in fields]
pyarrow_schema = pa.schema(pyarrow_fields)

delta_schema = _convert_pa_schema_to_delta(pyarrow_schema, large_dtypes=True)
if table:
if delta_schema != table.schema().to_pyarrow(as_large_types=True):
table_schema = table.schema().to_pyarrow(as_large_types=True)
if delta_schema != table_schema and not (mode == "overwrite" and schema_mode == "overwrite"):
raise ValueError(
"Schema of data does not match table schema\n"
f"Data schema:\n{delta_schema}\nTable Schema:\n{table.schema().to_pyarrow(as_large_types=True)}"
f"Data schema:\n{delta_schema}\nTable Schema:\n{table_schema}"
)
if mode == "error":
raise AssertionError("DeltaTable already exists.")
raise AssertionError("Delta table already exists, write mode set to error.")
elif mode == "ignore":
return

current_version = table.version()

return from_pydict(
{
"operation": pa.array([], type=pa.string()),
"rows": pa.array([], type=pa.int64()),
"file_size": pa.array([], type=pa.int64()),
"file_name": pa.array([], type=pa.string()),
}
)
version = table.version() + 1
else:
current_version = -1
version = 0

builder = self._builder.write_delta(
path=path,
mode=mode,
current_version=current_version,
builder = self._builder.write_deltalake(
table_uri,
mode,
version,
large_dtypes=True,
io_config=io_config,
)
@@ -617,13 +677,17 @@ def write_delta(
add_action = []

operations = []
respath = []
size = []
paths = []
rows = []
sizes = []

for data_file in data_files:
stats = json.loads(data_file.stats)
operations.append("ADD")
respath.append(data_file.path)
size.append(data_file.size)
paths.append(data_file.path)
rows.append(stats["numRecords"])
sizes.append(data_file.size)

add_action.append(data_file)

if table is None:
@@ -633,19 +697,35 @@ def write_delta(
add_action,
mode,
[],
storage_options=storage_options,
name,
description,
configuration,
storage_options,
custom_metadata,
)
else:
table._table.create_write_transaction(
add_action,
mode,
[],
delta_schema,
None,
)
if mode == "overwrite":
old_actions = table.get_add_actions()
old_actions_dict = old_actions.to_pydict()
for i in range(old_actions.num_rows):
operations.append("DELETE")
paths.append(old_actions_dict["path"][i])
rows.append(old_actions_dict["num_records"][i])
sizes.append(old_actions_dict["size_bytes"][i])

table._table.create_write_transaction(add_action, mode, [], delta_schema, None, custom_metadata)
table.update_incremental()

return None
with_operations = from_pydict(
{
"operation": pa.array(operations, type=pa.string()),
"rows": pa.array(rows, type=pa.int64()),
"file_size": pa.array(sizes, type=pa.int64()),
"file_name": pa.array([os.path.basename(fp) for fp in paths], type=pa.string()),
}
)

return with_operations

###
# DataFrame operations