[CHORE] Populate previews only when show() or __repr__() is called (#1889)

Closes #1858 
Closes #1859 

Changes:
- no more preview creation in `.collect()`
- added a `_populate_preview` method that's called during `__repr__()` or `show()` (see the sketch below)
- changed some tests
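
For illustration, a minimal self-contained sketch of the new lazy-preview flow (hypothetical `LazyPreviewFrame` class with plain lists standing in for partitions — not the actual Daft code):

```python
# Hypothetical, simplified model of this change: collect() no longer builds
# the preview; it only records the requested size, and __repr__ builds the
# preview on first use.
from typing import Optional


class LazyPreviewFrame:
    def __init__(self, rows: list[dict]) -> None:
        self._rows = rows
        self._preview: Optional[list[dict]] = None  # populated lazily
        self._num_preview_rows = 8  # default preview size

    def collect(self, num_preview_rows: Optional[int] = 8) -> "LazyPreviewFrame":
        # No preview creation here anymore -- just remember the requested size.
        self._num_preview_rows = len(self._rows) if num_preview_rows is None else num_preview_rows
        return self

    def _populate_preview(self) -> None:
        # Build the preview only if it is missing or smaller than requested.
        if self._preview is None or len(self._preview) < self._num_preview_rows:
            self._preview = self._rows[: self._num_preview_rows]

    def __repr__(self) -> str:
        self._populate_preview()
        return f"LazyPreviewFrame({self._preview!r}, total_rows={len(self._rows)})"


df = LazyPreviewFrame([{"A": i} for i in range(10)]).collect(num_preview_rows=3)
print(df)  # the 3-row preview is built here, on first repr, not in collect()
```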
colin-ho authored Feb 20, 2024
1 parent f3b53ea commit b702e4d
Showing 9 changed files with 91 additions and 53 deletions.
70 changes: 26 additions & 44 deletions daft/dataframe/dataframe.py
@@ -85,6 +85,7 @@ def __init__(self, builder: LogicalPlanBuilder) -> None:
self.__builder = builder
self._result_cache: Optional[PartitionCacheEntry] = None
self._preview = DataFramePreview(preview_partition=None, dataframe_num_rows=None)
self._num_preview_rows = get_context().daft_execution_config.num_preview_rows

@property
def _builder(self) -> LogicalPlanBuilder:
@@ -225,13 +226,33 @@ def iter_partitions(self) -> Iterator[Union[MicroPartition, "RayObjectRef"]]:
for result in results_iter:
yield result.partition()

def _populate_preview(self) -> None:
"""Populates the preview of the DataFrame, if it is not already populated."""
if self._result is None:
return

preview_partition_invalid = (
self._preview.preview_partition is None or len(self._preview.preview_partition) < self._num_preview_rows
)
if preview_partition_invalid:
preview_parts = self._result._get_preview_vpartition(self._num_preview_rows)
preview_results = LocalPartitionSet({i: part for i, part in enumerate(preview_parts)})

preview_partition = preview_results._get_merged_vpartition()
self._preview = DataFramePreview(
preview_partition=preview_partition,
dataframe_num_rows=len(self),
)

@DataframePublicAPI
def __repr__(self) -> str:
self._populate_preview()
display = DataFrameDisplay(self._preview, self.schema())
return display.__repr__()

@DataframePublicAPI
def _repr_html_(self) -> str:
self._populate_preview()
display = DataFrameDisplay(self._preview, self.schema())
return display._repr_html_()

@@ -305,30 +326,7 @@ def _from_tables(cls, *parts: MicroPartition) -> "DataFrame":
df._result_cache = cache_entry

# build preview
num_preview_rows = context.daft_execution_config.num_preview_rows
dataframe_num_rows = len(df)
if dataframe_num_rows > num_preview_rows:
need = num_preview_rows
preview_parts = []
for part in parts:
part_len = len(part)
if part_len >= need: # if this part has enough rows, take what we need and break
preview_parts.append(part.slice(0, need))
break
else: # otherwise, take the whole part and keep going
need -= part_len
preview_parts.append(part)

preview_results = LocalPartitionSet({i: part for i, part in enumerate(preview_parts)})
else:
preview_results = result_pset

# set preview
preview_partition = preview_results._get_merged_vpartition()
df._preview = DataFramePreview(
preview_partition=preview_partition,
dataframe_num_rows=dataframe_num_rows,
)
df._populate_preview()
return df

###
@@ -1129,26 +1127,10 @@ def collect(self, num_preview_rows: Optional[int] = 8) -> "DataFrame":

assert self._result is not None
dataframe_len = len(self._result)
requested_rows = dataframe_len if num_preview_rows is None else num_preview_rows

# Build a DataFramePreview and cache it if necessary
preview_partition_invalid = (
self._preview.preview_partition is None or len(self._preview.preview_partition) < requested_rows
)
if preview_partition_invalid:
preview_df = self
if num_preview_rows is not None:
preview_df = preview_df.limit(num_preview_rows)
preview_df._materialize_results()
preview_results = preview_df._result
assert preview_results is not None

preview_partition = preview_results._get_merged_vpartition()
self._preview = DataFramePreview(
preview_partition=preview_partition,
dataframe_num_rows=dataframe_len,
)

if num_preview_rows is not None:
self._num_preview_rows = num_preview_rows
else:
self._num_preview_rows = dataframe_len
return self

def _construct_show_display(self, n: int) -> "DataFrameDisplay":
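A hedged usage sketch of the resulting behavior (mirrors the parametrized test added in tests/dataframe/test_repr.py below; `_preview` is an internal attribute):

```python
import daft

df = daft.from_pydict({"A": list(range(10))})
df = df.collect(num_preview_rows=9)  # records the preview size; no extra slicing yet
repr(df)                             # first repr materializes the 9-row preview
assert len(df._preview.preview_partition) == 9
assert df._preview.dataframe_num_rows == 10
```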
3 changes: 3 additions & 0 deletions daft/runners/partitioning.py
@@ -209,6 +209,9 @@ class PartitionSet(Generic[PartitionT]):
def _get_merged_vpartition(self) -> MicroPartition:
raise NotImplementedError()

def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
raise NotImplementedError()

def to_pydict(self) -> dict[str, list[Any]]:
"""Retrieves all the data in a PartitionSet as a Python dictionary. Values are the raw data from each Block."""
merged_partition = self._get_merged_vpartition()
13 changes: 13 additions & 0 deletions daft/runners/pyrunner.py
@@ -51,6 +51,19 @@ def _get_merged_vpartition(self) -> MicroPartition:
assert ids_and_partitions[-1][0] + 1 == len(ids_and_partitions)
return MicroPartition.concat([part for id, part in ids_and_partitions])

def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
ids_and_partitions = self.items()
preview_parts = []
for _, part in ids_and_partitions:
part_len = len(part)
if part_len >= num_rows: # if this part has enough rows, take what we need and break
preview_parts.append(part.slice(0, num_rows))
break
else: # otherwise, take the whole part and keep going
num_rows -= part_len
preview_parts.append(part)
return preview_parts

def get_partition(self, idx: PartID) -> MicroPartition:
return self._partitions[idx]

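For reference, the same take-until-full logic as a standalone sketch over plain lists (hypothetical `take_preview_rows` helper; the real method operates on `MicroPartition` objects):

```python
def take_preview_rows(partitions: list[list[int]], num_rows: int) -> list[list[int]]:
    """Greedily gather rows from consecutive partitions until num_rows are taken."""
    preview_parts: list[list[int]] = []
    for part in partitions:
        if len(part) >= num_rows:  # this partition can satisfy the remainder
            preview_parts.append(part[:num_rows])
            break
        num_rows -= len(part)  # otherwise take the whole partition and keep going
        preview_parts.append(part)
    return preview_parts


assert take_preview_rows([[1, 2], [3, 4, 5]], 3) == [[1, 2], [3]]
```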
14 changes: 14 additions & 0 deletions daft/runners/ray_runner.py
@@ -151,6 +151,20 @@ def _get_merged_vpartition(self) -> MicroPartition:
all_partitions = ray.get([part for id, part in ids_and_partitions])
return MicroPartition.concat(all_partitions)

def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
ids_and_partitions = self.items()
preview_parts = []
for _, part in ids_and_partitions:
part = ray.get(part)
part_len = len(part)
if part_len >= num_rows: # if this part has enough rows, take what we need and break
preview_parts.append(part.slice(0, num_rows))
break
else: # otherwise, take the whole part and keep going
num_rows -= part_len
preview_parts.append(part)
return preview_parts

def to_ray_dataset(self) -> RayDataset:
if not _RAY_FROM_ARROW_REFS_AVAILABLE:
raise ImportError(
12 changes: 12 additions & 0 deletions tests/cookbook/test_write.py
@@ -21,6 +21,8 @@ def test_parquet_write(tmp_path):
assert_df_equals(df.to_pandas(), read_back_pd_df)

assert len(pd_df) == 1
assert pd_df._preview.preview_partition is None
pd_df.__repr__()
assert len(pd_df._preview.preview_partition) == 1


@@ -33,6 +35,8 @@ def test_parquet_write_with_partitioning(tmp_path):
assert_df_equals(df.to_pandas(), read_back_pd_df)

assert len(pd_df) == 5
assert pd_df._preview.preview_partition is None
pd_df.__repr__()
assert len(pd_df._preview.preview_partition) == 5


@@ -41,6 +45,8 @@ def test_empty_parquet_write_without_partitioning(tmp_path):
df = df.where(daft.lit(False))
output_files = df.write_parquet(tmp_path)
assert len(output_files) == 0
assert output_files._preview.preview_partition is None
output_files.__repr__()
assert len(output_files._preview.preview_partition) == 0


@@ -49,6 +55,8 @@ def test_empty_parquet_write_with_partitioning(tmp_path):
df = df.where(daft.lit(False))
output_files = df.write_parquet(tmp_path, partition_cols=["Borough"])
assert len(output_files) == 0
assert output_files._preview.preview_partition is None
output_files.__repr__()
assert len(output_files._preview.preview_partition) == 0


@@ -69,6 +77,8 @@ def test_parquet_write_with_partitioning_readback_values(tmp_path):
assert_df_equals(df.to_pandas(), read_back_pd_df)

assert len(output_files) == 5
assert output_files._preview.preview_partition is None
output_files.__repr__()
assert len(output_files._preview.preview_partition) == 5


@@ -193,6 +203,8 @@ def test_csv_write(tmp_path):
assert_df_equals(df.to_pandas(), read_back_pd_df)

assert len(pd_df) == 1
assert pd_df._preview.preview_partition is None
pd_df.__repr__()
assert len(pd_df._preview.preview_partition) == 1


2 changes: 1 addition & 1 deletion tests/dataframe/test_decimals.py
@@ -24,7 +24,7 @@ def test_decimal_parquet_roundtrip() -> None:
df.write_parquet(dirname)
df_readback = daft.read_parquet(dirname).collect()

assert str(df.to_pydict()["decimal128"]) == str(df_readback.to_pydict()["decimal128"])
assert str(df.to_pydict()["decimal128"]) == str(df_readback.to_pydict()["decimal128"])


def test_arrow_decimal() -> None:
11 changes: 11 additions & 0 deletions tests/dataframe/test_repr.py
@@ -4,6 +4,7 @@

import numpy as np
import pandas as pd
import pytest
from PIL import Image

import daft
@@ -86,6 +87,16 @@ def test_empty_repr(make_df):
assert df._repr_html_() == "<small>(No data to display: Dataframe has no columns)</small>"


@pytest.mark.parametrize("num_preview_rows", [9, 10, None])
def test_repr_with_non_default_preview_rows(make_df, num_preview_rows):
df = make_df({"A": [i for i in range(10)], "B": [i for i in range(10)]})
df.collect(num_preview_rows=num_preview_rows)
df.__repr__()

assert df._preview.dataframe_num_rows == 10
assert len(df._preview.preview_partition) == (num_preview_rows if num_preview_rows is not None else 10)


def test_empty_df_repr(make_df):
df = make_df({"A": [1, 2, 3], "B": ["a", "b", "c"]})
df = df.where(df["A"] > 10)
17 changes: 10 additions & 7 deletions tests/dataframe/test_show.py
@@ -26,44 +26,47 @@ def test_show_some(make_df, valid_data, data_source):
assert df_display.num_rows == 1


def test_show_from_cached_collect(make_df, valid_data):
def test_show_from_cached_repr(make_df, valid_data):
df = make_df(valid_data)
df = df.collect()
df.__repr__()
collected_preview = df._preview
df_display = df._construct_show_display(8)

# Check that cached preview from df.collect() was used.
# Check that cached preview from df.__repr__() was used.
assert df_display.preview is collected_preview
assert df_display.schema == df.schema()
assert len(df_display.preview.preview_partition) == len(valid_data)
assert df_display.preview.dataframe_num_rows == 3
assert df_display.num_rows == 3


def test_show_from_cached_collect_prefix(make_df, valid_data):
def test_show_from_cached_repr_prefix(make_df, valid_data):
df = make_df(valid_data)
df = df.collect(3)
df.__repr__()
df_display = df._construct_show_display(2)

assert df_display.schema == df.schema()
assert len(df_display.preview.preview_partition) == 2
# Check that a prefix of the cached preview from df.collect() was used, so dataframe_num_rows should be set.
# Check that a prefix of the cached preview from df.__repr__() was used, so dataframe_num_rows should be set.
assert df_display.preview.dataframe_num_rows == 3
assert df_display.num_rows == 2


def test_show_not_from_cached_collect(make_df, valid_data, data_source):
def test_show_not_from_cached_repr(make_df, valid_data, data_source):
df = make_df(valid_data)
df = df.collect(2)
df.__repr__()
collected_preview = df._preview
df_display = df._construct_show_display(8)

variant = data_source
if variant == "parquet":
# Cached preview from df.collect() is NOT USED because data was not materialized from parquet.
# Cached preview from df.__repr__() is NOT USED because data was not materialized from parquet.
assert df_display.preview != collected_preview
elif variant == "arrow":
# Cached preview from df.collect() is USED because data was materialized from arrow.
# Cached preview from df.__repr__() is USED because data was materialized from arrow.
assert df_display.preview == collected_preview
assert df_display.schema == df.schema()
assert len(df_display.preview.preview_partition) == len(valid_data)
2 changes: 1 addition & 1 deletion tests/dataframe/test_temporals.py
@@ -90,7 +90,7 @@ def test_temporal_file_roundtrip(format, use_native_downloader) -> None:
df.write_parquet(dirname)
df_readback = daft.read_parquet(dirname, use_native_downloader=use_native_downloader).collect()

assert df.to_pydict() == df_readback.to_pydict()
assert df.to_pydict() == df_readback.to_pydict()


@pytest.mark.parametrize(
