skips tables without jobs when creating table chain jobs, deletes delta table and arrow dataset instances
rudolfix committed Sep 11, 2024
1 parent 6451bd7 commit 52e3f1e
Showing 1 changed file with 53 additions and 38 deletions.
91 changes: 53 additions & 38 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -106,55 +106,63 @@ class DeltaLoadFilesystemJob(FilesystemLoadJob):
def __init__(self, file_path: str) -> None:
super().__init__(file_path=file_path)

# create Arrow dataset from Parquet files
from dlt.common.libs.pyarrow import pyarrow as pa

self.file_paths = ReferenceFollowupJobRequest.resolve_references(self._file_path)
self.arrow_ds = pa.dataset.dataset(self.file_paths)

def make_remote_path(self) -> str:
# remote path is table dir - delta will create its file structure inside it
return self._job_client.get_table_dir(self.load_table_name)

def run(self) -> None:
logger.info(f"Will copy file(s) {self.file_paths} to delta table {self.make_remote_url()}")

# create Arrow dataset from Parquet files
from dlt.common.libs.pyarrow import pyarrow as pa
from dlt.common.libs.deltalake import write_delta_table, merge_delta_table

logger.info(
f"Will copy file(s) {self.file_paths} to delta table {self.make_remote_url()} [arrow"
f" buffer: {pa.total_allocated_bytes()}]"
)
source_ds = pa.dataset.dataset(self.file_paths)
delta_table = self._delta_table()

# explicitly check if there is data
# (https://github.com/delta-io/delta-rs/issues/2686)
if self.arrow_ds.head(1).num_rows == 0:
self._create_or_evolve_delta_table()
return

with self.arrow_ds.scanner().to_reader() as arrow_rbr: # RecordBatchReader
if self._load_table["write_disposition"] == "merge" and self._delta_table is not None:
self._load_table["x-merge-strategy"] = resolve_merge_strategy( # type: ignore[typeddict-unknown-key]
self._schema.tables, self._load_table, self._job_client.capabilities
)
merge_delta_table(
table=self._delta_table,
data=arrow_rbr,
schema=self._load_table,
)
else:
write_delta_table(
table_or_uri=(
self.make_remote_url() if self._delta_table is None else self._delta_table
),
data=arrow_rbr,
write_disposition=self._load_table["write_disposition"],
partition_by=self._partition_columns,
storage_options=self._storage_options,
)
if source_ds.head(1).num_rows == 0:
delta_table = self._create_or_evolve_delta_table(source_ds, delta_table)
else:
with source_ds.scanner().to_reader() as arrow_rbr: # RecordBatchReader
if self._load_table["write_disposition"] == "merge" and delta_table is not None:
self._load_table["x-merge-strategy"] = resolve_merge_strategy( # type: ignore[typeddict-unknown-key]
self._schema.tables, self._load_table, self._job_client.capabilities
)
merge_delta_table(
table=delta_table,
data=arrow_rbr,
schema=self._load_table,
)
else:
write_delta_table(
table_or_uri=(
self.make_remote_url() if delta_table is None else delta_table
),
data=arrow_rbr,
write_disposition=self._load_table["write_disposition"],
partition_by=self._partition_columns,
storage_options=self._storage_options,
)
# release memory ASAP by deleting objects explicitly
del source_ds
del delta_table
logger.info(
f"Copied {self.file_paths} to delta table {self.make_remote_url()} [arrow buffer:"
f" {pa.total_allocated_bytes()}]"
)
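
The reworked `run()` keeps the Arrow dataset and the `DeltaTable` handle as locals and drops them explicitly once the copy is done, instead of holding them as instance attributes (as the old `__init__` did with `self.arrow_ds`), so the Arrow buffers can be reclaimed as soon as the job finishes. A minimal standalone sketch of watching `pyarrow.total_allocated_bytes()` around such a scan, with an illustrative file path (not dlt code):

```python
# Standalone sketch: observe Arrow's allocator before and after materializing
# data, then drop the references so the buffers can be reclaimed sooner.
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

pq.write_table(pa.table({"id": list(range(100_000))}), "example.parquet")

print("before:", pa.total_allocated_bytes())
dataset = ds.dataset(["example.parquet"])
table = dataset.scanner().to_table()  # materializes Arrow buffers in memory
print("loaded:", pa.total_allocated_bytes())

del table
del dataset  # release references ASAP, mirroring the `del` calls above
print("after del:", pa.total_allocated_bytes())  # allocator may retain some memory
```

`del` only removes the local references; the underlying buffers are freed once nothing else points at them, which is why the job logs `pa.total_allocated_bytes()` both before and after the copy.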

@property
def _storage_options(self) -> Dict[str, str]:
from dlt.common.libs.deltalake import _deltalake_storage_options

return _deltalake_storage_options(self._job_client.config)

@property
def _delta_table(self) -> Optional["DeltaTable"]: # type: ignore[name-defined] # noqa: F821
from dlt.common.libs.deltalake import try_get_deltatable

@@ -164,23 +172,23 @@ def _delta_table(self) -> Optional["DeltaTable"]: # type: ignore[name-defined]
def _partition_columns(self) -> List[str]:
return get_columns_names_with_prop(self._load_table, "partition")
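
`_partition_columns` reads the names of columns that carry the `partition` hint from the load table's schema. A hand-rolled illustration of the same idea on a plain column mapping (dlt's real schema types and the `get_columns_names_with_prop` helper are richer than this):

```python
# Illustrative only: extract column names that carry a truthy "partition" hint
# from a table schema modeled as a plain dict.
from typing import Any, Dict, List

def partition_columns(columns: Dict[str, Dict[str, Any]]) -> List[str]:
    return [name for name, col in columns.items() if col.get("partition")]

table_columns = {
    "country": {"data_type": "text", "partition": True},
    "amount": {"data_type": "double"},
}
print(partition_columns(table_columns))  # ['country']
```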

def _create_or_evolve_delta_table(self) -> None:
def _create_or_evolve_delta_table(self, arrow_ds: "Dataset", delta_table: "DeltaTable") -> "DeltaTable": # type: ignore[name-defined] # noqa: F821
from dlt.common.libs.deltalake import (
DeltaTable,
ensure_delta_compatible_arrow_schema,
_evolve_delta_table_schema,
)

if self._delta_table is None:
DeltaTable.create(
if delta_table is None:
return DeltaTable.create(
table_uri=self.make_remote_url(),
schema=ensure_delta_compatible_arrow_schema(self.arrow_ds.schema),
schema=ensure_delta_compatible_arrow_schema(arrow_ds.schema),
mode="overwrite",
partition_by=self._partition_columns,
storage_options=self._storage_options,
)
else:
_evolve_delta_table_schema(self._delta_table, self.arrow_ds.schema)
return _evolve_delta_table_schema(delta_table, arrow_ds.schema)
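
`_create_or_evolve_delta_table` now receives the dataset and the (possibly missing) table instead of reading them from `self`, and returns the resulting `DeltaTable`: if the table does not exist it is created empty from the Delta-compatible Arrow schema, otherwise its schema is evolved to accept the new columns. A hedged sketch of the create-if-missing half using only the public `deltalake` API (assuming a recent `deltalake` release; the URI, schema, and partitioning are illustrative, and dlt's own helpers handle the evolution case):

```python
# Illustrative sketch with the public deltalake API, not dlt's internal helpers:
# create an empty Delta table from an Arrow schema when none exists yet.
import pyarrow as pa
from deltalake import DeltaTable
from deltalake.exceptions import TableNotFoundError

table_uri = "path/to/table_dir"  # hypothetical table directory
schema = pa.schema([("country", pa.string()), ("amount", pa.float64())])

try:
    dt = DeltaTable(table_uri)  # table already exists
except TableNotFoundError:
    dt = DeltaTable.create(
        table_uri=table_uri,
        schema=schema,
        mode="overwrite",
        partition_by=["country"],  # optional, mirrors self._partition_columns
    )
```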


class FilesystemLoadJobWithFollowup(HasFollowupJobs, FilesystemLoadJob):
@@ -643,6 +651,13 @@ def create_table_chain_completed_followup_jobs(
for job in completed_table_chain_jobs
if job.job_file_info.table_name == table["name"]
]
file_name = FileStorage.get_file_name_from_file_path(table_job_paths[0])
jobs.append(ReferenceFollowupJobRequest(file_name, table_job_paths))
if table_job_paths:
file_name = FileStorage.get_file_name_from_file_path(table_job_paths[0])
jobs.append(ReferenceFollowupJobRequest(file_name, table_job_paths))
else:
# file_name = ParsedLoadJobFileName(table["name"], "empty", 0, "reference").file_name()
# TODO: if we implement removal of orphaned rows, we may need to propagate such a job without files
# to the delta load job
pass

return jobs
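
The last hunk is the "skips tables without jobs" part of the commit: the old code indexed `table_job_paths[0]` unconditionally, which fails when a table in the chain has no completed jobs, while the new code only emits a `ReferenceFollowupJobRequest` when there is at least one job path. A simplified standalone sketch of the same guard (names and types are illustrative, not dlt's actual job classes):

```python
# Simplified sketch of the "skip tables without jobs" guard.
from typing import Dict, List

def reference_requests(job_paths_by_table: Dict[str, List[str]]) -> List[Dict[str, object]]:
    requests = []
    for table_name, job_paths in job_paths_by_table.items():
        if not job_paths:
            # no completed jobs for this table -> nothing to reference, skip it
            continue
        file_name = job_paths[0].rsplit("/", 1)[-1]
        requests.append({"table": table_name, "file_name": file_name, "paths": job_paths})
    return requests

print(reference_requests({"orders": ["load/orders.1.parquet"], "customers": []}))
```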
