diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 5f13ac06bf..793e4c6c8c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -36,3 +36,14 @@ To set up your development environment: 1. `make build`: recompile your code after modifying any Rust code in `src/` 2. `make test`: run tests 3. `DAFT_RUNNER=ray make test`: set the runner to the Ray runner and run tests (DAFT_RUNNER defaults to `py`) + +### Developing with Ray + +Running a development version of Daft on a local Ray cluster is as simple as including `daft.context.set_runner_ray()` in your Python script and then building and executing it as usual. + +To use a remote Ray cluster, run the following steps on the same operating system version as your Ray nodes, in order to ensure that your binaries are executable on Ray. + +1. `mkdir wd`: this is the working directory, it will hold all the files to be submitted to Ray for a job +2. `ln -s daft wd/daft`: create a symbolic link from the Python module to the working directory +3. `make build-release`: an optimized build to ensure that the module is small enough to be successfully uploaded to Ray. Run this after modifying any Rust code in `src/` +4. `ray job submit --working-dir wd --address "http://:8265" -- python script.py`: submit `wd/script.py` to be run on Ray diff --git a/daft/analytics.py b/daft/analytics.py index 4944aa2bad..bf5b2c661d 100644 --- a/daft/analytics.py +++ b/daft/analytics.py @@ -18,7 +18,7 @@ from daft import context _ANALYTICS_CLIENT = None -_WRITE_KEY = "opL9scJXH6GKdIYgPdA0ncCj8i920LJq" +_WRITE_KEY = "ZU2LLq6HFW0kMEY6TiGZoGnRzogXBUwa" _SEGMENT_BATCH_ENDPOINT = "https://api.segment.io/v1/batch" @@ -34,8 +34,8 @@ class AnalyticsEvent: def _get_session_key(): - # Restrict the cardinality of keys to 8000 - return f"anon-{random.randint(1, 8000)}" + # Restrict the cardinality of keys to 800 + return f"anon-{random.randint(1, 800)}" def _build_segment_batch_payload( diff --git a/daft/daft.pyi b/daft/daft.pyi index ccd04aaf11..14ad010de1 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -914,6 +914,8 @@ class PyExpr: def utf8_contains(self, pattern: PyExpr) -> PyExpr: ... def utf8_split(self, pattern: PyExpr) -> PyExpr: ... def utf8_length(self) -> PyExpr: ... + def utf8_lower(self) -> PyExpr: ... + def utf8_upper(self) -> PyExpr: ... def image_decode(self) -> PyExpr: ... def image_encode(self, image_format: ImageFormat) -> PyExpr: ... def image_resize(self, w: int, h: int) -> PyExpr: ... @@ -989,6 +991,8 @@ class PySeries: def utf8_contains(self, pattern: PySeries) -> PySeries: ... def utf8_split(self, pattern: PySeries) -> PySeries: ... def utf8_length(self) -> PySeries: ... + def utf8_lower(self) -> PySeries: ... + def utf8_upper(self) -> PySeries: ... def is_nan(self) -> PySeries: ... def dt_date(self) -> PySeries: ... def dt_day(self) -> PySeries: ... 
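The `utf8_lower` / `utf8_upper` stubs added above are the PyO3 bindings behind the new user-facing `Expression.str.lower()` and `Expression.str.upper()` methods introduced later in this diff. A minimal usage sketch, mirroring the tests added at the bottom of the diff (`from_pydict`, `with_column`, and `to_pydict` are existing Daft APIs assumed here):

```python
import daft
from daft.expressions import col

df = daft.from_pydict({"name": ["Foo", None, "BarBaz", "QUUX"]})
df = df.with_column("lower", col("name").str.lower())
df = df.with_column("upper", col("name").str.upper())

# Nulls pass through untouched, matching the validity handling in the Rust kernels below.
print(df.to_pydict())
# {'name': ['Foo', None, 'BarBaz', 'QUUX'],
#  'lower': ['foo', None, 'barbaz', 'quux'],
#  'upper': ['FOO', None, 'BARBAZ', 'QUUX']}
```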
diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index a370d998ed..32f8bb726e 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -85,6 +85,7 @@ def __init__(self, builder: LogicalPlanBuilder) -> None: self.__builder = builder self._result_cache: Optional[PartitionCacheEntry] = None self._preview = DataFramePreview(preview_partition=None, dataframe_num_rows=None) + self._num_preview_rows = get_context().daft_execution_config.num_preview_rows @property def _builder(self) -> LogicalPlanBuilder: @@ -225,13 +226,33 @@ def iter_partitions(self) -> Iterator[Union[MicroPartition, "RayObjectRef"]]: for result in results_iter: yield result.partition() + def _populate_preview(self) -> None: + """Populates the preview of the DataFrame, if it is not already populated.""" + if self._result is None: + return + + preview_partition_invalid = ( + self._preview.preview_partition is None or len(self._preview.preview_partition) < self._num_preview_rows + ) + if preview_partition_invalid: + preview_parts = self._result._get_preview_vpartition(self._num_preview_rows) + preview_results = LocalPartitionSet({i: part for i, part in enumerate(preview_parts)}) + + preview_partition = preview_results._get_merged_vpartition() + self._preview = DataFramePreview( + preview_partition=preview_partition, + dataframe_num_rows=len(self), + ) + @DataframePublicAPI def __repr__(self) -> str: + self._populate_preview() display = DataFrameDisplay(self._preview, self.schema()) return display.__repr__() @DataframePublicAPI def _repr_html_(self) -> str: + self._populate_preview() display = DataFrameDisplay(self._preview, self.schema()) return display._repr_html_() @@ -305,30 +326,7 @@ def _from_tables(cls, *parts: MicroPartition) -> "DataFrame": df._result_cache = cache_entry # build preview - num_preview_rows = context.daft_execution_config.num_preview_rows - dataframe_num_rows = len(df) - if dataframe_num_rows > num_preview_rows: - need = num_preview_rows - preview_parts = [] - for part in parts: - part_len = len(part) - if part_len >= need: # if this part has enough rows, take what we need and break - preview_parts.append(part.slice(0, need)) - break - else: # otherwise, take the whole part and keep going - need -= part_len - preview_parts.append(part) - - preview_results = LocalPartitionSet({i: part for i, part in enumerate(preview_parts)}) - else: - preview_results = result_pset - - # set preview - preview_partition = preview_results._get_merged_vpartition() - df._preview = DataFramePreview( - preview_partition=preview_partition, - dataframe_num_rows=dataframe_num_rows, - ) + df._populate_preview() return df ### @@ -1129,26 +1127,10 @@ def collect(self, num_preview_rows: Optional[int] = 8) -> "DataFrame": assert self._result is not None dataframe_len = len(self._result) - requested_rows = dataframe_len if num_preview_rows is None else num_preview_rows - - # Build a DataFramePreview and cache it if necessary - preview_partition_invalid = ( - self._preview.preview_partition is None or len(self._preview.preview_partition) < requested_rows - ) - if preview_partition_invalid: - preview_df = self - if num_preview_rows is not None: - preview_df = preview_df.limit(num_preview_rows) - preview_df._materialize_results() - preview_results = preview_df._result - assert preview_results is not None - - preview_partition = preview_results._get_merged_vpartition() - self._preview = DataFramePreview( - preview_partition=preview_partition, - dataframe_num_rows=dataframe_len, - ) - + if 
num_preview_rows is not None: + self._num_preview_rows = num_preview_rows + else: + self._num_preview_rows = dataframe_len return self def _construct_show_display(self, n: int) -> "DataFrameDisplay": diff --git a/daft/expressions/expressions.py b/daft/expressions/expressions.py index a903c968db..964903a97d 100644 --- a/daft/expressions/expressions.py +++ b/daft/expressions/expressions.py @@ -732,6 +732,28 @@ def length(self) -> Expression: """ return Expression._from_pyexpr(self._expr.utf8_length()) + def lower(self) -> Expression: + """Convert UTF-8 string to all lowercase + + Example: + >>> col("x").str.lower() + + Returns: + Expression: a String expression which is `self` lowercased + """ + return Expression._from_pyexpr(self._expr.utf8_lower()) + + def upper(self) -> Expression: + """Convert UTF-8 string to all upper + + Example: + >>> col("x").str.upper() + + Returns: + Expression: a String expression which is `self` uppercased + """ + return Expression._from_pyexpr(self._expr.utf8_upper()) + class ExpressionListNamespace(ExpressionNamespace): def join(self, delimiter: str | Expression) -> Expression: diff --git a/daft/io/_csv.py b/daft/io/_csv.py index 9101f83872..ec3b1f1e7f 100644 --- a/daft/io/_csv.py +++ b/daft/io/_csv.py @@ -43,8 +43,8 @@ def read_csv( Args: path (str): Path to CSV (allows for wildcards) - schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will - disable all schema inference on data being read, and throw an error if data being read is incompatible. + schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option + will override the specified columns on the inferred schema with the specified DataTypes has_headers (bool): Whether the CSV has a header or not, defaults to True delimiter (Str): Delimiter used in the CSV, defaults to "," doubled_quote (bool): Whether to support double quote escapes, defaults to True diff --git a/daft/io/_json.py b/daft/io/_json.py index 309686c61b..c052fb2c52 100644 --- a/daft/io/_json.py +++ b/daft/io/_json.py @@ -36,8 +36,8 @@ def read_json( Args: path (str): Path to JSON files (allows for wildcards) - schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will - disable all schema inference on data being read, and throw an error if data being read is incompatible. + schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option + will override the specified columns on the inferred schema with the specified DataTypes io_config (IOConfig): Config to be used with the native downloader use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This is currently experimental. diff --git a/daft/io/_parquet.py b/daft/io/_parquet.py index 6b95723a45..a41d82ec64 100644 --- a/daft/io/_parquet.py +++ b/daft/io/_parquet.py @@ -35,8 +35,8 @@ def read_parquet( Args: path (str): Path to Parquet file (allows for wildcards) - schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option will - disable all schema inference on data being read, and throw an error if data being read is incompatible. 
+ schema_hints (dict[str, DataType]): A mapping between column names and datatypes - passing this option + will override the specified columns on the inferred schema with the specified DataTypes io_config (IOConfig): Config to be used with the native downloader use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. _multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py index 8836a3bc5c..56fda08a3a 100644 --- a/daft/runners/partitioning.py +++ b/daft/runners/partitioning.py @@ -209,6 +209,9 @@ class PartitionSet(Generic[PartitionT]): def _get_merged_vpartition(self) -> MicroPartition: raise NotImplementedError() + def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]: + raise NotImplementedError() + def to_pydict(self) -> dict[str, list[Any]]: """Retrieves all the data in a PartitionSet as a Python dictionary. Values are the raw data from each Block.""" merged_partition = self._get_merged_vpartition() diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index 2be28e4c54..41b435ff24 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -51,6 +51,19 @@ def _get_merged_vpartition(self) -> MicroPartition: assert ids_and_partitions[-1][0] + 1 == len(ids_and_partitions) return MicroPartition.concat([part for id, part in ids_and_partitions]) + def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]: + ids_and_partitions = self.items() + preview_parts = [] + for _, part in ids_and_partitions: + part_len = len(part) + if part_len >= num_rows: # if this part has enough rows, take what we need and break + preview_parts.append(part.slice(0, num_rows)) + break + else: # otherwise, take the whole part and keep going + num_rows -= part_len + preview_parts.append(part) + return preview_parts + def get_partition(self, idx: PartID) -> MicroPartition: return self._partitions[idx] diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index 24d597084c..e2da71b62b 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -151,6 +151,20 @@ def _get_merged_vpartition(self) -> MicroPartition: all_partitions = ray.get([part for id, part in ids_and_partitions]) return MicroPartition.concat(all_partitions) + def _get_preview_vpartition(self, num_rows: int) -> list[MicroPartition]: + ids_and_partitions = self.items() + preview_parts = [] + for _, part in ids_and_partitions: + part = ray.get(part) + part_len = len(part) + if part_len >= num_rows: # if this part has enough rows, take what we need and break + preview_parts.append(part.slice(0, num_rows)) + break + else: # otherwise, take the whole part and keep going + num_rows -= part_len + preview_parts.append(part) + return preview_parts + def to_ray_dataset(self) -> RayDataset: if not _RAY_FROM_ARROW_REFS_AVAILABLE: raise ImportError( diff --git a/daft/series.py b/daft/series.py index 1f4782b4f0..f776dd8260 100644 --- a/daft/series.py +++ b/daft/series.py @@ -590,6 +590,14 @@ def length(self) -> Series: assert self._series is not None return Series._from_pyseries(self._series.utf8_length()) + def lower(self) -> Series: + assert self._series is not None + return Series._from_pyseries(self._series.utf8_lower()) + + def upper(self) -> Series: + assert self._series is not None + return Series._from_pyseries(self._series.utf8_upper()) + class SeriesDateNamespace(SeriesNamespace): def date(self) -> 
Series: diff --git a/docs/source/api_docs/expressions.rst b/docs/source/api_docs/expressions.rst index e977c849d8..f0d5002846 100644 --- a/docs/source/api_docs/expressions.rst +++ b/docs/source/api_docs/expressions.rst @@ -98,6 +98,8 @@ The following methods are available under the ``expr.str`` attribute. Expression.str.concat Expression.str.length Expression.str.split + Expression.str.lower + Expression.str.upper .. _api-expressions-temporal: diff --git a/docs/source/user_guide/integrations.rst b/docs/source/user_guide/integrations.rst index 4a565cfa22..6fdd7adcfe 100644 --- a/docs/source/user_guide/integrations.rst +++ b/docs/source/user_guide/integrations.rst @@ -3,6 +3,7 @@ Integrations .. toctree:: + integrations/ray integrations/iceberg integrations/microsoft-azure integrations/aws diff --git a/docs/source/user_guide/integrations/ray.rst b/docs/source/user_guide/integrations/ray.rst new file mode 100644 index 0000000000..3ea94cd20a --- /dev/null +++ b/docs/source/user_guide/integrations/ray.rst @@ -0,0 +1,90 @@ +Ray +=== + +`Ray `_ is an open-source framework for distributed computing. + +Daft's native support for Ray enables you to run distributed DataFrame workloads at scale. + +Usage +----- + +You can run Daft on Ray in two ways: by using the `Ray Client `_ or by submitting a Ray job. + +Ray Client +********** +The Ray client is quick way to get started with running tasks and retrieving their results on Ray using Python. + +.. WARNING:: + To run tasks using the Ray client, the version of Daft and the minor version (eg. 3.9, 3.10) of Python must match between client and server. + +Here's an example of how you can use the Ray client with Daft: + +.. code:: python + + >>> import daft + >>> import ray + >>> + >>> # Refer to the note under "Ray Job" for details on "runtime_env" + >>> ray.init("ray://:10001", runtime_env={"pip": ["getdaft"]}) + >>> + >>> # Starts the Ray client and tells Daft to use Ray to execute queries + >>> # If ray.init() has already been called, it uses the existing client + >>> daft.context.set_runner_ray("ray://:10001") + >>> + >>> df = daft.from_pydict({ + >>> "a": [3, 2, 5, 6, 1, 4], + >>> "b": [True, False, False, True, True, False] + >>> }) + >>> df = df.where(df["b"]).sort(df["a"]) + >>> + >>> # Daft executes the query remotely and returns a preview to the client + >>> df.collect() + ╭───────┬─────────╮ + │ a ┆ b │ + │ --- ┆ --- │ + │ Int64 ┆ Boolean │ + ╞═══════╪═════════╡ + │ 1 ┆ true │ + ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ + │ 3 ┆ true │ + ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ + │ 6 ┆ true │ + ╰───────┴─────────╯ + + (Showing first 3 of 3 rows) + +Ray Job +******* +Ray jobs allow for more control and observability over using the Ray client. In addition, your entire code runs on Ray, which means it is not constrained by the compute, network, library versions, or availability of your local machine. + +.. code:: python + + # wd/job.py + + import daft + + def main(): + # call without any arguments to connect to Ray from the head node + daft.context.set_runner_ray() + + # ... Run Daft commands here ... + + if __name__ == "__main__": + main() + +To submit this script as a job, use the Ray CLI, which can be installed with `pip install "ray[default]"`. + +.. code:: sh + + ray job submit \ + --working-dir wd \ + --address "http://:8265" \ + --runtime-env-json '{"pip": ["getdaft"]}' \ + -- python job.py + +.. NOTE:: + + The runtime env parameter specifies that Daft should be installed on the Ray workers. Alternative methods of including Daft in the worker dependencies can be found `here `_. 
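If you prefer to submit jobs from Python rather than the CLI, Ray also ships a job submission SDK with ``ray[default]``. A rough sketch, not part of this guide's tested examples; the dashboard address below is a placeholder for your cluster's address:

.. code:: python

    from ray.job_submission import JobSubmissionClient

    # Point at the Ray dashboard of your cluster (same address as the CLI's --address flag).
    client = JobSubmissionClient("http://<head-node-address>:8265")

    job_id = client.submit_job(
        entrypoint="python job.py",
        # Same runtime environment as the CLI example above: upload the working
        # directory and install Daft on the workers.
        runtime_env={"working_dir": "wd", "pip": ["getdaft"]},
    )
    print(client.get_job_status(job_id))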
+ + +For more information about Ray jobs, see `Ray docs -> Ray Jobs Overview `_. diff --git a/src/daft-core/src/array/ops/utf8.rs b/src/daft-core/src/array/ops/utf8.rs index 7f9987078b..5f6e3eb127 100644 --- a/src/daft-core/src/array/ops/utf8.rs +++ b/src/daft-core/src/array/ops/utf8.rs @@ -146,6 +146,32 @@ impl Utf8Array { Ok(UInt64Array::from((self.name(), Box::new(arrow_result)))) } + pub fn lower(&self) -> DaftResult { + let self_arrow = self.as_arrow(); + let arrow_result = self_arrow + .iter() + .map(|val| { + let v = val?; + Some(v.to_lowercase()) + }) + .collect::>() + .with_validity(self_arrow.validity().cloned()); + Ok(Utf8Array::from((self.name(), Box::new(arrow_result)))) + } + + pub fn upper(&self) -> DaftResult { + let self_arrow = self.as_arrow(); + let arrow_result = self_arrow + .iter() + .map(|val| { + let v = val?; + Some(v.to_uppercase()) + }) + .collect::>() + .with_validity(self_arrow.validity().cloned()); + Ok(Utf8Array::from((self.name(), Box::new(arrow_result)))) + } + fn binary_broadcasted_compare( &self, other: &Self, diff --git a/src/daft-core/src/python/series.rs b/src/daft-core/src/python/series.rs index e89b206399..78d4dd9997 100644 --- a/src/daft-core/src/python/series.rs +++ b/src/daft-core/src/python/series.rs @@ -264,6 +264,14 @@ impl PySeries { Ok(self.series.utf8_length()?.into()) } + pub fn utf8_lower(&self) -> PyResult { + Ok(self.series.utf8_lower()?.into()) + } + + pub fn utf8_upper(&self) -> PyResult { + Ok(self.series.utf8_upper()?.into()) + } + pub fn is_nan(&self) -> PyResult { Ok(self.series.is_nan()?.into()) } diff --git a/src/daft-core/src/series/ops/utf8.rs b/src/daft-core/src/series/ops/utf8.rs index fb2539b64e..d29edafd41 100644 --- a/src/daft-core/src/series/ops/utf8.rs +++ b/src/daft-core/src/series/ops/utf8.rs @@ -50,4 +50,24 @@ impl Series { ))), } } + + pub fn utf8_lower(&self) -> DaftResult { + match self.data_type() { + DataType::Utf8 => Ok(self.utf8()?.lower()?.into_series()), + DataType::Null => Ok(self.clone()), + dt => Err(DaftError::TypeError(format!( + "Lower not implemented for type {dt}" + ))), + } + } + + pub fn utf8_upper(&self) -> DaftResult { + match self.data_type() { + DataType::Utf8 => Ok(self.utf8()?.upper()?.into_series()), + DataType::Null => Ok(self.clone()), + dt => Err(DaftError::TypeError(format!( + "Upper not implemented for type {dt}" + ))), + } + } } diff --git a/src/daft-dsl/src/functions/utf8/lower.rs b/src/daft-dsl/src/functions/utf8/lower.rs new file mode 100644 index 0000000000..56e153e167 --- /dev/null +++ b/src/daft-dsl/src/functions/utf8/lower.rs @@ -0,0 +1,46 @@ +use daft_core::{ + datatypes::{DataType, Field}, + schema::Schema, + series::Series, +}; + +use crate::Expr; +use common_error::{DaftError, DaftResult}; + +use super::super::FunctionEvaluator; + +pub(super) struct LowerEvaluator {} + +impl FunctionEvaluator for LowerEvaluator { + fn fn_name(&self) -> &'static str { + "lower" + } + + fn to_field(&self, inputs: &[Expr], schema: &Schema, _: &Expr) -> DaftResult { + match inputs { + [data] => match data.to_field(schema) { + Ok(data_field) => match &data_field.dtype { + DataType::Utf8 => Ok(Field::new(data_field.name, DataType::Utf8)), + _ => Err(DaftError::TypeError(format!( + "Expects input to lower to be utf8, but received {data_field}", + ))), + }, + Err(e) => Err(e), + }, + _ => Err(DaftError::SchemaMismatch(format!( + "Expected 1 input args, got {}", + inputs.len() + ))), + } + } + + fn evaluate(&self, inputs: &[Series], _: &Expr) -> DaftResult { + match inputs { + [data] => 
data.utf8_lower(), + _ => Err(DaftError::ValueError(format!( + "Expected 1 input args, got {}", + inputs.len() + ))), + } + } +} diff --git a/src/daft-dsl/src/functions/utf8/mod.rs b/src/daft-dsl/src/functions/utf8/mod.rs index 5c8901147e..c4789ff212 100644 --- a/src/daft-dsl/src/functions/utf8/mod.rs +++ b/src/daft-dsl/src/functions/utf8/mod.rs @@ -1,15 +1,19 @@ mod contains; mod endswith; mod length; +mod lower; mod split; mod startswith; +mod upper; use contains::ContainsEvaluator; use endswith::EndswithEvaluator; use length::LengthEvaluator; +use lower::LowerEvaluator; use serde::{Deserialize, Serialize}; use split::SplitEvaluator; use startswith::StartswithEvaluator; +use upper::UpperEvaluator; use crate::Expr; @@ -22,6 +26,8 @@ pub enum Utf8Expr { Contains, Split, Length, + Lower, + Upper, } impl Utf8Expr { @@ -34,6 +40,8 @@ impl Utf8Expr { Contains => &ContainsEvaluator {}, Split => &SplitEvaluator {}, Length => &LengthEvaluator {}, + Lower => &LowerEvaluator {}, + Upper => &UpperEvaluator {}, } } } @@ -72,3 +80,17 @@ pub fn length(data: &Expr) -> Expr { inputs: vec![data.clone()], } } + +pub fn lower(data: &Expr) -> Expr { + Expr::Function { + func: super::FunctionExpr::Utf8(Utf8Expr::Lower), + inputs: vec![data.clone()], + } +} + +pub fn upper(data: &Expr) -> Expr { + Expr::Function { + func: super::FunctionExpr::Utf8(Utf8Expr::Upper), + inputs: vec![data.clone()], + } +} diff --git a/src/daft-dsl/src/functions/utf8/upper.rs b/src/daft-dsl/src/functions/utf8/upper.rs new file mode 100644 index 0000000000..6c7967561e --- /dev/null +++ b/src/daft-dsl/src/functions/utf8/upper.rs @@ -0,0 +1,46 @@ +use daft_core::{ + datatypes::{DataType, Field}, + schema::Schema, + series::Series, +}; + +use crate::Expr; +use common_error::{DaftError, DaftResult}; + +use super::super::FunctionEvaluator; + +pub(super) struct UpperEvaluator {} + +impl FunctionEvaluator for UpperEvaluator { + fn fn_name(&self) -> &'static str { + "upper" + } + + fn to_field(&self, inputs: &[Expr], schema: &Schema, _: &Expr) -> DaftResult { + match inputs { + [data] => match data.to_field(schema) { + Ok(data_field) => match &data_field.dtype { + DataType::Utf8 => Ok(Field::new(data_field.name, DataType::Utf8)), + _ => Err(DaftError::TypeError(format!( + "Expects input to upper to be utf8, but received {data_field}", + ))), + }, + Err(e) => Err(e), + }, + _ => Err(DaftError::SchemaMismatch(format!( + "Expected 1 input args, got {}", + inputs.len() + ))), + } + } + + fn evaluate(&self, inputs: &[Series], _: &Expr) -> DaftResult { + match inputs { + [data] => data.utf8_upper(), + _ => Err(DaftError::ValueError(format!( + "Expected 1 input args, got {}", + inputs.len() + ))), + } + } +} diff --git a/src/daft-dsl/src/python.rs b/src/daft-dsl/src/python.rs index f135b96bb4..f61583f411 100644 --- a/src/daft-dsl/src/python.rs +++ b/src/daft-dsl/src/python.rs @@ -333,6 +333,16 @@ impl PyExpr { Ok(length(&self.expr).into()) } + pub fn utf8_lower(&self) -> PyResult { + use crate::functions::utf8::lower; + Ok(lower(&self.expr).into()) + } + + pub fn utf8_upper(&self) -> PyResult { + use crate::functions::utf8::upper; + Ok(upper(&self.expr).into()) + } + pub fn image_decode(&self) -> PyResult { use crate::functions::image::decode; Ok(decode(&self.expr).into()) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index 24f1547c57..fe9726e1a5 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -71,6 +71,8 @@ enum Error { path: String, source: azure_storage::Error, }, + 
#[snafu(display("Not a File: \"{}\"", path))] + NotAFile { path: String }, } impl From for super::Error { @@ -98,6 +100,7 @@ impl From for super::Error { path: path.into(), source: error.into(), }, + NotAFile { path } => super::Error::NotAFile { path }, _ => super::Error::Generic { store: super::SourceType::AzureBlob, source: error.into(), @@ -414,6 +417,10 @@ impl ObjectSource for AzureBlobSource { }?; let key = parsed.path(); + if key.is_empty() { + return Err(Error::NotAFile { path: uri.into() }.into()); + } + let container_client = self.blob_client.container_client(container); let blob_client = container_client.blob_client(key); let request_builder = blob_client.get(); @@ -455,6 +462,10 @@ impl ObjectSource for AzureBlobSource { }?; let key = parsed.path(); + if key.is_empty() { + return Err(Error::NotAFile { path: uri.into() }.into()); + } + let container_client = self.blob_client.container_client(container); let blob_client = container_client.blob_client(key); let metadata = blob_client diff --git a/src/daft-io/src/google_cloud.rs b/src/daft-io/src/google_cloud.rs index 34cdde825f..f2ad4e67e4 100644 --- a/src/daft-io/src/google_cloud.rs +++ b/src/daft-io/src/google_cloud.rs @@ -48,7 +48,8 @@ enum Error { UnableToLoadCredentials { source: google_cloud_storage::client::google_cloud_auth::error::Error, }, - + #[snafu(display("Not a File: \"{}\"", path))] + NotAFile { path: String }, #[snafu(display("Not a File: \"{}\"", path))] NotFound { path: String }, } @@ -104,6 +105,7 @@ impl From for super::Error { store: super::SourceType::GCS, source: source.into(), }, + NotAFile { path } => super::Error::NotAFile { path }, } } } @@ -132,6 +134,10 @@ impl GCSClientWrapper { ) -> super::Result { let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; let (bucket, key) = parse_uri(&uri)?; + if key.is_empty() { + return Err(Error::NotAFile { path: uri.into() }.into()); + } + let client = &self.0; let req = GetObjectRequest { bucket: bucket.into(), @@ -174,6 +180,9 @@ impl GCSClientWrapper { async fn get_size(&self, uri: &str, io_stats: Option) -> super::Result { let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; let (bucket, key) = parse_uri(&uri)?; + if key.is_empty() { + return Err(Error::NotAFile { path: uri.into() }.into()); + } let client = &self.0; let req = GetObjectRequest { bucket: bucket.into(), diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs index b31f46060b..b234477c4d 100644 --- a/src/daft-io/src/http.rs +++ b/src/daft-io/src/http.rs @@ -152,6 +152,7 @@ impl From for super::Error { source: source.into(), }, }, + UnableToDetermineSize { path } => super::Error::UnableToDetermineSize { path }, _ => super::Error::Generic { store: super::SourceType::Http, source: error.into(), diff --git a/src/daft-io/src/lib.rs b/src/daft-io/src/lib.rs index 70e3f7b1fd..f5d8133e26 100644 --- a/src/daft-io/src/lib.rs +++ b/src/daft-io/src/lib.rs @@ -1,6 +1,6 @@ #![feature(async_closure)] #![feature(let_chains)] - +#![feature(io_error_more)] mod azure_blob; mod google_cloud; mod http; @@ -72,6 +72,9 @@ pub enum Error { #[snafu(display("Not a File: \"{}\"", path))] NotAFile { path: String }, + #[snafu(display("Unable to determine size of {}", path))] + UnableToDetermineSize { path: String }, + #[snafu(display("Unable to load Credentials for store: {store}\nDetails:\n{source:?}"))] UnableToLoadCredentials { store: SourceType, source: DynError }, diff --git a/src/daft-io/src/local.rs b/src/daft-io/src/local.rs index 1f7afdc26d..5538b1cab8 
100644 --- a/src/daft-io/src/local.rs +++ b/src/daft-io/src/local.rs @@ -65,15 +65,16 @@ enum Error { #[snafu(display("Unable to convert URL \"{}\" to local file path", path))] InvalidFilePath { path: String }, + + #[snafu(display("Attempted to Read Directory as File {}", path))] + IsADirectory { path: String }, } impl From for super::Error { fn from(error: Error) -> Self { use Error::*; match error { - UnableToOpenFile { path, source } - | UnableToFetchFileMetadata { path, source } - | UnableToFetchDirectoryEntries { path, source } => { + UnableToOpenFile { path, source } | UnableToFetchDirectoryEntries { path, source } => { use std::io::ErrorKind::*; match source.kind() { NotFound => super::Error::NotFound { @@ -86,6 +87,19 @@ impl From for super::Error { }, } } + UnableToFetchFileMetadata { path, source } => { + use std::io::ErrorKind::*; + match source.kind() { + NotFound | IsADirectory => super::Error::NotFound { + path, + source: source.into(), + }, + _ => super::Error::UnableToOpenFile { + path, + source: source.into(), + }, + } + } UnableToReadBytes { path, source } => super::Error::UnableToReadBytes { path, source }, InvalidUrl { url, source } => super::Error::InvalidUrl { path: url.to_string_lossy().into_owned(), @@ -139,7 +153,14 @@ impl ObjectSource for LocalSource { .context(UnableToFetchFileMetadataSnafu { path: uri.to_string(), })?; - Ok(meta.len() as usize) + + if meta.is_dir() { + Err(super::Error::NotAFile { + path: uri.to_owned(), + }) + } else { + Ok(meta.len() as usize) + } } async fn glob( diff --git a/src/daft-io/src/object_store_glob.rs b/src/daft-io/src/object_store_glob.rs index ba61ca3283..d7ad04a2d1 100644 --- a/src/daft-io/src/object_store_glob.rs +++ b/src/daft-io/src/object_store_glob.rs @@ -346,17 +346,31 @@ pub(crate) async fn glob( if !full_fragment.has_special_character() { let mut remaining_results = limit; let glob = full_fragment.escaped_str().to_string(); + return Ok(stream! { - let mut results = source.iter_dir(glob.as_str(), true, page_size, io_stats).await?; - while let Some(result) = results.next().await && remaining_results.map(|rr| rr > 0).unwrap_or(true) { - match result { - Ok(fm) => { - if _should_return(&fm) { - remaining_results = remaining_results.map(|rr| rr - 1); - yield Ok(fm) - } - }, - Err(e) => yield Err(e), + let mut attempt_as_dir = true; + if !glob.ends_with(GLOB_DELIMITER) { + attempt_as_dir = false; + // If doesn't have a glob character and doesn't end with a delimiter, assume its a file first. + let maybe_size = source.get_size(&glob, io_stats.clone()).await; + match maybe_size { + Ok(size_bytes) => yield Ok(FileMetadata{filepath: glob.clone(), size: Some(size_bytes as u64), filetype: FileType::File }), + Err(crate::Error::NotAFile {..} | crate::Error::NotFound { .. } | crate::Error::UnableToDetermineSize { .. 
}) => {attempt_as_dir = true;}, + Err(err) => yield Err(err), + } + } + if attempt_as_dir { + let mut results = source.iter_dir(glob.as_str(), true, page_size, io_stats).await?; + while let Some(result) = results.next().await && remaining_results.map(|rr| rr > 0).unwrap_or(true) { + match result { + Ok(fm) => { + if _should_return(&fm) { + remaining_results = remaining_results.map(|rr| rr - 1); + yield Ok(fm) + } + }, + Err(e) => yield Err(e), + } } } } diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 4600310cab..2ce839eb33 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -1,11 +1,11 @@ -use std::sync::Arc; +use std::{sync::Arc, vec}; use common_error::{DaftError, DaftResult}; use daft_core::schema::SchemaRef; use daft_csv::CsvParseOptions; use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef}; use daft_parquet::read::ParquetSchemaInferenceOptions; -use futures::{stream::BoxStream, StreamExt}; +use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use snafu::Snafu; use crate::{ @@ -81,6 +81,43 @@ fn run_glob( Ok(Box::new(iterator)) } +fn run_glob_parallel( + glob_paths: Vec, + io_client: Arc, + runtime: Arc, + io_stats: Option, +) -> DaftResult>> { + let num_parallel_tasks = 64; + + let owned_runtime = runtime.clone(); + let boxstream = futures::stream::iter(glob_paths.into_iter().map(move |path| { + let (_, parsed_glob_path) = parse_url(&path).unwrap(); + let glob_input = parsed_glob_path.as_ref().to_string(); + let io_client = io_client.clone(); + let io_stats = io_stats.clone(); + + runtime.spawn(async move { + let stream = io_client + .glob(glob_input, None, None, None, io_stats) + .await?; + let results = stream.collect::>().await; + Result::<_, daft_io::Error>::Ok(futures::stream::iter(results)) + }) + })) + .buffered(num_parallel_tasks) + .map(|v| v.map_err(|e| daft_io::Error::JoinError { source: e })?) 
+ .try_flatten() + .map(|v| Ok(v?)) + .boxed(); + + // Construct a static-lifetime BoxStreamIterator + let iterator = BoxStreamIterator { + boxstream, + runtime_handle: owned_runtime.handle().clone(), + }; + Ok(iterator) +} + impl GlobScanOperator { pub fn try_new( glob_paths: &[&str], @@ -220,23 +257,12 @@ impl ScanOperator for GlobScanOperator { self.glob_paths )); - // Run [`run_glob`] on each path and mux them into the same iterator - let files = self - .glob_paths - .clone() - .into_iter() - .flat_map(move |glob_path| { - match run_glob( - glob_path.as_str(), - None, - io_client.clone(), - io_runtime.clone(), - Some(io_stats.clone()), - ) { - Ok(paths) => paths, - Err(err) => Box::new(vec![Err(err)].into_iter()), - } - }); + let files = run_glob_parallel( + self.glob_paths.clone(), + io_client.clone(), + io_runtime.clone(), + Some(io_stats.clone()), + )?; let file_format_config = self.file_format_config.clone(); let schema = self.schema.clone(); diff --git a/tests/cookbook/test_write.py b/tests/cookbook/test_write.py index 2f611c4059..bb1dba9668 100644 --- a/tests/cookbook/test_write.py +++ b/tests/cookbook/test_write.py @@ -21,6 +21,8 @@ def test_parquet_write(tmp_path): assert_df_equals(df.to_pandas(), read_back_pd_df) assert len(pd_df) == 1 + assert pd_df._preview.preview_partition is None + pd_df.__repr__() assert len(pd_df._preview.preview_partition) == 1 @@ -33,6 +35,8 @@ def test_parquet_write_with_partitioning(tmp_path): assert_df_equals(df.to_pandas(), read_back_pd_df) assert len(pd_df) == 5 + assert pd_df._preview.preview_partition is None + pd_df.__repr__() assert len(pd_df._preview.preview_partition) == 5 @@ -41,6 +45,8 @@ def test_empty_parquet_write_without_partitioning(tmp_path): df = df.where(daft.lit(False)) output_files = df.write_parquet(tmp_path) assert len(output_files) == 0 + assert output_files._preview.preview_partition is None + output_files.__repr__() assert len(output_files._preview.preview_partition) == 0 @@ -49,6 +55,8 @@ def test_empty_parquet_write_with_partitioning(tmp_path): df = df.where(daft.lit(False)) output_files = df.write_parquet(tmp_path, partition_cols=["Borough"]) assert len(output_files) == 0 + assert output_files._preview.preview_partition is None + output_files.__repr__() assert len(output_files._preview.preview_partition) == 0 @@ -69,6 +77,8 @@ def test_parquet_write_with_partitioning_readback_values(tmp_path): assert_df_equals(df.to_pandas(), read_back_pd_df) assert len(output_files) == 5 + assert output_files._preview.preview_partition is None + output_files.__repr__() assert len(output_files._preview.preview_partition) == 5 @@ -193,6 +203,8 @@ def test_csv_write(tmp_path): assert_df_equals(df.to_pandas(), read_back_pd_df) assert len(pd_df) == 1 + assert pd_df._preview.preview_partition is None + pd_df.__repr__() assert len(pd_df._preview.preview_partition) == 1 diff --git a/tests/dataframe/test_decimals.py b/tests/dataframe/test_decimals.py index 530146a098..2005790c9d 100644 --- a/tests/dataframe/test_decimals.py +++ b/tests/dataframe/test_decimals.py @@ -24,7 +24,7 @@ def test_decimal_parquet_roundtrip() -> None: df.write_parquet(dirname) df_readback = daft.read_parquet(dirname).collect() - assert str(df.to_pydict()["decimal128"]) == str(df_readback.to_pydict()["decimal128"]) + assert str(df.to_pydict()["decimal128"]) == str(df_readback.to_pydict()["decimal128"]) def test_arrow_decimal() -> None: diff --git a/tests/dataframe/test_repr.py b/tests/dataframe/test_repr.py index f84e13c0e7..636552978a 100644 --- 
a/tests/dataframe/test_repr.py +++ b/tests/dataframe/test_repr.py @@ -4,6 +4,7 @@ import numpy as np import pandas as pd +import pytest from PIL import Image import daft @@ -86,6 +87,16 @@ def test_empty_repr(make_df): assert df._repr_html_() == "(No data to display: Dataframe has no columns)" +@pytest.mark.parametrize("num_preview_rows", [9, 10, None]) +def test_repr_with_non_default_preview_rows(make_df, num_preview_rows): + df = make_df({"A": [i for i in range(10)], "B": [i for i in range(10)]}) + df.collect(num_preview_rows=num_preview_rows) + df.__repr__() + + assert df._preview.dataframe_num_rows == 10 + assert len(df._preview.preview_partition) == (num_preview_rows if num_preview_rows is not None else 10) + + def test_empty_df_repr(make_df): df = make_df({"A": [1, 2, 3], "B": ["a", "b", "c"]}) df = df.where(df["A"] > 10) diff --git a/tests/dataframe/test_show.py b/tests/dataframe/test_show.py index df32865551..dd5ced328f 100644 --- a/tests/dataframe/test_show.py +++ b/tests/dataframe/test_show.py @@ -26,13 +26,14 @@ def test_show_some(make_df, valid_data, data_source): assert df_display.num_rows == 1 -def test_show_from_cached_collect(make_df, valid_data): +def test_show_from_cached_repr(make_df, valid_data): df = make_df(valid_data) df = df.collect() + df.__repr__() collected_preview = df._preview df_display = df._construct_show_display(8) - # Check that cached preview from df.collect() was used. + # Check that cached preview from df.__repr__() was used. assert df_display.preview is collected_preview assert df_display.schema == df.schema() assert len(df_display.preview.preview_partition) == len(valid_data) @@ -40,30 +41,32 @@ def test_show_from_cached_collect(make_df, valid_data): assert df_display.num_rows == 3 -def test_show_from_cached_collect_prefix(make_df, valid_data): +def test_show_from_cached_repr_prefix(make_df, valid_data): df = make_df(valid_data) df = df.collect(3) + df.__repr__() df_display = df._construct_show_display(2) assert df_display.schema == df.schema() assert len(df_display.preview.preview_partition) == 2 - # Check that a prefix of the cached preview from df.collect() was used, so dataframe_num_rows should be set. + # Check that a prefix of the cached preview from df.__repr__() was used, so dataframe_num_rows should be set. assert df_display.preview.dataframe_num_rows == 3 assert df_display.num_rows == 2 -def test_show_not_from_cached_collect(make_df, valid_data, data_source): +def test_show_not_from_cached_repr(make_df, valid_data, data_source): df = make_df(valid_data) df = df.collect(2) + df.__repr__() collected_preview = df._preview df_display = df._construct_show_display(8) variant = data_source if variant == "parquet": - # Cached preview from df.collect() is NOT USED because data was not materialized from parquet. + # Cached preview from df.__repr__() is NOT USED because data was not materialized from parquet. assert df_display.preview != collected_preview elif variant == "arrow": - # Cached preview from df.collect() is USED because data was materialized from arrow. + # Cached preview from df.__repr__() is USED because data was materialized from arrow. 
assert df_display.preview == collected_preview assert df_display.schema == df.schema() assert len(df_display.preview.preview_partition) == len(valid_data) diff --git a/tests/dataframe/test_temporals.py b/tests/dataframe/test_temporals.py index 1c0cbcda9c..3973d3c7ce 100644 --- a/tests/dataframe/test_temporals.py +++ b/tests/dataframe/test_temporals.py @@ -90,7 +90,7 @@ def test_temporal_file_roundtrip(format, use_native_downloader) -> None: df.write_parquet(dirname) df_readback = daft.read_parquet(dirname, use_native_downloader=use_native_downloader).collect() - assert df.to_pydict() == df_readback.to_pydict() + assert df.to_pydict() == df_readback.to_pydict() @pytest.mark.parametrize( diff --git a/tests/expressions/typing/test_str.py b/tests/expressions/typing/test_str.py index 99ab473190..1018dcff4e 100644 --- a/tests/expressions/typing/test_str.py +++ b/tests/expressions/typing/test_str.py @@ -43,3 +43,23 @@ def test_str_length(): run_kernel=s.str.length, resolvable=True, ) + + +def test_str_lower(): + s = Series.from_arrow(pa.array(["Foo", "BarBaz", "QUUX"]), name="arg") + assert_typing_resolve_vs_runtime_behavior( + data=[s], + expr=col(s.name()).str.lower(), + run_kernel=s.str.lower, + resolvable=True, + ) + + +def test_str_upper(): + s = Series.from_arrow(pa.array(["Foo", "BarBaz", "quux"]), name="arg") + assert_typing_resolve_vs_runtime_behavior( + data=[s], + expr=col(s.name()).str.upper(), + run_kernel=s.str.lower, + resolvable=True, + ) diff --git a/tests/integration/io/test_list_files_http.py b/tests/integration/io/test_list_files_http.py index a8cf410e22..0893736698 100644 --- a/tests/integration/io/test_list_files_http.py +++ b/tests/integration/io/test_list_files_http.py @@ -103,7 +103,7 @@ def test_http_listing_absolute_urls(nginx_config, tmpdir): ) with mount_data_nginx(nginx_config, tmpdir): - http_path = f"{nginx_http_url}/index.html" + http_path = f"{nginx_http_url}/" daft_ls_result = io_glob(http_path) # NOTE: Cannot use fsspec here because they do not correctly find the links @@ -129,7 +129,7 @@ def test_http_listing_absolute_base_urls(nginx_config, tmpdir): ) with mount_data_nginx(nginx_config, tmpdir): - http_path = f"{nginx_http_url}/index.html" + http_path = f"{nginx_http_url}/" daft_ls_result = io_glob(http_path) # NOTE: Cannot use fsspec here because they do not correctly find the links diff --git a/tests/series/test_utf8_ops.py b/tests/series/test_utf8_ops.py index 1019769501..e1975f0359 100644 --- a/tests/series/test_utf8_ops.py +++ b/tests/series/test_utf8_ops.py @@ -222,3 +222,43 @@ def test_series_utf8_length_all_null() -> None: s = Series.from_arrow(pa.array([None, None, None])) result = s.str.length() assert result.to_pylist() == [None, None, None] + + +@pytest.mark.parametrize( + ["data", "expected"], + [ + (["Foo", "BarBaz", "QUUX"], ["foo", "barbaz", "quux"]), + # With at least one null + (["Foo", None, "BarBaz", "QUUX"], ["foo", None, "barbaz", "quux"]), + # With all nulls + ([None] * 4, [None] * 4), + # With at least one numeric strings + (["Foo", "BarBaz", "QUUX", "2"], ["foo", "barbaz", "quux", "2"]), + # With all numeric strings + (["1", "2", "3"], ["1", "2", "3"]), + ], +) +def test_series_utf8_lower(data, expected) -> None: + s = Series.from_arrow(pa.array(data)) + result = s.str.lower() + assert result.to_pylist() == expected + + +@pytest.mark.parametrize( + ["data", "expected"], + [ + (["Foo", "BarBaz", "quux"], ["FOO", "BARBAZ", "QUUX"]), + # With at least one null + (["Foo", None, "BarBaz", "quux"], ["FOO", None, "BARBAZ", "QUUX"]), + # With 
all nulls + ([None] * 4, [None] * 4), + # With at least one numeric strings + (["Foo", "BarBaz", "quux", "2"], ["FOO", "BARBAZ", "QUUX", "2"]), + # With all numeric strings + (["1", "2", "3"], ["1", "2", "3"]), + ], +) +def test_series_utf8_upper(data, expected) -> None: + s = Series.from_arrow(pa.array(data)) + result = s.str.upper() + assert result.to_pylist() == expected diff --git a/tests/table/utf8/test_lower.py b/tests/table/utf8/test_lower.py new file mode 100644 index 0000000000..6ed7d6b050 --- /dev/null +++ b/tests/table/utf8/test_lower.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from daft.expressions import col +from daft.table import MicroPartition + + +def test_utf8_lower(): + table = MicroPartition.from_pydict({"col": ["Foo", None, "BarBaz", "QUUX"]}) + result = table.eval_expression_list([col("col").str.lower()]) + assert result.to_pydict() == {"col": ["foo", None, "barbaz", "quux"]} diff --git a/tests/table/utf8/test_upper.py b/tests/table/utf8/test_upper.py new file mode 100644 index 0000000000..812afdf7d3 --- /dev/null +++ b/tests/table/utf8/test_upper.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from daft.expressions import col +from daft.table import MicroPartition + + +def test_utf8_upper(): + table = MicroPartition.from_pydict({"col": ["Foo", None, "BarBaz", "quux", "1"]}) + result = table.eval_expression_list([col("col").str.upper()]) + assert result.to_pydict() == {"col": ["FOO", None, "BARBAZ", "QUUX", "1"]}
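For context on how the preview refactor above behaves end to end: `collect(num_preview_rows=...)` now only records the requested preview size, and `__repr__` builds the preview lazily via `_populate_preview()`. A small sketch mirroring the new `test_repr_with_non_default_preview_rows` test (the `_preview` and `_num_preview_rows` attributes are internal and referenced here only for illustration):

```python
import daft

df = daft.from_pydict({"A": list(range(10)), "B": list(range(10))})

df.collect(num_preview_rows=9)   # materializes results; only records the requested preview size
repr(df)                         # __repr__ populates the preview on demand

assert df._preview.dataframe_num_rows == 10
assert len(df._preview.preview_partition) == 9
```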