Skip to content

Commit

Permalink
Consolidate without a trace
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Dec 9, 2023
1 parent 0871fbd commit 869a3eb
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 13 deletions.
4 changes: 2 additions & 2 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ where
}

// Extract updates not in advance of `upper`.
let batch = batcher.seal::<Tr::Builder>(upper.clone());
let batch = batcher.seal::<Tr::Builder, _>(upper.borrow(), Tr::Builder::with_capacity);

writer.insert(batch.clone(), Some(capability.time().clone()));

Expand Down Expand Up @@ -723,7 +723,7 @@ where
}
else {
// Announce progress updates, even without data.
let _batch = batcher.seal::<Tr::Builder>(input.frontier().frontier().to_owned());
let _batch = batcher.seal::<Tr::Builder, _>(input.frontier().frontier(), Tr::Builder::with_capacity);
writer.seal(input.frontier().frontier().to_owned());
}

Expand Down
154 changes: 154 additions & 0 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,157 @@ where
.as_collection()
}
}

pub mod neu {
//! Consolidate without building batches.

use timely::PartialOrder;
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};
use timely::dataflow::operators::{Capability, Operator};
use timely::progress::{Antichain, Timestamp};

use crate::collection::AsCollection;
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::{Batcher, Builder};
use crate::{Collection, Data, ExchangeData, Hashable};

impl<G, D, R> Collection<G, D, R>
where
G: Scope,
G::Timestamp: Data + Lattice,
D: ExchangeData + Hashable,
R: Semigroup + ExchangeData,
{
/// Aggregates the weights of equal records into at most one record.
///
/// This method uses the type `D`'s `hashed()` method to partition the data. The data are
/// accumulated in place, each held back until their timestamp has completed.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
/// use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
///
/// ::timely::example(|scope| {
///
/// let x = scope.new_collection_from(1 .. 10u32).1;
///
/// x.negate()
/// .concat(&x)
/// .consolidate_named_neu::<MergeBatcher<_, _, _, _>>("Consolidate inputs") // <-- ensures cancellation occurs
/// .assert_empty();
/// });
/// ```
pub fn consolidate_named_neu<B>(&self, name: &str) -> Self
where
B: Batcher<Item=((D, ()), G::Timestamp, R), Time=G::Timestamp> + 'static,
{
// Route updates by the hash of their data, so all updates for equal
// records arrive at the same worker and can cancel or accumulate.
let exchange = Exchange::new(move |update: &((D, ()), G::Timestamp, R)| (update.0).0.hashed().into());
// Pair each record with a unit value so updates have the `(key, val)`
// shape the batcher expects; stripped again by `as_collection()` below.
self.map(|k| (k, ())).inner
.unary_frontier(exchange, name, |_cap, info| {

// Acquire a logger for arrange events.
let logger = {
let scope = self.scope();
let register = scope.log_register();
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
};

// The batcher absorbs unordered updates and releases them in batches
// once their times are no longer in advance of the sealed frontier.
let mut batcher = B::new(logger, info.global_id);
// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
// Last input frontier observed; lets us skip work when unchanged.
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());


move |input, output| {
// Absorb all available input, retaining a capability per message
// so we can later emit sealed batches at those times.
input.for_each(|cap, data| {
capabilities.insert(cap.retain());
batcher.push_batch(data);
});

// Only attempt to seal and ship updates when the frontier moved.
if prev_frontier.borrow() != input.frontier().frontier() {
if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
let mut upper = Antichain::new(); // re-used allocation for sealing batches.

// For each capability not in advance of the input frontier ...
for (index, capability) in capabilities.elements().iter().enumerate() {
if !input.frontier().less_equal(capability.time()) {

// Assemble the upper bound on times we can commit with this capability.
// We must respect the input frontier, and *subsequent* capabilities, as
// we are pretending to retire the capability changes one by one.
upper.clear();
for time in input.frontier().frontier().iter() {
upper.insert(time.clone());
}
for other_capability in &capabilities.elements()[(index + 1)..] {
upper.insert(other_capability.time().clone());
}

// send the batch to downstream consumers, empty or not.
let session = output.session(&capabilities.elements()[index]);
// Extract updates not in advance of `upper`. The builder streams
// consolidated updates straight into the session instead of
// assembling a trace batch; the constructor closure ignores the
// capacity hints because the session needs no preallocation.
let builder = ConsolidateBuilder(session);
let () = batcher.seal(upper.borrow(), |_, _, _| builder);
}
}

// Having extracted and sent batches between each capability and the input frontier,
// we should downgrade all capabilities to match the batcher's lower update frontier.
// This may involve discarding capabilities, which is fine as any new updates arrive
// in messages with new capabilities.

let mut new_capabilities = Antichain::new();
for time in batcher.frontier().iter() {
if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
new_capabilities.insert(capability.delayed(time));
} else {
// Every retained time must descend from some held capability;
// anything else indicates a progress-tracking bug.
panic!("failed to find capability");
}
}

capabilities = new_capabilities;
}

// Record the frontier we just processed, for the change test above.
prev_frontier.clear();
prev_frontier.extend(input.frontier().frontier().iter().cloned());
}
}
})
.as_collection()
}
}

/// A `Builder` that wraps a timely output session, forwarding each pushed
/// update directly downstream rather than assembling an in-memory batch.
struct ConsolidateBuilder<'a, D: Data, T: Timestamp, R: Data>(Session<'a, T, Vec<(D, T, R)>, Counter<T, (D, T, R), Tee<T, (D, T, R)>>>);

