Skip to content

Commit

Permalink
Merge branch 'main' into sammy/scan-task-in-memory-estimate
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Feb 22, 2024
2 parents 03b8dd2 + 5b2fe98 commit 0a1cdc8
Show file tree
Hide file tree
Showing 39 changed files with 618 additions and 99 deletions.
11 changes: 11 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,14 @@ To set up your development environment:
1. `make build`: recompile your code after modifying any Rust code in `src/`
2. `make test`: run tests
3. `DAFT_RUNNER=ray make test`: set the runner to the Ray runner and run tests (DAFT_RUNNER defaults to `py`)

### Developing with Ray

Running a development version of Daft on a local Ray cluster is as simple as including `daft.context.set_runner_ray()` in your Python script and then building and executing it as usual.

To use a remote Ray cluster, run the following steps on the same operating system version as your Ray nodes, in order to ensure that your binaries are executable on Ray.

1. `mkdir wd`: this is the working directory, it will hold all the files to be submitted to Ray for a job
2. `ln -s daft wd/daft`: create a symbolic link from the Python module to the working directory
3. `make build-release`: an optimized build to ensure that the module is small enough to be successfully uploaded to Ray. Run this after modifying any Rust code in `src/`
4. `ray job submit --working-dir wd --address "http://<head_node_host>:8265" -- python script.py`: submit `wd/script.py` to be run on Ray
6 changes: 3 additions & 3 deletions daft/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from daft import context

_ANALYTICS_CLIENT = None
_WRITE_KEY = "opL9scJXH6GKdIYgPdA0ncCj8i920LJq"
_WRITE_KEY = "ZU2LLq6HFW0kMEY6TiGZoGnRzogXBUwa"
_SEGMENT_BATCH_ENDPOINT = "https://api.segment.io/v1/batch"


Expand All @@ -34,8 +34,8 @@ class AnalyticsEvent:


def _get_session_key():
# Restrict the cardinality of keys to 8000
return f"anon-{random.randint(1, 8000)}"
# Restrict the cardinality of keys to 800
return f"anon-{random.randint(1, 800)}"


