diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 1caf58e20e653..25a7945dc3ddc 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -525,6 +525,8 @@ cdef class Table(_Tabular): cdef: shared_ptr[CTable] sp_table CTable* table + c_bool _is_cpu + c_bool _init_is_cpu cdef void init(self, const shared_ptr[CTable]& table) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 3b0df981e017c..819bbc34c66b9 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -4180,6 +4180,7 @@ cdef class Table(_Tabular): def __cinit__(self): self.table = NULL + self._init_is_cpu = False cdef void init(self, const shared_ptr[CTable]& table): self.sp_table = table @@ -4205,6 +4206,7 @@ cdef class Table(_Tabular): ArrowInvalid """ if full: + self._assert_cpu() with nogil: check_status(self.table.ValidateFull()) else: @@ -4214,6 +4216,7 @@ cdef class Table(_Tabular): def __reduce__(self): # Reduce the columns as ChunkedArrays to avoid serializing schema # data twice + self._assert_cpu() columns = [col for col in self.columns] return _reconstruct_table, (columns, self.schema) @@ -4452,6 +4455,7 @@ cdef class Table(_Tabular): a.year: [[null,2022]] month: [[4,6]] """ + self._assert_cpu() cdef: shared_ptr[CTable] flattened CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) @@ -4499,6 +4503,7 @@ cdef class Table(_Tabular): n_legs: [[2,2,4,4,5,100]] animals: [["Flamingo","Parrot","Dog","Horse","Brittle stars","Centipede"]] """ + self._assert_cpu() cdef: shared_ptr[CTable] combined CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) @@ -4556,6 +4561,7 @@ cdef class Table(_Tabular): ["Flamingo","Parrot","Dog","Horse","Brittle stars","Centipede"] -- indices: [3,4,5]] """ + self._assert_cpu() cdef: CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) shared_ptr[CTable] c_result @@ -4601,6 +4607,7 @@ cdef class Table(_Tabular): >>> table.equals(table_1, check_metadata=True) False """ + self._assert_cpu() if other is None: 
return False @@ -4658,6 +4665,7 @@ cdef class Table(_Tabular): n_legs: [[2,4,5,100]] animals: [["Flamingo","Horse","Brittle stars","Centipede"]] """ + self._assert_cpu() cdef: ChunkedArray column, casted Field field @@ -4909,6 +4917,7 @@ cdef class Table(_Tabular): ------- ChunkedArray """ + self._assert_cpu() return chunked_array([ batch.to_struct_array() for batch in self.to_batches(max_chunksize=max_chunksize) @@ -5118,6 +5127,7 @@ cdef class Table(_Tabular): def _to_pandas(self, options, categories=None, ignore_metadata=False, types_mapper=None): + self._assert_cpu() from pyarrow.pandas_compat import table_to_dataframe df = table_to_dataframe( options, self, categories, @@ -5239,6 +5249,7 @@ cdef class Table(_Tabular): >>> table.nbytes 72 """ + self._assert_cpu() cdef: CResult[int64_t] c_res_buffer @@ -5268,6 +5279,7 @@ cdef class Table(_Tabular): >>> table.get_total_buffer_size() 76 """ + self._assert_cpu() cdef: int64_t total_buffer_size @@ -5576,6 +5588,7 @@ cdef class Table(_Tabular): year: [[2020,2022,2021,2019]] n_legs_sum: [[2,6,104,5]] """ + self._assert_cpu() return TableGroupBy(self, keys, use_threads=use_threads) def join(self, right_table, keys, right_keys=None, join_type="left outer", @@ -5685,6 +5698,7 @@ cdef class Table(_Tabular): n_legs: [[100]] animal: [["Centipede"]] """ + self._assert_cpu() if right_keys is None: right_keys = keys return _pac()._perform_join( @@ -5772,6 +5786,7 @@ cdef class Table(_Tabular): n_legs: [[null,5,null,5,null]] animal: [[null,"Brittle stars",null,"Brittle stars",null]] """ + self._assert_cpu() if right_on is None: right_on = on if right_by is None: @@ -5797,8 +5812,23 @@ cdef class Table(_Tabular): ------- PyCapsule """ + self._assert_cpu() return self.to_reader().__arrow_c_stream__(requested_schema) + @property + def is_cpu(self): + """ + Whether all ChunkedArrays are CPU-accessible. 
+ """ + if not self._init_is_cpu: + self._is_cpu = all(c.is_cpu for c in self.itercolumns()) + self._init_is_cpu = True + return self._is_cpu + + cdef void _assert_cpu(self) except *: + if not self.is_cpu: + raise NotImplementedError("Implemented only for data on CPU device") + def _reconstruct_table(arrays, schema): """ diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index c3f805b4b32d6..b66a5eb083cc5 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -3430,6 +3430,21 @@ def cuda_recordbatch(cuda_context, cpu_recordbatch): return cpu_recordbatch.copy_to(cuda_context.memory_manager) +@pytest.fixture +def cpu_table(schema, cpu_chunked_array): + return pa.table([cpu_chunked_array, cpu_chunked_array], schema=schema) + + +@pytest.fixture +def cuda_table(schema, cuda_chunked_array): + return pa.table([cuda_chunked_array, cuda_chunked_array], schema=schema) + + +@pytest.fixture +def cpu_and_cuda_table(schema, cpu_chunked_array, cuda_chunked_array): + return pa.table([cpu_chunked_array, cuda_chunked_array], schema=schema) + + def test_chunked_array_non_cpu(cuda_context, cpu_chunked_array, cuda_chunked_array, cpu_and_cuda_chunked_array): # type test @@ -3586,6 +3601,9 @@ def verify_cuda_recordbatch(batch, expected_schema): def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch, cuda_arrays, schema): verify_cuda_recordbatch(cuda_recordbatch, expected_schema=schema) + N = cuda_recordbatch.num_rows + + # shape test assert cuda_recordbatch.shape == (5, 2) # columns() test @@ -3593,24 +3611,26 @@ def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch, # add_column(), set_column() test for fn in [cuda_recordbatch.add_column, cuda_recordbatch.set_column]: - col = pa.array([6, 7, 8, 9, 10], pa.int8()).copy_to(cuda_context.memory_manager) + col = pa.array([-2, -1, 0, 1, 2], pa.int8() + ).copy_to(cuda_context.memory_manager) new_batch = fn(2, 'c2', col) - assert 
len(new_batch.columns) == 3 - for c in new_batch.columns: - assert c.device_type == pa.DeviceAllocationType.CUDA + verify_cuda_recordbatch( + new_batch, expected_schema=schema.append(pa.field('c2', pa.int8()))) err_msg = ("Got column on device , " "but expected .") with pytest.raises(TypeError, match=err_msg): - fn(2, 'c2', [1, 1, 1, 1, 1]) + fn(2, 'c2', [1] * N) # remove_column() test new_batch = cuda_recordbatch.remove_column(1) verify_cuda_recordbatch(new_batch, expected_schema=schema.remove(1)) # drop_columns() test - new_batch = cuda_recordbatch.drop_columns(['c0', 'c1']) - assert len(new_batch.columns) == 0 - assert new_batch.device_type == pa.DeviceAllocationType.CUDA + new_batch = cuda_recordbatch.drop_columns(['c1']) + verify_cuda_recordbatch(new_batch, expected_schema=schema.remove(1)) + empty_batch = cuda_recordbatch.drop_columns(['c0', 'c1']) + assert len(empty_batch.columns) == 0 + assert empty_batch.device_type == pa.DeviceAllocationType.CUDA # select() test new_batch = cuda_recordbatch.select(['c0']) @@ -3622,8 +3642,7 @@ def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch, cuda_recordbatch.cast(new_schema) # drop_null() test - null_col = pa.array([-2, -1, 0, 1, 2], - mask=[True, False, True, False, True]).copy_to( + null_col = pa.array([1] * N, mask=[True, False, True, False, True]).copy_to( cuda_context.memory_manager) cuda_recordbatch_with_nulls = cuda_recordbatch.add_column(2, 'c2', null_col) with pytest.raises(NotImplementedError): @@ -3631,7 +3650,7 @@ def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch, # filter() test with pytest.raises(NotImplementedError): - cuda_recordbatch.filter([True] * 5) + cuda_recordbatch.filter([True] * N) # take() test with pytest.raises(NotImplementedError): @@ -3737,3 +3756,206 @@ def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch, # __dataframe__() test with pytest.raises(NotImplementedError): 
from_dataframe(cuda_recordbatch.__dataframe__()) + + +def verify_cuda_table(table, expected_schema): + table.validate() + assert table.is_cpu is False + assert table.num_columns == len(expected_schema.names) + assert table.column_names == expected_schema.names + assert str(table) in repr(table) + for c in table.columns: + assert c.is_cpu is False + for chunk in c.iterchunks(): + assert chunk.is_cpu is False + assert chunk.device_type == pa.DeviceAllocationType.CUDA + assert table.schema == expected_schema + + +def test_table_non_cpu(cuda_context, cpu_table, cuda_table, + cuda_arrays, cuda_recordbatch, schema): + verify_cuda_table(cuda_table, expected_schema=schema) + N = cuda_table.num_rows + + # shape test + assert cuda_table.shape == (10, 2) + + # columns() test + assert len(cuda_table.columns) == 2 + + # add_column(), set_column() test + for fn in [cuda_table.add_column, cuda_table.set_column]: + cpu_col = pa.array([1] * N, pa.int8()) + cuda_col = cpu_col.copy_to(cuda_context.memory_manager) + new_table = fn(2, 'c2', cuda_col) + verify_cuda_table(new_table, expected_schema=schema.append( + pa.field('c2', pa.int8()))) + new_table = fn(2, 'c2', cpu_col) + assert new_table.is_cpu is False + assert new_table.column(0).is_cpu is False + assert new_table.column(1).is_cpu is False + assert new_table.column(2).is_cpu is True + + # remove_column() test + new_table = cuda_table.remove_column(1) + verify_cuda_table(new_table, expected_schema=schema.remove(1)) + + # drop_columns() test + new_table = cuda_table.drop_columns(['c1']) + verify_cuda_table(new_table, expected_schema=schema.remove(1)) + new_table = cuda_table.drop_columns(['c0', 'c1']) + assert len(new_table.columns) == 0 + assert new_table.is_cpu + + # select() test + new_table = cuda_table.select(['c0']) + verify_cuda_table(new_table, expected_schema=schema.remove(1)) + + # cast() test + new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', pa.int64())]) + with pytest.raises(NotImplementedError): + 
cuda_table.cast(new_schema) + + # drop_null() test + null_col = pa.array([1] * N, mask=[True] * N).copy_to(cuda_context.memory_manager) + cuda_table_with_nulls = cuda_table.add_column(2, 'c2', null_col) + with pytest.raises(NotImplementedError): + cuda_table_with_nulls.drop_null() + + # filter() test + with pytest.raises(NotImplementedError): + cuda_table.filter([True] * N) + + # take() test + with pytest.raises(NotImplementedError): + cuda_table.take([0]) + + # sort_by() test + with pytest.raises(NotImplementedError): + cuda_table.sort_by('c0') + + # field() test + assert cuda_table.field(0) == schema.field(0) + assert cuda_table.field(1) == schema.field(1) + + # equals() test + with pytest.raises(NotImplementedError): + assert cuda_table.equals(cpu_table) + + # from_arrays() test + new_table = pa.Table.from_arrays(cuda_arrays, ['c0', 'c1']) + verify_cuda_table(new_table, expected_schema=schema) + + # from_pydict() test + new_table = pa.Table.from_pydict({'c0': cuda_arrays[0], 'c1': cuda_arrays[1]}) + verify_cuda_table(new_table, expected_schema=schema) + + # from_struct_array() test + fields = [schema.field(i) for i in range(len(schema.names))] + struct_array = pa.StructArray.from_arrays(cuda_arrays, fields=fields) + with pytest.raises(NotImplementedError): + pa.Table.from_struct_array(struct_array) + + # from_batches() test + new_table = pa.Table.from_batches([cuda_recordbatch, cuda_recordbatch], schema) + verify_cuda_table(new_table, expected_schema=schema) + + # nbytes test + with pytest.raises(NotImplementedError): + assert cuda_table.nbytes + + # get_total_buffer_size() test + with pytest.raises(NotImplementedError): + assert cuda_table.get_total_buffer_size() + + # to_pydict() test + with pytest.raises(NotImplementedError): + cuda_table.to_pydict() + + # to_pylist() test + with pytest.raises(NotImplementedError): + cuda_table.to_pylist() + + # to_pandas() test + with pytest.raises(NotImplementedError): + cuda_table.to_pandas() + + # to_struct_array() test + 
with pytest.raises(NotImplementedError): + cuda_table.to_struct_array() + + # to_batches() test + batches = cuda_table.to_batches(max_chunksize=5) + for batch in batches: + # GH-44049 + with pytest.raises(AssertionError): + verify_cuda_recordbatch(batch, expected_schema=schema) + + # to_reader() test + reader = cuda_table.to_reader(max_chunksize=5) + for batch in reader: + # GH-44049 + with pytest.raises(AssertionError): + verify_cuda_recordbatch(batch, expected_schema=schema) + + # slice() test + new_table = cuda_table.slice(1, 3) + verify_cuda_table(new_table, expected_schema=schema) + assert new_table.num_rows == 3 + + # replace_schema_metadata() test + new_table = cuda_table.replace_schema_metadata({b'key': b'value'}) + verify_cuda_table(new_table, expected_schema=schema) + assert new_table.schema.metadata == {b'key': b'value'} + + # rename_columns() test + new_table = cuda_table.rename_columns(['col0', 'col1']) + expected_schema = pa.schema( + [pa.field('col0', schema.field(0).type), + pa.field('col1', schema.field(1).type)]) + verify_cuda_table(new_table, expected_schema=expected_schema) + + # validate() test + cuda_table.validate() + with pytest.raises(NotImplementedError): + cuda_table.validate(full=True) + + # flatten() test + with pytest.raises(NotImplementedError): + cuda_table.flatten() + + # combine_chunks() test + with pytest.raises(NotImplementedError): + cuda_table.combine_chunks() + + # unify_dictionaries() test + with pytest.raises(NotImplementedError): + cuda_table.unify_dictionaries() + + # group_by() test + with pytest.raises(NotImplementedError): + cuda_table.group_by('c0') + + # join() test + with pytest.raises(NotImplementedError): + cuda_table.join(cuda_table, 'c0') + + # join_asof() test + with pytest.raises(NotImplementedError): + cuda_table.join_asof(cuda_table, 'c0', 'c0', 0) + + # __array__() test + with pytest.raises(NotImplementedError): + cuda_table.__array__() + + # __arrow_c_stream__() test + with pytest.raises(NotImplementedError): + 
cuda_table.__arrow_c_stream__() + + # __dataframe__() test + with pytest.raises(NotImplementedError): + from_dataframe(cuda_table.__dataframe__()) + + # __reduce__() test + with pytest.raises(NotImplementedError): + cuda_table.__reduce__()