impl<'a, D: Data, T: Timestamp, R: Data> Builder for ConsolidateBuilder<'a, D, T, R> {
type Item = ((D, ()), T, R);
type Time = T;
type Output = ();

/// Unsupported: a `ConsolidateBuilder` can only be created around an
/// existing output session, never from scratch.
fn new() -> Self {
unimplemented!()
}

/// Unsupported: the wrapped session needs no capacity hints.
fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
unimplemented!()
}

/// Forwards an owned update into the session, dropping the unit value.
fn push(&mut self, element: Self::Item) {
let ((data, ()), time, diff) = element;
self.0.give((data, time, diff));
}

/// Forwards a borrowed update into the session by cloning its parts.
fn copy(&mut self, element: &Self::Item) {
let ((data, ()), time, diff) = element;
self.0.give((data.clone(), time.clone(), diff.clone()));
}

/// Nothing to finalize: updates were shipped as they were pushed, and the
/// frontier bookkeeping is irrelevant for a streamed output.
fn done(self, _lower: Antichain<Self::Time>, _upper: Antichain<Self::Time>, _since: Antichain<Self::Time>) -> Self::Output {}
}
}
9 changes: 5 additions & 4 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use timely::communication::message::RefOrMut;
use timely::logging::WorkerIdentifier;
use timely::logging_core::Logger;
use timely::progress::{frontier::Antichain, Timestamp};
use timely::progress::frontier::AntichainRef;

use crate::difference::Semigroup;
use crate::logging::{BatcherEvent, DifferentialEvent};
Expand Down Expand Up @@ -58,7 +59,7 @@ where
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
#[inline(never)]
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>, C: FnOnce(usize, usize, usize) -> B>(&mut self, upper: AntichainRef<T>, constructor: C) -> B::Output {

let mut merged = Vec::new();
self.sorter.finish_into(&mut merged);
Expand Down Expand Up @@ -87,7 +88,7 @@ where
}
}
}
B::with_capacity(keys, vals, upds)
constructor(keys, vals, upds)
};

let mut kept = Vec::new();
Expand Down Expand Up @@ -134,8 +135,8 @@ where
self.sorter.push(&mut buffer);
}

let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum()));
self.lower = upper;
let seal = builder.done(self.lower.clone(), upper.to_owned(), Antichain::from_elem(T::minimum()));
self.lower = upper.to_owned();
seal
}

Expand Down
9 changes: 5 additions & 4 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use timely::container::columnation::{Columnation, TimelyStack};
use timely::logging::WorkerIdentifier;
use timely::logging_core::Logger;
use timely::progress::{frontier::Antichain, Timestamp};
use timely::progress::frontier::AntichainRef;

use crate::difference::Semigroup;
use crate::logging::{BatcherEvent, DifferentialEvent};
Expand Down Expand Up @@ -62,7 +63,7 @@ where
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
#[inline]
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>, C: FnOnce(usize, usize, usize) -> B>(&mut self, upper: AntichainRef<T>, constructor: C) -> B::Output {

let mut merged = Default::default();
self.sorter.finish_into(&mut merged);
Expand Down Expand Up @@ -91,7 +92,7 @@ where
}
}
}
B::with_capacity(keys, vals, upds)
constructor(keys, vals, upds)
};

let mut kept = Vec::new();
Expand Down Expand Up @@ -132,8 +133,8 @@ where
// Drain buffers (fast reclamation).
self.sorter.clear_stash();

let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum()));
self.lower = upper;
let seal = builder.done(self.lower.clone(), upper.to_owned(), Antichain::from_elem(T::minimum()));
self.lower = upper.to_owned();
seal
}

Expand Down
2 changes: 1 addition & 1 deletion src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ pub trait Batcher {
/// Adds an unordered batch of elements to the batcher.
fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>);
/// Returns all updates not greater or equal to an element of `upper`.
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>, C: FnOnce(usize, usize, usize) -> B>(&mut self, upper: AntichainRef<Self::Time>, constructor: C) -> B::Output;
/// Returns the lower envelope of contained update times.
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<Self::Time>;
}
Expand Down
4 changes: 2 additions & 2 deletions tests/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use timely::dataflow::operators::generic::OperatorInfo;
use timely::progress::{Antichain, frontier::AntichainRef};

use differential_dataflow::trace::implementations::ValSpine;
use differential_dataflow::trace::{Trace, TraceReader, Batcher};
use differential_dataflow::trace::{Builder, Trace, TraceReader, Batcher};
use differential_dataflow::trace::cursor::Cursor;

type IntegerTrace = ValSpine<u64, u64, usize, i64>;
Expand All @@ -22,7 +22,7 @@ fn get_trace() -> ValSpine<u64, u64, usize, i64> {
]));

let batch_ts = &[1, 2, 3];
let batches = batch_ts.iter().map(move |i| batcher.seal::<IntegerBuilder>(Antichain::from_elem(*i)));
let batches = batch_ts.iter().map(move |i| batcher.seal::<IntegerBuilder, _>(Antichain::from_elem(*i).borrow(), IntegerBuilder::with_capacity));
for b in batches {
trace.insert(b);
}
Expand Down

0 comments on commit 869a3eb

Please sign in to comment.