diff --git a/py/server/deephaven/_table_reader.py b/py/server/deephaven/_table_reader.py index 2c49372ec20..8e3b9f380ac 100644 --- a/py/server/deephaven/_table_reader.py +++ b/py/server/deephaven/_table_reader.py @@ -3,7 +3,7 @@ # """This module supports reading the data in a Deephaven table in a chunked manner.""" from collections import namedtuple -from typing import Union, Sequence, Generator, Dict, Optional, Any, Tuple +from typing import Union, Sequence, Generator, Dict, Optional, Any, Tuple, Callable, TypeVar import jpy import numpy as np @@ -16,6 +16,7 @@ _JTableUpdateDataReader = jpy.get_type("io.deephaven.integrations.python.PythonTableDataReader") +T = TypeVar('T') def _col_defs(table: Table, cols: Union[str, Sequence[str]]) -> Sequence[Column]: if not cols: @@ -54,19 +55,15 @@ def _table_reader_all_dict(table: Table, cols: Optional[Union[str, Sequence[str] with update_graph.auto_locking_ctx(table): try: j_array = _JTableUpdateDataReader.readChunkColumnMajor(j_reader_context, row_set, col_sources, prev) - - col_dict = {} - for i, col_def in enumerate(col_defs): - col_dict[col_def.name] = _column_to_numpy_array(col_def, j_array[i]) if to_numpy else j_array[i] - - return col_dict + return {col_def.name: _column_to_numpy_array(col_def, j_array[i]) if to_numpy else j_array[i] for i, col_def in enumerate(col_defs)} finally: j_reader_context.close() -def _table_reader_chunk_dict(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *, row_set: jpy.JType, - chunk_size: int = 2048, prev: bool = False, to_numpy: bool = True) \ - -> Generator[Dict[str, Union[np.ndarray | jpy.JType]], None, None]: +def _table_reader_chunk(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *, + emitter: Callable[[Sequence[Column], jpy.JType], Generator[T, None, None]], row_set: jpy.JType, + chunk_size: int = 2048, prev: bool = False) \ + -> Generator[T, None, None]: """ A generator that reads the chunks of rows over the given row set of a table into a 
dictionary. The dictionary is a map of column names to numpy arrays or Java arrays. @@ -76,7 +73,6 @@ def _table_reader_chunk_dict(table: Table, cols: Optional[Union[str, Sequence[st row_set (jpy.JType): The row set to read. chunk_size (int): The number of rows to read at a time. Default is 4096. prev (bool): If True, read the previous values. Default is False. - to_numpy (bool): If True, convert the column data to numpy arrays. Default is True. Returns: A generator that yields a dictionary of column names to numpy arrays or Java arrays. @@ -98,51 +94,41 @@ def _table_reader_chunk_dict(table: Table, cols: Optional[Union[str, Sequence[st chunk_row_set = row_sequence_iterator.getNextRowSequenceWithLength(chunk_size) j_array = _JTableUpdateDataReader.readChunkColumnMajor(j_reader_context, chunk_row_set, col_sources, prev) - - col_dict = {} - for i, col_def in enumerate(col_defs): - col_dict[col_def.name] = _column_to_numpy_array(col_def, j_array[i]) if to_numpy else j_array[i] - - yield col_dict + yield from emitter(col_defs, j_array) finally: j_reader_context.close() row_sequence_iterator.close() - -def _table_reader_dict(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, chunk_size: int = 2048) \ - -> Generator[Dict[str, Any], None, None]: - """ A generator that reads one row at a time from a table into a dictionary. The dictionary is a map of column names - to scalar values of the column data type. +def _table_reader_chunk_dict(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *, row_set: jpy.JType, + chunk_size: int = 2048, prev: bool = False) \ + -> Generator[Dict[str, np.ndarray], None, None]: + """ A generator that reads the chunks of rows over the given row set of a table into a dictionary. The dictionary is + a map of column names to numpy arrays. Args: - table (Table): The table to read. + table (Table): The table to read. cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. 
- chunk_size (int): The number of rows to read at a time internally to reduce the number of Java/Python boundary - crossings. Default is 2048. + row_set (jpy.JType): The row set to read. + chunk_size (int): The number of rows to read at a time. Default is 2048. + prev (bool): If True, read the previous values. Default is False. Returns: - A generator that yields a dictionary of column names to values. + A generator that yields a dictionary of column names to numpy arrays. Raises: ValueError """ - col_defs = _col_defs(table, cols) + def _emitter(col_defs, j_array) -> Generator[Dict[str, np.ndarray], None, None]: + yield {col_def.name: _column_to_numpy_array(col_def, j_array[i]) for i, col_def in enumerate(col_defs)} - for chunk_dict in _table_reader_chunk_dict(table, cols=cols, row_set=table.j_table.getRowSet(), - chunk_size=chunk_size, - prev=False, to_numpy=False): - chunk_size = len(chunk_dict[col_defs[0].name]) - for i in range(chunk_size): - col_dict = {} - for col_def in col_defs: - col_dict[col_def.name] = chunk_dict[col_def.name][i] - yield col_dict + yield from _table_reader_chunk(table, cols, emitter=_emitter, row_set=row_set, chunk_size=chunk_size, prev=prev) -def _table_reader_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, tuple_name: str = 'Deephaven', - chunk_size: int = 2048) -> Generator[Tuple[Any, ...], None, None]: - """ Returns a generator that reads one row at a time from the table into a named tuple. The named tuple is made - up of fields with their names being the column names and their values being of the column data types. +def _table_reader_chunk_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *, + tuple_name: str = 'Deephaven', chunk_size: int = 2048,) -> Generator[Tuple[np.ndarray, ...], None, None]: + """ Returns a generator that reads one chunk of rows at a time from the table into a named tuple. 
The named + tuple is made up of fields with their names being the column names and their values being numpy arrays of the + column data types. If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire the shared lock of the update graph before reading the table data. This provides a consistent view of the data. @@ -154,28 +140,51 @@ def _table_reader_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] Args: table (Table): The table to read. - cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. Default is None. + cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. tuple_name (str): The name of the named tuple. Default is 'Deephaven'. - chunk_size (int): The number of rows to read at a time internally to reduce the number of Java/Python boundary - crossings. Default is 2048. + chunk_size (int): The number of rows to read at a time. Default is 2048. Returns: - A generator that yields a named tuple for each row in the table + A generator that yields a named tuple of numpy arrays for each chunk of rows in the table. 
Raises: ValueError """ named_tuple_class = namedtuple(tuple_name, cols or [col.name for col in table.columns], rename=False) - for row in _table_reader_dict(table, cols, chunk_size=chunk_size): - yield named_tuple_class(**row) + def _emitter(col_defs, j_array) -> Generator[Tuple[np.ndarray, ...], None, None]: + yield named_tuple_class._make((_column_to_numpy_array(col_def, j_array[i]) for i, col_def in enumerate(col_defs))) + yield from _table_reader_chunk(table, cols, emitter=_emitter, row_set=table.j_table.getRowSet(), chunk_size=chunk_size, prev=False) -def _table_reader_chunk_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *, - tuple_name: str = 'Deephaven', chunk_size: int = 2048,) -> Generator[Tuple[np.ndarray, ...], None, None]: - """ Returns a generator that reads one chunk of rows at a time from the table into a named tuple. The named - tuple is made up of fields with their names being the column names and their values being numpy arrays of the - column data types. +def _table_reader_dict(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, chunk_size: int = 2048) \ + -> Generator[Dict[str, Any], None, None]: + """ A generator that reads one row at a time from a table into a dictionary. The dictionary is a map of column names + to scalar values of the column data type. + + Args: + table (Table): The table to read. + cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. + chunk_size (int): The number of rows to read at a time internally to reduce the number of Java/Python boundary + crossings. Default is 2048. + + Returns: + A generator that yields a dictionary of column names to values. 
+ + Raises: + ValueError + """ + def _emitter(col_defs, j_array) -> Generator[Dict[str, Any], None, None]: + row_count = len(j_array[0]) + for i in range(row_count): + yield {col_def.name: j_array[j][i] for j, col_def in enumerate(col_defs)} + + yield from _table_reader_chunk(table, cols, emitter=_emitter, row_set=table.j_table.getRowSet(), chunk_size=chunk_size, prev=False) + +def _table_reader_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, tuple_name: str = 'Deephaven', + chunk_size: int = 2048) -> Generator[Tuple[Any, ...], None, None]: + """ Returns a generator that reads one row at a time from the table into a named tuple. The named tuple is made + up of fields with their names being the column names and their values being of the column data types. If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire the shared lock of the update graph before reading the table data. This provides a consistent view of the data. @@ -187,19 +196,22 @@ def _table_reader_chunk_tuple(table: Table, cols: Optional[Union[str, Sequence[s Args: table (Table): The table to read. - cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. + cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. Default is None. tuple_name (str): The name of the named tuple. Default is 'Deephaven'. - chunk_size (int): The number of rows to read at a time. Default is 4096. + chunk_size (int): The number of rows to read at a time internally to reduce the number of Java/Python boundary + crossings. Default is 2048. Returns: - A generator that yields a named tuple for each row in the table. 
+ A generator that yields a named tuple for each row in the table Raises: ValueError """ named_tuple_class = namedtuple(tuple_name, cols or [col.name for col in table.columns], rename=False) - for chunk_dict in _table_reader_chunk_dict(table, cols=cols, row_set=table.j_table.getRowSet(), - chunk_size=chunk_size, - prev=False, to_numpy=True): - yield named_tuple_class(**chunk_dict) + def _emitter(col_defs, j_array) -> Generator[Tuple[Any, ...], None, None]: + row_count = len(j_array[0]) + for i in range(row_count): + yield named_tuple_class._make((j_array[j][i] for j, _ in enumerate(col_defs))) + + yield from _table_reader_chunk(table, cols, emitter=_emitter, row_set=table.j_table.getRowSet(), chunk_size=chunk_size, prev=False) diff --git a/py/server/deephaven/table.py b/py/server/deephaven/table.py index b3fad0f9e2f..2c45d393f62 100644 --- a/py/server/deephaven/table.py +++ b/py/server/deephaven/table.py @@ -585,7 +585,7 @@ def iter_chunk_dict(self, cols: Optional[Union[str, Sequence[str]]] = None, chun from deephaven._table_reader import _table_reader_chunk_dict # to prevent circular import return _table_reader_chunk_dict(self, cols=cols, row_set=self.j_table.getRowSet(), chunk_size=chunk_size, - prev=False, to_numpy=True) + prev=False) def iter_chunk_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, tuple_name: str = 'Deephaven', chunk_size: int = 2048,)-> Generator[Tuple[np.ndarray, ...], None, None]: diff --git a/py/server/deephaven/table_listener.py b/py/server/deephaven/table_listener.py index 38fc69e8e46..c7db8af827e 100644 --- a/py/server/deephaven/table_listener.py +++ b/py/server/deephaven/table_listener.py @@ -67,7 +67,7 @@ def added_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> G return (_ for _ in ()) return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(), - chunk_size=chunk_size, prev=False, to_numpy=True) + chunk_size=chunk_size, prev=False) def removed(self, cols: 
Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: """Returns a dict with each key being a column name and each value being a NumPy array of @@ -83,7 +83,7 @@ def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray return {} return _table_reader_all_dict(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), - prev=True, to_numpy=True) + prev=True) def removed_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> Generator[ Dict[str, numpy.ndarray], None, None]: @@ -101,7 +101,7 @@ def removed_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> return (_ for _ in ()) return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), - chunk_size=chunk_size, prev=True, to_numpy=True) + chunk_size=chunk_size, prev=True) def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: """Returns a dict with each key being a column name and each value being a NumPy array of the current values of @@ -135,7 +135,7 @@ def modified_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) - return (_ for _ in ()) return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), - chunk_size=chunk_size, prev=False, to_numpy=True) + chunk_size=chunk_size, prev=False) def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: """Returns a dict with each key being a column name and each value being a NumPy array of the previous values of @@ -169,7 +169,7 @@ def modified_prev_chunks(self, chunk_size: int, cols: Union[str, List[str]] = No return (_ for _ in ()) return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), - chunk_size=chunk_size, prev=True, to_numpy=True) + chunk_size=chunk_size, prev=True) @property def shifted(self):