def _build_segment_batch_payload(
Expand Down
4 changes: 4 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,8 @@ class PyExpr:
def utf8_contains(self, pattern: PyExpr) -> PyExpr: ...
def utf8_split(self, pattern: PyExpr) -> PyExpr: ...
def utf8_length(self) -> PyExpr: ...
def utf8_lower(self) -> PyExpr: ...
def utf8_upper(self) -> PyExpr: ...
def image_decode(self) -> PyExpr: ...
def image_encode(self, image_format: ImageFormat) -> PyExpr: ...
def image_resize(self, w: int, h: int) -> PyExpr: ...
Expand Down Expand Up @@ -989,6 +991,8 @@ class PySeries:
def utf8_contains(self, pattern: PySeries) -> PySeries: ...
def utf8_split(self, pattern: PySeries) -> PySeries: ...
def utf8_length(self) -> PySeries: ...
def utf8_lower(self) -> PySeries: ...
def utf8_upper(self) -> PySeries: ...
def is_nan(self) -> PySeries: ...
def dt_date(self) -> PySeries: ...
def dt_day(self) -> PySeries: ...
Expand Down
70 changes: 26 additions & 44 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def __init__(self, builder: LogicalPlanBuilder) -> None:
self.__builder = builder
self._result_cache: Optional[PartitionCacheEntry] = None
self._preview = DataFramePreview(preview_partition=None, dataframe_num_rows=None)
self._num_preview_rows = get_context().daft_execution_config.num_preview_rows

@property
def _builder(self) -> LogicalPlanBuilder:
Expand Down Expand Up @@ -225,13 +226,33 @@ def iter_partitions(self) -> Iterator[Union[MicroPartition, "RayObjectRef"]]:
for result in results_iter:
yield result.partition()

def _populate_preview(self) -> None:
"""Populates the preview of the DataFrame, if it is not already populated."""
if self._result is None:
return

preview_partition_invalid = (
self._preview.preview_partition is None or len(self._preview.preview_partition) < self._num_preview_rows
)
if preview_partition_invalid:
preview_parts = self._result._get_preview_vpartition(self._num_preview_rows)
preview_results = LocalPartitionSet({i: part for i, part in enumerate(preview_parts)})

preview_partition = preview_results._get_merged_vpartition()
self._preview = DataFramePreview(
preview_partition=preview_partition,
dataframe_num_rows=len(self),
)

@DataframePublicAPI
def __repr__(self) -> str:
self._populate_preview()
display = DataFrameDisplay(self._preview, self.schema())
return display.__repr__()

@DataframePublicAPI
def _repr_html_(self) -> str:
self._populate_preview()
display = DataFrameDisplay(self._preview, self.schema())
return display._repr_html_()

Expand Down Expand Up @@ -305,30 +326,7 @@ def _from_tables(cls, *parts: MicroPartition) -> "DataFrame":
df._result_cache = cache_entry

# build preview
num_preview_rows = context.daft_execution_config.num_preview_rows
dataframe_num_rows = len(df)
if dataframe_num_rows > num_preview_rows:
need = num_preview_rows
preview_parts = []
for part in parts:
part_len = len(part)
if part_len >= need: # if this part has enough rows, take what we need and break
preview_parts.append(part.slice(0, need))
break
else: # otherwise, take the whole part and keep going
need -= part_len
preview_parts.append(part)

preview_results = LocalPartitionSet({i: part for i, part in enumerate(preview_parts)})
else:
preview_results = result_pset

# set preview
preview_partition = preview_results._get_merged_vpartition()
df._preview = DataFramePreview(
preview_partition=preview_partition,
dataframe_num_rows=dataframe_num_rows,
)
df._populate_preview()
return df

###
Expand Down Expand Up @@ -1129,26 +1127,10 @@ def collect(self, num_preview_rows: Optional[int] = 8) -> "DataFrame":

assert self._result is not None
dataframe_len = len(self._result)
requested_rows = dataframe_len if num_preview_rows is None else num_preview_rows

# Build a DataFramePreview and cache it if necessary
preview_partition_invalid = (
self._preview.preview_partition is None or len(self._preview.preview_partition) < requested_rows
)
if preview_partition_invalid:
preview_df = self
if num_preview_rows is not None:
preview_df = preview_df.limit(num_preview_rows)
preview_df._materialize_results()
preview_results = preview_df._result
assert preview_results is not None

preview_partition = preview_results._get_merged_vpartition()
self._preview = DataFramePreview(
preview_partition=preview_partition,
dataframe_num_rows=dataframe_len,
)

if num_preview_rows is not None:
self._num_preview_rows = num_preview_rows
else:
self._num_preview_rows = dataframe_len
return self

def _construct_show_display(self, n: int) -> "DataFrameDisplay":
Expand Down
22 changes: 22 additions & 0 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,28 @@ def length(self) -> Expression:
"""
return Expression._from_pyexpr(self._expr.utf8_length())

def lower(self) -> Expression:
"""Convert UTF-8 string to all lowercase
Example:
>>> col("x").str.lower()
Returns:
Expression: a String expression which is `self` lowercased
"""
return Expression._from_pyexpr(self._expr.utf8_lower())

def upper(self) -> Expression:
"""Convert UTF-8 string to all upper
Example:
>>> col("x").str.upper()
Returns:
Expression: a String expression which is `self` uppercased
"""
return Expression._from_pyexpr(self._expr.utf8_upper())


class ExpressionListNamespace(ExpressionNamespace):
def join(self, delimiter: str | Expression) -> Expression:
Expand Down
4 changes: 2 additions & 2 deletions daft/io/_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ def read_csv(
Args:
path (str): Path to CSV (allows for wildcards)
schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will
disable all schema inference on data being read, and throw an error if data being read is incompatible.
schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option
will override the specified columns on the inferred schema with the specified DataTypes
has_headers (bool): Whether the CSV has a header or not, defaults to True
delimiter (Str): Delimiter used in the CSV, defaults to ","
doubled_quote (bool): Whether to support double quote escapes, defaults to True
Expand Down
4 changes: 2 additions & 2 deletions daft/io/_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ def read_json(
Args:
path (str): Path to JSON files (allows for wildcards)
schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will
disable all schema inference on data being read, and throw an error if data being read is incompatible.
schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option
will override the specified columns on the inferred schema with the specified DataTypes
io_config (IOConfig): Config to be used with the native downloader
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.
Expand Down
4 changes: 2 additions & 2 deletions daft/io/_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def read_parquet(
Args:
path (str): Path to Parquet file (allows for wildcards)
schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will
disable all schema inference on data being read, and throw an error if data being read is incompatible.
schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option
will override the specified columns on the inferred schema with the specified DataTypes
io_config (IOConfig): Config to be used with the native downloader
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet.
_multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
Expand Down
3 changes: 3 additions & 0 deletions daft/runners/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ class PartitionSet(Generic[PartitionT]):
def _get_merged_vpartition(self) -> MicroPartition:
raise NotImplementedError()

def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
raise NotImplementedError()

def to_pydict(self) -> dict[str, list[Any]]:
"""Retrieves all the data in a PartitionSet as a Python dictionary. Values are the raw data from each Block."""
merged_partition = self._get_merged_vpartition()
Expand Down
13 changes: 13 additions & 0 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ def _get_merged_vpartition(self) -> MicroPartition:
assert ids_and_partitions[-1][0] + 1 == len(ids_and_partitions)
return MicroPartition.concat([part for id, part in ids_and_partitions])

def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
ids_and_partitions = self.items()
preview_parts = []
for _, part in ids_and_partitions:
part_len = len(part)
if part_len >= num_rows: # if this part has enough rows, take what we need and break
preview_parts.append(part.slice(0, num_rows))
break
else: # otherwise, take the whole part and keep going
num_rows -= part_len
preview_parts.append(part)
return preview_parts

def get_partition(self, idx: PartID) -> MicroPartition:
return self._partitions[idx]

Expand Down
14 changes: 14 additions & 0 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,20 @@ def _get_merged_vpartition(self) -> MicroPartition:
all_partitions = ray.get([part for id, part in ids_and_partitions])
return MicroPartition.concat(all_partitions)

def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]:
ids_and_partitions = self.items()
preview_parts = []
for _, part in ids_and_partitions:
part = ray.get(part)
part_len = len(part)
if part_len >= num_rows: # if this part has enough rows, take what we need and break
preview_parts.append(part.slice(0, num_rows))
break
else: # otherwise, take the whole part and keep going
num_rows -= part_len
preview_parts.append(part)
return preview_parts

def to_ray_dataset(self) -> RayDataset:
if not _RAY_FROM_ARROW_REFS_AVAILABLE:
raise ImportError(
Expand Down
8 changes: 8 additions & 0 deletions daft/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,14 @@ def length(self) -> Series:
assert self._series is not None
return Series._from_pyseries(self._series.utf8_length())

def lower(self) -> Series:
assert self._series is not None
return Series._from_pyseries(self._series.utf8_lower())

def upper(self) -> Series:
assert self._series is not None
return Series._from_pyseries(self._series.utf8_upper())


class SeriesDateNamespace(SeriesNamespace):
def date(self) -> Series:
Expand Down
2 changes: 2 additions & 0 deletions docs/source/api_docs/expressions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ The following methods are available under the ``expr.str`` attribute.
Expression.str.concat
Expression.str.length
Expression.str.split
Expression.str.lower
Expression.str.upper

.. _api-expressions-temporal:

Expand Down
1 change: 1 addition & 0 deletions docs/source/user_guide/integrations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Integrations

.. toctree::

integrations/ray
integrations/iceberg
integrations/microsoft-azure
integrations/aws
90 changes: 90 additions & 0 deletions docs/source/user_guide/integrations/ray.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
Ray
===

`Ray <https://docs.ray.io/en/latest/ray-overview/index.html>`_ is an open-source framework for distributed computing.

Daft's native support for Ray enables you to run distributed DataFrame workloads at scale.

Usage
-----

You can run Daft on Ray in two ways: by using the `Ray Client <https://docs.ray.io/en/latest/cluster/running-applications/job-submission/ray-client.html>`_ or by submitting a Ray job.

Ray Client
**********
The Ray client is quick way to get started with running tasks and retrieving their results on Ray using Python.

.. WARNING::
To run tasks using the Ray client, the version of Daft and the minor version (eg. 3.9, 3.10) of Python must match between client and server.

Here's an example of how you can use the Ray client with Daft:

.. code:: python
>>> import daft
>>> import ray
>>>
>>> # Refer to the note under "Ray Job" for details on "runtime_env"
>>> ray.init("ray://<head_node_host>:10001", runtime_env={"pip": ["getdaft"]})
>>>
>>> # Starts the Ray client and tells Daft to use Ray to execute queries
>>> # If ray.init() has already been called, it uses the existing client
>>> daft.context.set_runner_ray("ray://<head_node_host>:10001")
>>>
>>> df = daft.from_pydict({
>>> "a": [3, 2, 5, 6, 1, 4],
>>> "b": [True, False, False, True, True, False]
>>> })
>>> df = df.where(df["b"]).sort(df["a"])
>>>
>>> # Daft executes the query remotely and returns a preview to the client
>>> df.collect()
╭───────┬─────────╮
│ a ┆ b │
------
│ Int64 ┆ Boolean │
╞═══════╪═════════╡
1 ┆ true │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
3 ┆ true │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
6 ┆ true │
╰───────┴─────────╯
(Showing first 3 of 3 rows)
Ray Job
*******
Ray jobs allow for more control and observability over using the Ray client. In addition, your entire code runs on Ray, which means it is not constrained by the compute, network, library versions, or availability of your local machine.

.. code:: python
# wd/job.py
import daft
def main():
# call without any arguments to connect to Ray from the head node
daft.context.set_runner_ray()
# ... Run Daft commands here ...
if __name__ == "__main__":
main()
To submit this script as a job, use the Ray CLI, which can be installed with `pip install "ray[default]"`.

.. code:: sh
ray job submit \
--working-dir wd \
--address "http://<head_node_host>:8265" \
--runtime-env-json '{"pip": ["getdaft"]}' \
-- python job.py
.. NOTE::

The runtime env parameter specifies that Daft should be installed on the Ray workers. Alternative methods of including Daft in the worker dependencies can be found `here <https://docs.ray.io/en/latest/ray-core/handling-dependencies.html>`_.


For more information about Ray jobs, see `Ray docs -> Ray Jobs Overview <https://docs.ray.io/en/latest/cluster/running-applications/job-submission/index.html>`_.
Loading

0 comments on commit 0a1cdc8

Please sign in to comment.