Skip to content

Commit

Permalink
[FEAT] Add left/right/anti/semi joins to native executor (#2743)
Browse files Browse the repository at this point in the history
Implement left/right/anti/semi joins in the native executor.

- Left/Right: Build probe table on the opposite side, e.g. for left
joins build the probe table on the right side. During the probing phase,
if there is no match, add a null row.

- Anti/Semi: Build probe table on the right side. During the probing
phase, an anti join emits a row only if it has no match; a semi join emits a row only if it has a match.

Running `DAFT_ENABLE_NATIVE_EXECUTOR=1 pytest
tests/dataframe/test_joins.py` with SMJ/broadcast + Outer joins skipped.
<img width="1038" alt="Screenshot 2024-08-29 at 9 47 16 PM"
src="https://github.com/user-attachments/assets/cb15a419-2c87-476d-bc34-0075bd558625">

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
3 people authored Sep 19, 2024
1 parent 7666669 commit 7ee5fda
Show file tree
Hide file tree
Showing 12 changed files with 725 additions and 161 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/daft-local-execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ daft-plan = {path = "../daft-plan", default-features = false}
daft-scan = {path = "../daft-scan", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
indexmap = {workspace = true}
lazy_static = {workspace = true}
log = {workspace = true}
num-format = "0.4.4"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;
use daft_plan::JoinType;
use daft_table::{GrowableTable, Probeable};
use tracing::{info_span, instrument};

use super::intermediate_op::{
IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorState,
};
use crate::pipeline::PipelineResultType;

/// State carried by `AntiSemiProbeOperator` between `execute` calls.
///
/// A fresh state starts as `Building` (see `make_state`) and is moved to
/// `ReadyToProbe` exactly once by `set_table`.
enum AntiSemiProbeState {
/// No probe table has been supplied yet.
Building,
/// A probe table has been supplied; it is retrievable via `get_probeable`.
ReadyToProbe(Arc<dyn Probeable>),
}

impl AntiSemiProbeState {
    /// Stores a shared handle to `table` and switches the state to `ReadyToProbe`.
    ///
    /// # Panics
    ///
    /// Panics if a table has already been set, i.e. the state is not `Building`.
    fn set_table(&mut self, table: &Arc<dyn Probeable>) {
        match self {
            Self::Building => *self = Self::ReadyToProbe(Arc::clone(table)),
            Self::ReadyToProbe(_) => {
                panic!("AntiSemiProbeState should only be in Building state when setting table")
            }
        }
    }

    /// Returns the probe table stored by `set_table`.
    ///
    /// # Panics
    ///
    /// Panics if no table has been set yet, i.e. the state is still `Building`.
    fn get_probeable(&self) -> &Arc<dyn Probeable> {
        match self {
            Self::ReadyToProbe(probeable) => probeable,
            Self::Building => {
                panic!("AntiSemiProbeState should only be in ReadyToProbe state when getting probeable")
            }
        }
    }
}

// Lets the operator recover the concrete `AntiSemiProbeState` from a boxed
// `dyn IntermediateOperatorState` via `downcast_mut` (see `execute`).
impl IntermediateOperatorState for AntiSemiProbeState {
/// Returns `self` as a mutable `Any` trait object so callers can downcast it.
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
}

/// Intermediate operator that filters input rows by whether their join keys
/// exist in a previously supplied probe table (semi and anti joins).
pub struct AntiSemiProbeOperator {
/// Expressions evaluated against each input table to produce the join keys that are probed.
probe_on: Vec<ExprRef>,
/// Join variant. With `JoinType::Semi` rows that have a match are kept; otherwise rows
/// without a match are kept. `execute` only accepts `Semi` and `Anti`.
join_type: JoinType,
}

impl AntiSemiProbeOperator {
    /// Creates an operator that probes with the `probe_on` key expressions and
    /// filters rows according to `join_type`.
    pub fn new(probe_on: Vec<ExprRef>, join_type: JoinType) -> Self {
        Self {
            probe_on,
            join_type,
        }
    }

    /// Filters `input` down to the rows selected by the join type.
    ///
    /// Each table in `input` has its join keys evaluated and checked for
    /// existence in the probe table held by `state`. For a semi join the rows
    /// with a match are kept; otherwise the rows without a match are kept.
    /// The surviving rows are gathered into a single table and returned as a
    /// loaded `MicroPartition`.
    ///
    /// # Errors
    ///
    /// Propagates any error from reading the input tables, evaluating the key
    /// expressions, probing, or building the output table.
    ///
    /// # Panics
    ///
    /// Panics if `state` does not yet hold a probe table.
    fn probe_anti_semi(
        &self,
        input: &Arc<MicroPartition>,
        state: &mut AntiSemiProbeState,
    ) -> DaftResult<Arc<MicroPartition>> {
        let probe_set = state.get_probeable();

        let growables_span = info_span!("AntiSemiOperator::build_growables").entered();
        let input_tables = input.get_tables()?;
        let table_refs = input_tables.iter().collect::<Vec<_>>();
        let mut kept_rows = GrowableTable::new(&table_refs, false, 20)?;
        drop(growables_span);

        // A row is emitted when its "matched" flag equals this value:
        // `true` for semi joins, `false` otherwise.
        let emit_when_matched = self.join_type == JoinType::Semi;
        {
            let _probe_span = info_span!("AntiSemiOperator::eval_and_probe").entered();
            for (table_idx, table) in input_tables.iter().enumerate() {
                let join_keys = table.eval_expression_list(&self.probe_on)?;
                let match_flags = probe_set.probe_exists(&join_keys)?;

                for (row_idx, matched) in match_flags.enumerate() {
                    if matched == emit_when_matched {
                        kept_rows.extend(table_idx, row_idx, 1);
                    }
                }
            }
        }

        let output_table = kept_rows.build()?;
        let output_schema = output_table.schema.clone();
        Ok(Arc::new(MicroPartition::new_loaded(
            output_schema,
            Arc::new(vec![output_table]),
            None,
        )))
    }
}

impl IntermediateOperator for AntiSemiProbeOperator {
    /// Handles one pipeline input.
    ///
    /// Input index `0` carries the probe table, which is stored in the state
    /// and produces no output. Any other index carries data, which is filtered
    /// against the stored probe table and emitted.
    ///
    /// # Panics
    ///
    /// Panics if `state` is `None` or is not an `AntiSemiProbeState`, or if the
    /// operator was built with a join type other than `Semi` or `Anti`.
    #[instrument(skip_all, name = "AntiSemiOperator::execute")]
    fn execute(
        &self,
        idx: usize,
        input: &PipelineResultType,
        state: Option<&mut Box<dyn IntermediateOperatorState>>,
    ) -> DaftResult<IntermediateOperatorResult> {
        // Both input kinds need the concrete state, so recover it up front.
        let probe_state = state
            .expect("AntiSemiProbeOperator should have state")
            .as_any_mut()
            .downcast_mut::<AntiSemiProbeState>()
            .expect("AntiSemiProbeOperator state should be AntiSemiProbeState");

        if idx == 0 {
            let (probe_table, _) = input.as_probe_table();
            probe_state.set_table(probe_table);
            Ok(IntermediateOperatorResult::NeedMoreInput(None))
        } else {
            let data = input.as_data();
            let filtered = match self.join_type {
                JoinType::Semi | JoinType::Anti => self.probe_anti_semi(data, probe_state)?,
                _ => unreachable!("Only Semi and Anti joins are supported"),
            };
            Ok(IntermediateOperatorResult::NeedMoreInput(Some(filtered)))
        }
    }

    /// Returns the operator's display name.
    fn name(&self) -> &'static str {
        "AntiSemiProbeOperator"
    }

    /// Creates the initial state, which has no probe table yet.
    fn make_state(&self) -> Option<Box<dyn IntermediateOperatorState>> {
        let initial = AntiSemiProbeState::Building;
        Some(Box::new(initial))
    }
}
Loading

0 comments on commit 7ee5fda

Please sign in to comment.