[Python] Possibility of a table.drop_duplicates() function? #30950
Comments
Weston Pace / @westonpace: Are you OK with "min" or "max"?

>>> import pyarrow as pa
>>> tab = pa.Table.from_pydict({'x': [1, 2, 2, 3], 'y': ['a', 'b', 'c', 'd']})
>>> tab.group_by("x").aggregate([("y", "max")])
pyarrow.Table
y_max: string
x: int64
----
y_max: [["a","c","d"]]
x: [[1,2,3]]
>>> tab.group_by("x").aggregate([("y", "min")])
pyarrow.Table
y_min: string
x: int64
----
y_min: [["a","b","d"]]
x: [[1,2,3]]
Lance Dacey / @ldacey: Is it possible to get the first or nth values from a table group-by? In pandas we can do the following, which I think has the desired behavior even with multiple columns (as long as we sort the data first). If we can get the indices of which rows to keep, then we could use table.take() to return a new table with the latest values.

import pandas as pd

df = pd.DataFrame(
    {
        "id": [1, 1, 1, 2, 2, 2],
        "name": ["a", "a", "a", "b", "c", "c"],
        "updated_at": [
            "2021-01-01 00:02:19",
            "2022-01-04 12:13:10",
            "2022-01-06 04:10:52",
            "2022-01-02 17:32:21",
            "2022-01-06 01:27:14",
            "2022-01-06 23:09:56",
        ],
    }
)
df.sort_values(
    ["id", "name", "updated_at"], ascending=[True, True, False]
).groupby(["id", "name"]).nth(0).reset_index()
Weston Pace / @westonpace: Something like drop_duplicates(subset=["id"], keep="random") is the easy case. I think we are further away from either drop_duplicates(subset=["id"], keep="first") or drop_duplicates(subset=["id"], keep="last"). The problem is that the execution engine processes data in batches, and those batches are processed in parallel. We can resequence the batches, but only at the very end of a plan; otherwise they just get out of order again immediately after we resequence them. The ability to run parts of a plan in sequence is something we need for a number of reasons (e.g. window functions), so it is something we will likely have at some point.
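To make the ordering problem concrete, here is a toy illustration (plain Python, not Acero): if batches are consumed in a nondeterministic order, a naive "keep the first value seen per key" gives different answers for different batch orders.

# Two batches of (key, value) rows
batches = [[(1, 'a'), (2, 'b')], [(2, 'c'), (3, 'd')]]

def naive_first(batch_order):
    # Keep the first value seen per key, in whatever order batches arrive
    seen = {}
    for b in batch_order:
        for key, val in batches[b]:
            seen.setdefault(key, val)
    return seen

print(naive_first([0, 1]))  # {1: 'a', 2: 'b', 3: 'd'}
print(naive_first([1, 0]))  # {2: 'c', 3: 'd', 1: 'a'} -- key 2 differs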
Lance Dacey / @ldacey: I found this repository which has a method to drop duplicates that I might be able to adopt in the meantime. I would need to digest exactly what is happening below a bit more, but I think it is built from existing compute functions:

import numpy as np

def groupify_array(arr):
    # Input: PyArrow/NumPy array
    # Output:
    #   1. Unique values
    #   2. Count per unique value
    #   3. Sort index
    #   4. Begin index per unique value
    dic, counts = np.unique(arr, return_counts=True)
    sort_idx = np.argsort(arr)
    return dic, counts, sort_idx, [0] + np.cumsum(counts)[:-1].tolist()

def combine_column(table, name):
    return table.column(name).combine_chunks()

# Vectorized hash for the single-column case
f = np.vectorize(hash)

def columns_to_array(table, columns):
    # Reduce the key column(s) to a single NumPy array of hashes
    columns = [columns] if isinstance(columns, str) else list(set(columns))
    if len(columns) == 1:
        return f(combine_column(table, columns[0]).to_numpy(zero_copy_only=False))
    else:
        values = [c.to_numpy() for c in table.select(columns).itercolumns()]
        return np.array(list(map(hash, zip(*values))))

def drop_duplicates(table, on=[], keep='first'):
    # Gather the key columns into one hashable array
    arr = columns_to_array(table, (on if on else table.column_names))
    # Group the array: unique values, counts, sort index, group start offsets
    dic, counts, sort_idxs, bgn_idxs = groupify_array(arr)
    # Select which sorted positions to keep
    if keep == 'last':
        idxs = (np.array(bgn_idxs) - 1)[1:].tolist() + [len(sort_idxs) - 1]
    elif keep == 'first':
        idxs = bgn_idxs
    elif keep == 'drop':
        # Keep only rows whose key occurs exactly once
        idxs = [i for i, c in zip(bgn_idxs, counts) if c == 1]
    else:
        raise ValueError("keep must be 'first', 'last', or 'drop'")
    return table.take(sort_idxs[idxs])
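For illustration, a quick check of the helper above on a small table (a sketch, assuming the functions are in scope):

import pyarrow as pa

tab = pa.Table.from_pydict({'id': [1, 1, 2], 'val': ['a', 'b', 'c']})
print(drop_duplicates(tab, on=['id'], keep='first').to_pydict())
# {'id': [1, 2], 'val': ['a', 'c']}
print(drop_duplicates(tab, on=['id'], keep='last').to_pydict())
# {'id': [1, 2], 'val': ['b', 'c']}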
Jacek Pliszka / @JacekPliszka:

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

t1 = t.append_column('i', pa.array(np.arange(len(t))))
t2 = t1.group_by(['keys', 'values']).aggregate([('i', 'min')]).column('i_min')
pc.take(t, t2)

On my PC your code takes 1.19 s, while the code above takes 0.15 s; to_pandas().drop_duplicates() was around 0.36 s.
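For context, a self-contained version of that snippet (the sample data and column names are made up for illustration):

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

t = pa.Table.from_pydict({'keys': [1, 1, 2], 'values': ['a', 'a', 'b']})

# The row index of the first occurrence of each (keys, values) pair is
# simply the per-group minimum of a row-number column.
t1 = t.append_column('i', pa.array(np.arange(len(t))))
t2 = t1.group_by(['keys', 'values']).aggregate([('i', 'min')]).column('i_min')
print(pc.take(t, t2).to_pydict())
# e.g. {'keys': [1, 2], 'values': ['a', 'b']}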
Lance Dacey / @ldacey: Can we sort the data before adding the index column?
Lance Dacey / @ldacey: DuckDB:

%%time
table = con.execute("select distinct on (forecast_group) * from scanner order by session_id, date").arrow()

CPU times: user 735 ms, sys: 45.7 ms, total: 780 ms
Wall time: 1.92 s

Your suggestion:

%%time
table = scanner.to_table()
t1 = table.append_column('i', pa.array(np.arange(len(table))))
t2 = t1.group_by(['forecast_group']).aggregate([('i', 'min')]).column('i_min')
table = pc.take(table, t2)

CPU times: user 872 ms, sys: 60.9 ms, total: 933 ms
Wall time: 4.6 s

A bit slower than DuckDB somehow, but for me it is acceptable, and it gives me an option to drop duplicates without requiring additional libraries, including pandas. Thanks!
Jacek Pliszka / @JacekPliszka: We need a compute function that, for a given array of values, returns the index of the first/last appearance. Once we have the index of the first/last appearance, we can use compute.take to produce the output table. Maybe an ordering function could even be specified, so there would be no need to sort the array a priori.
Weston Pace / @westonpace: You can run your ordering function first so that your ordering is a column in the data. Then, for the first/last kernel, the ordering would just be a field ref. The rest would be a straightforward aggregate kernel, I think. The "state" would be the current first/last and the value of the ordering field at that point. So yes, I think that approach should work to avoid sorting beforehand.
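A toy model of the aggregate state described above (plain Python, purely illustrative): per group, the state is (ordering_key, value), and updating it is order-independent, so batches can arrive in any order.

# Hypothetical rows: (group, ordering_key, value)
rows = [(1, 3, 'x'), (1, 1, 'y'), (2, 2, 'p'), (2, 5, 'q')]

state = {}  # group -> (ordering_key, value); "first" = smallest ordering key
for group, key, val in rows:
    if group not in state or key < state[group][0]:
        state[group] = (key, val)

print({g: v for g, (_, v) in state.items()})  # {1: 'y', 2: 'p'}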
It would be nice even to have a basic version of this function that doesn't support …
@wjones127 where is this implemented in the R package? (Or which name is it using? I can't find it based on "duplicate".) BTW, one of the possible workarounds discussed above, using group_by, nowadays works because the "first"/"last" ordered aggregations have been added:
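For example, with a recent PyArrow this works (use_threads=False keeps the scan order, and hence "first", deterministic):

import pyarrow as pa

tab = pa.Table.from_pydict({'x': [1, 2, 2, 3], 'y': ['a', 'b', 'c', 'd']})
print(tab.group_by('x', use_threads=False).aggregate([('y', 'first')]).to_pydict())
# {'y_first': ['a', 'b', 'd'], 'x': [1, 2, 3]}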
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
from typing import List, Literal, Optional
from uuid import uuid4

def drop_duplicates(
    table: pa.Table,
    on: Optional[List[str]] = None,
    keep: Literal['first', 'last'] = 'first'
) -> pa.Table:
    """
    Remove duplicate rows from a PyArrow table based on specified columns.

    This function efficiently removes duplicate rows from a PyArrow table,
    keeping either the first or last occurrence of each unique combination
    of values in the specified columns.

    Args:
        table (pa.Table): The input PyArrow table.
        on (Optional[List[str]]): List of column names to consider for
            identifying duplicates. If None, all columns are used.
        keep (Literal['first', 'last']): Whether to keep the first or last
            occurrence of duplicates.

    Returns:
        pa.Table: A new PyArrow table with duplicates removed.

    Raises:
        ValueError: If 'keep' is not 'first' or 'last'.
        TypeError: If 'table' is not a PyArrow Table.

    Example:
        >>> import pyarrow as pa
        >>> data = [
        ...     pa.array([1, 2, 2, 3]),
        ...     pa.array(['a', 'b', 'b', 'c']),
        ...     pa.array([10, 20, 30, 40])
        ... ]
        >>> table = pa.Table.from_arrays(data, names=['id', 'name', 'value'])
        >>> deduped = drop_duplicates(table, on=['id', 'name'], keep='first')
        >>> print(deduped)
        pyarrow.Table
        id: int64
        name: string
        value: int64
        ----
        id: [[1,2,3]]
        name: [["a","b","c"]]
        value: [[10,20,40]]
    """
    if not isinstance(table, pa.Table):
        raise TypeError("Parameter 'table' must be a PyArrow Table")
    if keep not in ['first', 'last']:
        raise ValueError("Parameter 'keep' must be either 'first' or 'last'")
    if not on:
        on = table.column_names

    # Generate a unique column name for the row index
    index_column = f"index_{uuid4().hex}"
    index_aggregate_column = f'{index_column}_{keep}'

    # Create row numbers
    num_rows = table.num_rows
    row_numbers = pa.array(np.arange(num_rows, dtype=np.int64))

    # Append row numbers, group by the specified columns, and aggregate
    unique_indices = (
        table.append_column(index_column, row_numbers)
        .group_by(on, use_threads=False)
        .aggregate([(index_column, keep)])
    ).column(index_aggregate_column)

    return pc.take(table, unique_indices, boundscheck=False)

def drop_duplicates_filter(table, on=None, keep='first'):
    if not on:
        on = table.column_names
    row_numbers = pa.array(np.arange(table.num_rows, dtype=np.int64))
    index_column = f"index_{uuid4().hex}"
    index_aggregate_column = f'{index_column}_{keep}'
    table_with_index = table.append_column(index_column, row_numbers)
    unique_indices = table_with_index.group_by(on, use_threads=False).aggregate([(index_column, keep)])
    unique_row_numbers = unique_indices.column(index_aggregate_column)
    # Keep the rows whose row number is the first/last of its group
    mask = pc.is_in(row_numbers, value_set=unique_row_numbers)
    return table.filter(mask)

def drop_duplicates_join(table, on=None, keep='first'):
    if not on:
        on = table.column_names
    index_column = f"index_{uuid4().hex}"
    index_aggregate_column = f'{index_column}_{keep}'
    row_numbers = pa.array(np.arange(table.num_rows, dtype=np.int64))
    table_with_index = table.append_column(index_column, row_numbers)
    unique_indices = table_with_index.group_by(on, use_threads=False).aggregate([(index_column, keep)])
    # A left semi join keeps exactly the rows whose index survived the aggregation
    return table_with_index.join(
        unique_indices,
        keys=index_column,
        right_keys=index_aggregate_column,
        join_type='left semi',
        use_threads=True
    ).drop([index_column])

Results and findings: see the gist for the full implementation and benchmark results. Leave a comment in the gist if you can manage to make this more efficient.
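A quick smoke test for the three variants (a sketch with made-up data, assuming the definitions above are in scope):

import pyarrow as pa

tbl = pa.table({'id': [1, 1, 2, 2, 3], 'v': [10, 11, 20, 21, 30]})

for fn in (drop_duplicates, drop_duplicates_filter, drop_duplicates_join):
    result = fn(tbl, on=['id'], keep='last')
    print(fn.__name__, result.sort_by('id').to_pydict())
# Each variant should print {'id': [1, 2, 3], 'v': [11, 21, 30]}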
I noticed that there are group_by() and sort_by() functions in the 7.0.0 branch. Would it be possible to include a drop_duplicates() function as well?

Something like this, which would return a table without the second row in the example above, would be great.

I usually read an append-only dataset and then need to report on the latest version of each row. To drop duplicates, I temporarily convert the append-only table to a pandas DataFrame, then convert it back to a table and save a separate "latest-version" dataset.
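That round-trip looks roughly like this (a sketch; the dataset paths and the id/updated_at columns are placeholders):

import pyarrow as pa
import pyarrow.dataset as ds

# Read the append-only dataset
table = ds.dataset('/data/append_only', format='parquet').to_table()

# Round-trip through pandas just to deduplicate
df = table.to_pandas()
df = df.sort_values('updated_at').drop_duplicates(subset=['id'], keep='last')
latest = pa.Table.from_pandas(df, preserve_index=False)

# Save the "latest-version" dataset separately
ds.write_dataset(latest, '/data/latest_version', format='parquet')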
Reporter: Lance Dacey / @ldacey
Note: This issue was originally created as ARROW-15474. Please see the migration documentation for further details.