[CHORE] Populate previews only when show() or __repr__() is called #1889

Merged: 7 commits, Feb 20, 2024
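In short: a DataFrame's preview partition is no longer built eagerly when results are materialized; it is built lazily by `_populate_preview()` the first time `show()` or `__repr__()` is called. Below is a minimal sketch of the intended usage based on the new tests in this diff (the parquet path is hypothetical, and the assertions assume the file has at least 10 rows and a daft build that includes this change):

```python
import daft  # assumes a daft build that includes this PR

df = daft.read_parquet("example.parquet")  # hypothetical input file
df = df.collect(num_preview_rows=10)       # materializes results; now only records the preview size

# Per this PR, no preview partition has been built yet.
assert df._preview.preview_partition is None

repr(df)  # __repr__ (like show()) calls _populate_preview(), slicing out the first 10 rows

assert len(df._preview.preview_partition) == 10
```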
Changes from all commits
70 changes: 26 additions & 44 deletions daft/dataframe/dataframe.py
@@ -85,6 +85,7 @@ def __init__(self, builder: LogicalPlanBuilder) -> None:
self.__builder = builder
self._result_cache: Optional[PartitionCacheEntry] = None
self._preview = DataFramePreview(preview_partition=None, dataframe_num_rows=None)
self._num_preview_rows = get_context().daft_execution_config.num_preview_rows

@property
def _builder(self) -> LogicalPlanBuilder:
@@ -225,13 +226,33 @@ def iter_partitions(self) -> Iterator[Union[MicroPartition, "RayObjectRef"]]:
for result in results_iter:
yield result.partition()

def _populate_preview(self) -> None:
"""Populates the preview of the DataFrame, if it is not already populated."""
if self._result is None:
return

preview_partition_invalid = (
self._preview.preview_partition is None or len(self._preview.preview_partition) < self._num_preview_rows
)
if preview_partition_invalid:
preview_parts = self._result._get_preview_vpartition(self._num_preview_rows)
preview_results = LocalPartitionSet({i: part for i, part in enumerate(preview_parts)})

preview_partition = preview_results._get_merged_vpartition()
self._preview = DataFramePreview(
preview_partition=preview_partition,
dataframe_num_rows=len(self),
)

@DataframePublicAPI
def __repr__(self) -> str:
self._populate_preview()
display = DataFrameDisplay(self._preview, self.schema())
return display.__repr__()

@DataframePublicAPI
def _repr_html_(self) -> str:
self._populate_preview()
display = DataFrameDisplay(self._preview, self.schema())
return display._repr_html_()

@@ -305,30 +326,7 @@ def _from_tables(cls, *parts: MicroPartition) -> "DataFrame":
df._result_cache = cache_entry

# build preview
num_preview_rows = context.daft_execution_config.num_preview_rows
dataframe_num_rows = len(df)
if dataframe_num_rows > num_preview_rows:
need = num_preview_rows
preview_parts = []
for part in parts:
part_len = len(part)
if part_len >= need: # if this part has enough rows, take what we need and break
preview_parts.append(part.slice(0, need))
break
else: # otherwise, take the whole part and keep going
need -= part_len
preview_parts.append(part)

preview_results = LocalPartitionSet({i: part for i, part in enumerate(preview_parts)})
else:
preview_results = result_pset

# set preview
preview_partition = preview_results._get_merged_vpartition()
df._preview = DataFramePreview(
preview_partition=preview_partition,
dataframe_num_rows=dataframe_num_rows,
)
df._populate_preview()
return df

###
@@ -1129,26 +1127,10 @@ def collect(self, num_preview_rows: Optional[int] = 8) -> "DataFrame":

assert self._result is not None
dataframe_len = len(self._result)
requested_rows = dataframe_len if num_preview_rows is None else num_preview_rows

# Build a DataFramePreview and cache it if necessary
preview_partition_invalid = (
self._preview.preview_partition is None or len(self._preview.preview_partition) < requested_rows
)
if preview_partition_invalid:
preview_df = self
if num_preview_rows is not None:
preview_df = preview_df.limit(num_preview_rows)
preview_df._materialize_results()
preview_results = preview_df._result
assert preview_results is not None

preview_partition = preview_results._get_merged_vpartition()
self._preview = DataFramePreview(
preview_partition=preview_partition,
dataframe_num_rows=dataframe_len,
)

if num_preview_rows is not None:
self._num_preview_rows = num_preview_rows
else:
self._num_preview_rows = dataframe_len
return self

def _construct_show_display(self, n: int) -> "DataFrameDisplay":
3 changes: 3 additions & 0 deletions daft/runners/partitioning.py
@@ -209,6 +209,9 @@
def _get_merged_vpartition(self) -> MicroPartition:
raise NotImplementedError()

def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
raise NotImplementedError()

Codecov / codecov/patch warning: added line daft/runners/partitioning.py#L213 was not covered by tests.

def to_pydict(self) -> dict[str, list[Any]]:
"""Retrieves all the data in a PartitionSet as a Python dictionary. Values are the raw data from each Block."""
merged_partition = self._get_merged_vpartition()
13 changes: 13 additions & 0 deletions daft/runners/pyrunner.py
@@ -51,6 +51,19 @@ def _get_merged_vpartition(self) -> MicroPartition:
assert ids_and_partitions[-1][0] + 1 == len(ids_and_partitions)
return MicroPartition.concat([part for id, part in ids_and_partitions])

def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
ids_and_partitions = self.items()
preview_parts = []
for _, part in ids_and_partitions:
part_len = len(part)
if part_len >= num_rows: # if this part has enough rows, take what we need and break
preview_parts.append(part.slice(0, num_rows))
break
else: # otherwise, take the whole part and keep going
num_rows -= part_len
preview_parts.append(part)
return preview_parts

def get_partition(self, idx: PartID) -> MicroPartition:
return self._partitions[idx]

14 changes: 14 additions & 0 deletions daft/runners/ray_runner.py
@@ -151,6 +151,20 @@ def _get_merged_vpartition(self) -> MicroPartition:
all_partitions = ray.get([part for id, part in ids_and_partitions])
return MicroPartition.concat(all_partitions)

def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
ids_and_partitions = self.items()
preview_parts = []
for _, part in ids_and_partitions:
part = ray.get(part)
part_len = len(part)
if part_len >= num_rows: # if this part has enough rows, take what we need and break
preview_parts.append(part.slice(0, num_rows))
break
else: # otherwise, take the whole part and keep going
num_rows -= part_len
preview_parts.append(part)
return preview_parts

def to_ray_dataset(self) -> RayDataset:
if not _RAY_FROM_ARROW_REFS_AVAILABLE:
raise ImportError(
12 changes: 12 additions & 0 deletions tests/cookbook/test_write.py
@@ -21,6 +21,8 @@ def test_parquet_write(tmp_path):
assert_df_equals(df.to_pandas(), read_back_pd_df)

assert len(pd_df) == 1
assert pd_df._preview.preview_partition is None
pd_df.__repr__()
assert len(pd_df._preview.preview_partition) == 1


@@ -33,6 +35,8 @@ def test_parquet_write_with_partitioning(tmp_path):
assert_df_equals(df.to_pandas(), read_back_pd_df)

assert len(pd_df) == 5
assert pd_df._preview.preview_partition is None
pd_df.__repr__()
assert len(pd_df._preview.preview_partition) == 5


@@ -41,6 +45,8 @@ def test_empty_parquet_write_without_partitioning(tmp_path):
df = df.where(daft.lit(False))
output_files = df.write_parquet(tmp_path)
assert len(output_files) == 0
assert output_files._preview.preview_partition is None
output_files.__repr__()
assert len(output_files._preview.preview_partition) == 0


@@ -49,6 +55,8 @@ def test_empty_parquet_write_with_partitioning(tmp_path):
df = df.where(daft.lit(False))
output_files = df.write_parquet(tmp_path, partition_cols=["Borough"])
assert len(output_files) == 0
assert output_files._preview.preview_partition is None
output_files.__repr__()
assert len(output_files._preview.preview_partition) == 0


@@ -69,6 +77,8 @@ def test_parquet_write_with_partitioning_readback_values(tmp_path):
assert_df_equals(df.to_pandas(), read_back_pd_df)

assert len(output_files) == 5
assert output_files._preview.preview_partition is None
output_files.__repr__()
assert len(output_files._preview.preview_partition) == 5


@@ -193,6 +203,8 @@ def test_csv_write(tmp_path):
assert_df_equals(df.to_pandas(), read_back_pd_df)

assert len(pd_df) == 1
assert pd_df._preview.preview_partition is None
pd_df.__repr__()
assert len(pd_df._preview.preview_partition) == 1


2 changes: 1 addition & 1 deletion tests/dataframe/test_decimals.py
@@ -24,7 +24,7 @@ def test_decimal_parquet_roundtrip() -> None:
df.write_parquet(dirname)
df_readback = daft.read_parquet(dirname).collect()

assert str(df.to_pydict()["decimal128"]) == str(df_readback.to_pydict()["decimal128"])
assert str(df.to_pydict()["decimal128"]) == str(df_readback.to_pydict()["decimal128"])
Contributor Author:
this was weird, if i didn't do the indent, it would fail with:

```
../../daft/api_annotations.py:31: in _wrap
    return timed_method(*args, **kwargs)
../../daft/analytics.py:185: in tracked_method
    return method(*args, **kwargs)
../../daft/dataframe/dataframe.py:1316: in to_pydict
    return result.to_pydict()
../../daft/runners/partitioning.py:214: in to_pydict
    merged_partition = self._get_merged_vpartition()
../../daft/runners/pyrunner.py:52: in _get_merged_vpartition
    return MicroPartition.concat([part for id, part in ids_and_partitions])

    @classmethod
    def concat(cls, to_merge: list[MicroPartition]) -> MicroPartition:
        micropartitions = []
        for t in to_merge:
            if not isinstance(t, MicroPartition):
                raise TypeError(f"Expected a MicroPartition for concat, got {type(t)}")
            micropartitions.append(t._micropartition)
>       return MicroPartition._from_pymicropartition(_PyMicroPartition.concat(micropartitions))
E       ValueError: DaftError::External Internal IO Error when Opening: /var/folders/nl/4tv7_psn7fq3n8sn6b4cpvw80000gn/T/tmputa6b7lc/ce958d3e-9fdc-48c3-9c8b-1917639037cd-0.parquet:
E       Details:
E       No such file or directory (os error 2)
```

Contributor Author:
oh i see, .collect() creates an unloaded micropartition, so it didn't do any actual reading of the parquet data
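
A hedged sketch of the failure mode described in this thread (the data and the temporary-directory pattern are illustrative, mirroring the test above): because `.collect()` can return an unloaded micropartition, the parquet files are only read when the data is actually accessed, so that access has to happen while the files still exist.

```python
import tempfile

import daft

with tempfile.TemporaryDirectory() as dirname:
    df = daft.from_pydict({"decimal128": [1, 2, 3]})
    df.write_parquet(dirname)
    df_readback = daft.read_parquet(dirname).collect()

    # Safe: the parquet files still exist when to_pydict() forces the
    # (possibly unloaded) micropartition to be read.
    inside = df_readback.to_pydict()

# Unsafe: if collect() left the micropartition unloaded, reading it out here would try to
# open the parquet files after the temporary directory was deleted, failing with
# "No such file or directory" as in the traceback above.
# outside = df_readback.to_pydict()
```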



def test_arrow_decimal() -> None:
11 changes: 11 additions & 0 deletions tests/dataframe/test_repr.py
@@ -4,6 +4,7 @@

import numpy as np
import pandas as pd
import pytest
from PIL import Image

import daft
@@ -86,6 +87,16 @@ def test_empty_repr(make_df):
assert df._repr_html_() == "<small>(No data to display: Dataframe has no columns)</small>"


@pytest.mark.parametrize("num_preview_rows", [9, 10, None])
def test_repr_with_non_default_preview_rows(make_df, num_preview_rows):
df = make_df({"A": [i for i in range(10)], "B": [i for i in range(10)]})
df.collect(num_preview_rows=num_preview_rows)
df.__repr__()

assert df._preview.dataframe_num_rows == 10
assert len(df._preview.preview_partition) == (num_preview_rows if num_preview_rows is not None else 10)


def test_empty_df_repr(make_df):
df = make_df({"A": [1, 2, 3], "B": ["a", "b", "c"]})
df = df.where(df["A"] > 10)
17 changes: 10 additions & 7 deletions tests/dataframe/test_show.py
@@ -26,44 +26,47 @@ def test_show_some(make_df, valid_data, data_source):
assert df_display.num_rows == 1


def test_show_from_cached_collect(make_df, valid_data):
@colin-ho (Contributor Author), Feb 15, 2024:
concept of 'cached_collect' doesn't exist anymore with this PR so i removed these tests

Contributor:
I guess "cached" would be if we called __repr__() twice in a row now?
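
A hedged sketch of that "cached" behaviour (the data and sizes are illustrative): `_populate_preview()` only rebuilds the preview when it is missing or shorter than `_num_preview_rows`, so a second `__repr__()` reuses the preview from the first.

```python
import daft

df = daft.from_pydict({"A": list(range(20))})  # more rows than the default preview size of 8
df = df.collect()

repr(df)                     # after this, the preview holds the first 8 rows
first_preview = df._preview

repr(df)                     # 8 rows >= _num_preview_rows, so _populate_preview() skips the rebuild
assert df._preview is first_preview
```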

def test_show_from_cached_repr(make_df, valid_data):
df = make_df(valid_data)
df = df.collect()
df.__repr__()
collected_preview = df._preview
df_display = df._construct_show_display(8)

# Check that cached preview from df.collect() was used.
# Check that cached preview from df.__repr__() was used.
assert df_display.preview is collected_preview
assert df_display.schema == df.schema()
assert len(df_display.preview.preview_partition) == len(valid_data)
assert df_display.preview.dataframe_num_rows == 3
assert df_display.num_rows == 3


def test_show_from_cached_collect_prefix(make_df, valid_data):
def test_show_from_cached_repr_prefix(make_df, valid_data):
df = make_df(valid_data)
df = df.collect(3)
df.__repr__()
df_display = df._construct_show_display(2)

assert df_display.schema == df.schema()
assert len(df_display.preview.preview_partition) == 2
# Check that a prefix of the cached preview from df.collect() was used, so dataframe_num_rows should be set.
# Check that a prefix of the cached preview from df.__repr__() was used, so dataframe_num_rows should be set.
assert df_display.preview.dataframe_num_rows == 3
assert df_display.num_rows == 2


def test_show_not_from_cached_collect(make_df, valid_data, data_source):
def test_show_not_from_cached_repr(make_df, valid_data, data_source):
df = make_df(valid_data)
df = df.collect(2)
df.__repr__()
collected_preview = df._preview
df_display = df._construct_show_display(8)

variant = data_source
if variant == "parquet":
# Cached preview from df.collect() is NOT USED because data was not materialized from parquet.
# Cached preview from df.__repr__() is NOT USED because data was not materialized from parquet.
assert df_display.preview != collected_preview
elif variant == "arrow":
# Cached preview from df.collect() is USED because data was materialized from arrow.
# Cached preview from df.__repr__() is USED because data was materialized from arrow.
assert df_display.preview == collected_preview
assert df_display.schema == df.schema()
assert len(df_display.preview.preview_partition) == len(valid_data)
2 changes: 1 addition & 1 deletion tests/dataframe/test_temporals.py
@@ -90,7 +90,7 @@ def test_temporal_file_roundtrip(format, use_native_downloader) -> None:
df.write_parquet(dirname)
df_readback = daft.read_parquet(dirname, use_native_downloader=use_native_downloader).collect()

assert df.to_pydict() == df_readback.to_pydict()
assert df.to_pydict() == df_readback.to_pydict()


@pytest.mark.parametrize(