[FEAT] Outer joins for native executor #2860

Open: colin-ho wants to merge 14 commits into main from colin/swordfish-outer-join

colin-ho (Contributor) commented Sep 19, 2024

Implement outer joins for Swordfish.

(Yes, this PR is a little big. But:

  1. Tests now run in CI, so you don't have to just take my word for it.
  2. A lot of the diff comes from moving left/right joins into the outer join operator. As a result, the HashJoinProbe operator is now just InnerHashJoinProbeOperator.)

Outer join probes (and left/right now) are implemented as a Streaming Sink.

  • During the execute phase of the streaming sink, probing is done concurrently via workers (this is the same implementation as all the other join types). The only difference is that during probing, workers will save the indices on the left side that have matches (using a mutable bitmap).
  • During the finalize phase, we merge together all the bitmaps across the concurrent workers (via a bitwise OR) to get a global view of all the indices that had matches. Then, we take all the indices that didn't get a match and return them (with nulls for the right side). This is the same logic we currently use for the python runner.
  • Why are left/right joins grouped with outer joins now? In the future, we may want to choose the build side for left/right/outer joins based on cardinality, which means we may need the used_indices bitmaps for left/right joins as well.
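
The finalize step described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: it uses a plain `Vec<bool>` per worker instead of arrow2's mutable bitmap, and `merge_bitmaps` and `unmatched_rows` are hypothetical names chosen for this example.

```rust
/// Merge the per-worker "used" bitmaps with a bitwise OR.
/// Assumption: each worker tracks one bool per build-side row.
fn merge_bitmaps(worker_bitmaps: &[Vec<bool>]) -> Vec<bool> {
    let len = worker_bitmaps.first().map_or(0, |b| b.len());
    let mut merged = vec![false; len];
    for bitmap in worker_bitmaps {
        assert_eq!(bitmap.len(), len, "all bitmaps must cover the same rows");
        for (dst, src) in merged.iter_mut().zip(bitmap) {
            // A row counts as matched if any worker saw a match for it.
            *dst |= *src;
        }
    }
    merged
}

/// Indices of rows that no worker matched. In an outer join these rows
/// are emitted with nulls for the other side.
fn unmatched_rows(merged: &[bool]) -> Vec<usize> {
    merged
        .iter()
        .enumerate()
        .filter(|(_, used)| !**used)
        .map(|(idx, _)| idx)
        .collect()
}
```

With two workers that matched rows 0 and 2 respectively out of four build-side rows, the merged bitmap marks rows 0 and 2 as used, so rows 1 and 3 would be returned with nulls.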

Note: I had to make Streaming Sink concurrency-aware to allow this. The changes in particular are:

  • Streaming Sinks can specify a max concurrency; currently only LIMIT sets this to 1.
  • execute accepts mutable per-worker state, and finalize consolidates the state from all workers, i.e. Vec<Box<dyn State>>.
  • In order to make sure that all the workers are done, they are spawned on a Worker Set, and return their state when done. This ensures that the finalize method doesn't get called before the workers are done with the executes.
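
As a rough sketch of the shape described in these bullets (not the actual Swordfish trait), the flow might look like the following. Everything here is an assumption made for illustration: the `StreamingSink` trait signature, the `State` alias over `Box<dyn Any + Send>`, the `SumSink` example, and the use of scoped OS threads in place of the Worker Set.

```rust
use std::any::Any;
use std::thread;

// Hypothetical stand-in for the boxed per-worker state.
type State = Box<dyn Any + Send>;

// Hypothetical trait; the real one in daft-local-execution may differ.
trait StreamingSink: Sync {
    fn max_concurrency(&self) -> usize;
    fn make_state(&self) -> State;
    fn execute(&self, input: &[i64], state: &mut State);
    fn finalize(&self, states: Vec<State>) -> i64;
}

struct SumSink;

impl StreamingSink for SumSink {
    fn max_concurrency(&self) -> usize {
        4
    }
    fn make_state(&self) -> State {
        Box::new(0i64)
    }
    fn execute(&self, input: &[i64], state: &mut State) {
        // Each worker only touches its own state, so no locking is needed.
        let partial = state.downcast_mut::<i64>().expect("state must be i64");
        *partial += input.iter().sum::<i64>();
    }
    fn finalize(&self, states: Vec<State>) -> i64 {
        // Consolidate the state returned by every worker.
        states
            .into_iter()
            .map(|s| *s.downcast::<i64>().expect("state must be i64"))
            .sum()
    }
}

fn run<S: StreamingSink>(sink: &S, inputs: Vec<Vec<i64>>) -> i64 {
    let workers = sink.max_concurrency().max(1);
    let states: Vec<State> = thread::scope(|scope| {
        let handles: Vec<_> = (0..workers)
            .map(|w| {
                let inputs = &inputs;
                scope.spawn(move || {
                    let mut state = sink.make_state();
                    // Worker w takes every `workers`-th input, starting at w.
                    for chunk in inputs.iter().skip(w).step_by(workers) {
                        sink.execute(chunk, &mut state);
                    }
                    state // returned to the caller once the worker is done
                })
            })
            .collect();
        // Joining every handle guarantees all executes have finished.
        handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .collect()
    });
    sink.finalize(states)
}
```

The point of the sketch is the ordering guarantee: `finalize` only receives states after every worker has been joined, so it cannot run while an `execute` is still in flight.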

@github-actions github-actions bot added the enhancement New feature or request label Sep 19, 2024

codspeed-hq bot commented Sep 19, 2024

CodSpeed Performance Report

Merging #2860 will not alter performance

Comparing colin/swordfish-outer-join (09c785c) with main (73ff3f3)

Summary

✅ 17 untouched benchmarks

@colin-ho colin-ho marked this pull request as ready for review September 19, 2024 21:16

codecov bot commented Sep 19, 2024

Codecov Report

Attention: Patch coverage is 98.69403% with 7 lines in your changes missing coverage. Please review.

Project coverage is 78.21%. Comparing base (73ff3f3) to head (09c785c).

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...local-execution/src/sinks/outer_hash_join_probe.rs | 98.50% | 4 Missing ⚠️ |
| ...tion/src/intermediate_ops/inner_hash_join_probe.rs | 97.16% | 3 Missing ⚠️ |
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #2860      +/-   ##
==========================================
+ Coverage   78.14%   78.21%   +0.06%     
==========================================
  Files         610      611       +1     
  Lines       72146    72345     +199     
==========================================
+ Hits        56381    56584     +203     
+ Misses      15765    15761       -4     
| Files with missing lines | Coverage Δ |
| --- | --- |
| .../src/intermediate_ops/anti_semi_hash_join_probe.rs | 96.61% <100.00%> (+0.24%) ⬆️ |
| src/daft-local-execution/src/lib.rs | 90.47% <100.00%> (+0.73%) ⬆️ |
| src/daft-local-execution/src/pipeline.rs | 92.91% <100.00%> (+1.39%) ⬆️ |
| src/daft-local-execution/src/runtime_stats.rs | 55.67% <100.00%> (ø) |
| .../daft-local-execution/src/sinks/hash_join_build.rs | 95.00% <100.00%> (-0.24%) ⬇️ |
| src/daft-local-execution/src/sinks/limit.rs | 100.00% <100.00%> (ø) |
| ...c/daft-local-execution/src/sinks/streaming_sink.rs | 80.15% <100.00%> (+14.27%) ⬆️ |
| src/daft-table/src/lib.rs | 89.51% <ø> (-0.33%) ⬇️ |
| src/daft-table/src/probeable/mod.rs | 100.00% <100.00%> (ø) |
| ...tion/src/intermediate_ops/inner_hash_join_probe.rs | 97.16% <97.16%> (ø) |
... and 1 more

... and 24 files with indirect coverage changes

src/arrow2/src/bitmap/mutable.rs (outdated, resolved)
src/arrow2/src/bitmap/mutable.rs (outdated, resolved)
src/daft-local-execution/Cargo.toml (outdated, resolved)
input: &Arc<MicroPartition>,
state: &mut InnerHashJoinProbeState,
) -> DaftResult<Arc<MicroPartition>> {
let (probe_table, tables) = state.get_probeable_and_table();
Member

This may be worth making a struct type for

struct ProbeState {
  probe_table
  tables
}

Contributor Author

Makes sense! Implemented it.

let mut build_side_growable =
GrowableTable::new(&tables.iter().collect::<Vec<_>>(), true, 20)?;

for (table_idx, row_idx) in merged_bitmap.get_unused_indices() {
Member

This can be much more performant using a BitmapIter

pub struct BitmapIter<'a> {

Which will compress the adjacent valid bits so we can reduce the calls to extend.

Member

You can also convert the bitmap into a BooleanArray and use mask_filter https://github.com/Eventual-Inc/Daft/blob/b1ea3b9749e01512f48dfd45f9899a329fc9799f/src/daft-table/src/lib.rs#L321 instead of iterating

Contributor Author

I thought the BitmapIter returns an iterator over the individual bits? I just checked and there is also SlicesIterator: https://github.com/Eventual-Inc/Daft/blob/b1ea3b9749e01512f48dfd45f9899a329fc9799f/src/arrow2/src/bitmap/utils/slice_iterator.rs, which is an iterator over a bitmap that returns slices of set regions. Did you mean this one?

Member

Whoops, yup, that's the one

Contributor Author

Actually, I prefer converting the bitmap to a BooleanArray and using mask_filter; it's a lot cleaner. Went with that in the latest commit.
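
To make the two options from this thread concrete, here is a small sketch over plain slices. It does not use arrow2's SlicesIterator or Daft's mask_filter; `set_runs` and `filter_by_mask` are hypothetical helpers that only mimic the two ideas (collapsing adjacent set bits into runs versus filtering rows with a boolean mask).

```rust
/// Collapse consecutive set bits into (start, len) runs, similar in spirit
/// to iterating over slices of set regions so each run needs one extend call.
fn set_runs(mask: &[bool]) -> Vec<(usize, usize)> {
    let mut runs = Vec::new();
    let mut start: Option<usize> = None;
    for (i, &bit) in mask.iter().enumerate() {
        match (bit, start) {
            (true, None) => start = Some(i),
            (false, Some(s)) => {
                runs.push((s, i - s));
                start = None;
            }
            _ => {}
        }
    }
    // Close a run that extends to the end of the mask.
    if let Some(s) = start {
        runs.push((s, mask.len() - s));
    }
    runs
}

/// Keep only the rows whose mask bit is set, in a single pass.
fn filter_by_mask<T: Clone>(rows: &[T], mask: &[bool]) -> Vec<T> {
    assert_eq!(rows.len(), mask.len(), "mask must cover every row");
    rows.iter()
        .zip(mask)
        .filter(|(_, keep)| **keep)
        .map(|(row, _)| row.clone())
        .collect()
}
```

Both approaches select the same rows. The run-based version reduces the number of copy calls when unmatched rows are clustered, while the mask version keeps the call site to a single filter, which matches the "cleaner" trade-off mentioned above.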

2 participants