Skip to content

Commit

Permalink
Log size/capacity/allocations from the merge batchers (TimelyDataflow…
Browse files Browse the repository at this point in the history
…#434)

* Log size/capacity/allocations from columnar merge batcher

This logs size/capacity/allocation changes from the columnar merge batcher.

Signed-off-by: Moritz Hoffmann <[email protected]>

* Merge batcher announce records

Signed-off-by: Moritz Hoffmann <[email protected]>

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored Dec 8, 2023
1 parent 47ba492 commit 0871fbd
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 57 deletions.
19 changes: 19 additions & 0 deletions src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub enum DifferentialEvent {
MergeShortfall(MergeShortfall),
/// Trace sharing event.
TraceShare(TraceShare),
/// Batcher size event
Batcher(BatcherEvent),
}

/// Either the start or end of a merge event.
Expand All @@ -45,6 +47,23 @@ pub struct BatchEvent {
impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { DifferentialEvent::Batch(e) } }


/// Either the start or end of a merge event.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
pub struct BatcherEvent {
/// Operator identifier.
pub operator: usize,
/// Change in records.
pub records_diff: isize,
/// Change in used size.
pub size_diff: isize,
/// Change in capacity.
pub capacity_diff: isize,
/// Change in number of allocations.
pub allocations_diff: isize,
}

impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }

/// Either the start or end of a merge event.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
pub struct DropEvent {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ where
};

// Where we will deposit received updates, and from which we extract batches.
let mut batcher = Tr::Batcher::new();
let mut batcher = Tr::Batcher::new(logger.clone(), info.global_id);

// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
Expand Down
105 changes: 77 additions & 28 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
use std::collections::VecDeque;

use timely::communication::message::RefOrMut;
use timely::logging::WorkerIdentifier;
use timely::logging_core::Logger;
use timely::progress::{frontier::Antichain, Timestamp};

use crate::difference::Semigroup;

use crate::logging::{BatcherEvent, DifferentialEvent};
use crate::trace::{Batcher, Builder};

/// Creates batches from unordered tuples.
Expand All @@ -26,11 +28,11 @@ where
type Item = ((K,V),T,D);
type Time = T;

fn new() -> Self {
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
MergeBatcher {
sorter: MergeSorter::new(),
sorter: MergeSorter::new(logger, operator_id),
frontier: Antichain::new(),
lower: Antichain::from_elem(<T as timely::progress::Timestamp>::minimum()),
lower: Antichain::from_elem(T::minimum()),
}
}

Expand Down Expand Up @@ -132,20 +134,23 @@ where
self.sorter.push(&mut buffer);
}

let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<T as timely::progress::Timestamp>::minimum()));
let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum()));
self.lower = upper;
seal
}

// the frontier of elements remaining after the most recent call to `self.seal`.
/// The frontier of elements remaining after the most recent call to `self.seal`.
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
self.frontier.borrow()
}
}

struct MergeSorter<D, T, R> {
queue: Vec<Vec<Vec<(D, T, R)>>>, // each power-of-two length list of allocations.
/// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions.
queue: Vec<Vec<Vec<(D, T, R)>>>,
stash: Vec<Vec<(D, T, R)>>,
logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
operator_id: usize,
}

impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
Expand All @@ -164,21 +169,20 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
}

#[inline]
pub fn new() -> Self { MergeSorter { queue: Vec::new(), stash: Vec::new() } }
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
Self {
logger,
operator_id,
queue: Vec::new(),
stash: Vec::new(),
}
}

#[inline]
pub fn empty(&mut self) -> Vec<(D, T, R)> {
self.stash.pop().unwrap_or_else(|| Vec::with_capacity(Self::buffer_size()))
}

#[inline(never)]
pub fn _sort(&mut self, list: &mut Vec<Vec<(D, T, R)>>) {
for mut batch in list.drain(..) {
self.push(&mut batch);
}
self.finish_into(list);
}

#[inline]
pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) {
// TODO: Reason about possible unbounded stash growth. How to / should we return them?
Expand All @@ -192,12 +196,13 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {

if !batch.is_empty() {
crate::consolidation::consolidate_updates(&mut batch);
self.queue.push(vec![batch]);
self.account([batch.len()], 1);
self.queue_push(vec![batch]);
while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
let list1 = self.queue.pop().unwrap();
let list2 = self.queue.pop().unwrap();
let list1 = self.queue_pop().unwrap();
let list2 = self.queue_pop().unwrap();
let merged = self.merge_by(list1, list2);
self.queue.push(merged);
self.queue_push(merged);
}
}
}
Expand All @@ -206,31 +211,32 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
// to break it down to be so.
pub fn push_list(&mut self, list: Vec<Vec<(D, T, R)>>) {
while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() {
let list1 = self.queue.pop().unwrap();
let list2 = self.queue.pop().unwrap();
let list1 = self.queue_pop().unwrap();
let list2 = self.queue_pop().unwrap();
let merged = self.merge_by(list1, list2);
self.queue.push(merged);
self.queue_push(merged);
}
self.queue.push(list);
self.queue_push(list);
}

#[inline(never)]
pub fn finish_into(&mut self, target: &mut Vec<Vec<(D, T, R)>>) {
while self.queue.len() > 1 {
let list1 = self.queue.pop().unwrap();
let list2 = self.queue.pop().unwrap();
let list1 = self.queue_pop().unwrap();
let list2 = self.queue_pop().unwrap();
let merged = self.merge_by(list1, list2);
self.queue.push(merged);
self.queue_push(merged);
}

if let Some(mut last) = self.queue.pop() {
if let Some(mut last) = self.queue_pop() {
::std::mem::swap(&mut last, target);
}
}

// merges two sorted input lists into one sorted output list.
#[inline(never)]
fn merge_by(&mut self, list1: Vec<Vec<(D, T, R)>>, list2: Vec<Vec<(D, T, R)>>) -> Vec<Vec<(D, T, R)>> {
self.account(list1.iter().chain(list2.iter()).map(Vec::len), -1);

use std::cmp::Ordering;

Expand Down Expand Up @@ -305,3 +311,46 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
output
}
}

impl<D, T, R> MergeSorter<D, T, R> {
/// Pop a batch from `self.queue` and account size changes.
#[inline]
fn queue_pop(&mut self) -> Option<Vec<Vec<(D, T, R)>>> {
let batch = self.queue.pop();
self.account(batch.iter().flatten().map(Vec::len), -1);
batch
}

/// Push a batch to `self.queue` and account size changes.
#[inline]
fn queue_push(&mut self, batch: Vec<Vec<(D, T, R)>>) {
self.account(batch.iter().map(Vec::len), 1);
self.queue.push(batch);
}

/// Account size changes. Only performs work if a logger exists.
///
/// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute
/// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
fn account<I: IntoIterator<Item=usize>>(&self, items: I, diff: isize) {
if let Some(logger) = &self.logger {
let mut records= 0isize;
for len in items {
records = records.saturating_add_unsigned(len);
}
logger.log(BatcherEvent {
operator: self.operator_id,
records_diff: records * diff,
size_diff: 0,
capacity_diff: 0,
allocations_diff: 0,
})
}
}
}

impl<D, T, R> Drop for MergeSorter<D, T, R> {
fn drop(&mut self) {
while self.queue_pop().is_some() { }
}
}
Loading

0 comments on commit 0871fbd

Please sign in to comment.