remove validate_dataset and regenerate_dataset
rjzamora committed Jul 19, 2024
1 parent 4eb42db commit 7f43473
Showing 4 changed files with 5 additions and 658 deletions.
99 changes: 3 additions & 96 deletions merlin/io/dataset.py
@@ -1130,103 +1130,10 @@ def npartitions(self):
         return self.to_ddf().npartitions
 
     def validate_dataset(self, **kwargs):
-        """Validate for efficient processing.
-
-        The purpose of this method is to validate that the Dataset object
-        meets the minimal requirements for efficient NVTabular processing.
-        For now, this criteria requires the data to be in parquet format.
-
-        Example Usage::
-
-            dataset = Dataset("/path/to/data_pq", engine="parquet")
-            assert validate_dataset(dataset)
-
-        Parameters
-        -----------
-        **kwargs :
-            Key-word arguments to pass down to the engine's validate_dataset
-            method. For the recommended parquet format, these arguments
-            include `add_metadata_file`, `row_group_max_size`, `file_min_size`,
-            and `require_metadata_file`. For more information, see
-            `ParquetDatasetEngine.validate_dataset`.
-
-        Returns
-        -------
-        valid : bool
-            `True` if the input dataset is valid for efficient NVTabular
-            processing.
-        """
-
-        # Check that the dataset format is Parquet
-        if not isinstance(self.engine, ParquetDatasetEngine):
-            msg = (
-                "NVTabular is optimized for the parquet format. Please use "
-                "the to_parquet method to convert your dataset."
-            )
-            warnings.warn(msg)
-            return False  # Early return
-
-        return self.engine.validate_dataset(**kwargs)
+        raise NotImplementedError(""" validate_dataset is not supported for merlin >23.08 """)
 
-    def regenerate_dataset(
-        self,
-        output_path,
-        columns=None,
-        output_format="parquet",
-        compute=True,
-        **kwargs,
-    ):
-        """EXPERIMENTAL:
-        Regenerate an NVTabular Dataset for efficient processing by writing
-        out new Parquet files. In contrast to default ``to_parquet`` behavior,
-        this method preserves the original ordering.
-
-        Example Usage::
-
-            dataset = Dataset("/path/to/data_pq", engine="parquet")
-            dataset.regenerate_dataset(
-                out_path, part_size="1MiB", file_size="10MiB"
-            )
-
-        Parameters
-        -----------
-        output_path : string
-            Root directory path to use for the new (regenerated) dataset.
-        columns : list(string), optional
-            Subset of columns to include in the regenerated dataset.
-        output_format : string, optional
-            Format to use for regenerated dataset. Only "parquet" (default)
-            is currently supported.
-        compute : bool, optional
-            Whether to compute the task graph or to return a Delayed object.
-            By default, the graph will be executed.
-        **kwargs :
-            Key-word arguments to pass down to the engine's regenerate_dataset
-            method. See `ParquetDatasetEngine.regenerate_dataset` for more
-            information.
-
-        Returns
-        -------
-        result : int or Delayed
-            If `compute=True` (default), the return value will be an integer
-            corresponding to the number of generated data files. If `False`,
-            the returned value will be a `Delayed` object.
-        """
-
-        # Check that the desired output format is Parquet
-        if output_format not in ["parquet"]:
-            msg = (
-                f"NVTabular is optimized for the parquet format. "
-                f"{output_format} is not yet a supported output format for "
-                f"regenerate_dataset."
-            )
-            raise ValueError(msg)
-
-        result = ParquetDatasetEngine.regenerate_dataset(self, output_path, columns=None, **kwargs)
-        if compute:
-            return result.compute()
-        else:
-            return result
+    def regenerate_dataset(self, *args, **kwargs):
+        raise NotImplementedError(""" regenerate_dataset is not supported for merlin >23.08 """)
 
     def infer_schema(self, n=1):
         """Create a schema containing the column names and inferred dtypes of the Dataset
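After this change, both `Dataset` methods fail fast rather than touching the data. A minimal sketch of the post-commit behavior (assuming a merlin-core build newer than 23.08 that includes this commit, that `Dataset` is still exported from `merlin.io`, and an illustrative parquet path)::

    from merlin.io import Dataset

    # Hypothetical local parquet dataset; the path is illustrative only.
    dataset = Dataset("/path/to/data_pq", engine="parquet")

    try:
        dataset.validate_dataset()
    except NotImplementedError as err:
        print(err)  # validate_dataset is not supported for merlin >23.08

    try:
        dataset.regenerate_dataset("/path/to/output")
    except NotImplementedError as err:
        print(err)  # regenerate_dataset is not supported for merlin >23.08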
12 changes: 0 additions & 12 deletions merlin/io/dataset_engine.py
@@ -51,18 +51,6 @@ def _path_partition_map(self):
     def num_rows(self):
         raise NotImplementedError(""" Returns the number of rows in the dataset """)
 
-    def validate_dataset(self, **kwargs):
-        raise NotImplementedError(
-            """The `validate_dataset` method is no longer supported
-            for merlin versions >23.08, because it relies on deprecated
-            functionality in pyarrow.
-            """
-        )
-
-    @classmethod
-    def regenerate_dataset(cls, dataset, output_path, columns=None, **kwargs):
-        raise NotImplementedError(""" Regenerate a dataset with optimal properties """)
-
     def sample_data(self, n=1):
         """Return a sample of real data from the dataset
 
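With the base-class stubs removed, `DatasetEngine` subclasses no longer inherit `validate_dataset` or `regenerate_dataset` at all; only the `Dataset`-level stubs above remain as the error surface. A quick illustrative check (hypothetical snippet; it assumes the module path matches the file shown above)::

    from merlin.io.dataset_engine import DatasetEngine

    # Neither hook is defined on the engine base class anymore.
    assert not hasattr(DatasetEngine, "validate_dataset")
    assert not hasattr(DatasetEngine, "regenerate_dataset")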
(The diffs for the remaining two changed files are not shown here.)
