Skip to content

Commit

Permalink
Log frontiers in progress tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
teskje committed Nov 27, 2023
1 parent 64be92b commit 69285e3
Showing 1 changed file with 124 additions and 28 deletions.
152 changes: 124 additions & 28 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ impl<T:Timestamp> Tracker<T> {
.collect::<Vec<_>>();

if !target_changes.is_empty() {
logger.log_target_updates(Box::new(target_changes));
logger.log_target_pointstamp_updates(Box::new(target_changes));
}

let source_changes =
Expand All @@ -587,7 +587,7 @@ impl<T:Timestamp> Tracker<T> {
.collect::<Vec<_>>();

if !source_changes.is_empty() {
logger.log_source_updates(Box::new(source_changes));
logger.log_source_pointstamp_updates(Box::new(source_changes));
}
}

Expand Down Expand Up @@ -641,6 +641,9 @@ impl<T:Timestamp> Tracker<T> {
// The intent is that that by moving forward in layers through `time`, we
// will discover zero-change times when we first visit them, as no further
// changes can be made to them once we complete them.
let mut target_frontier_changes = Vec::new();
let mut source_frontier_changes = Vec::new();

while let Some(Reverse((time, location, mut diff))) = self.worklist.pop() {

// Drain and accumulate all updates that have the same time and location.
Expand Down Expand Up @@ -672,8 +675,10 @@ impl<T:Timestamp> Tracker<T> {
}
}
}
self.pushed_changes.update((location, time), diff);
self.pushed_changes.update((location, time.clone()), diff);
target_frontier_changes.push((location.node, port_index, time, diff));
}

}
// Update to an operator output.
// Propagate any changes forward along outgoing edges.
Expand All @@ -693,12 +698,19 @@ impl<T:Timestamp> Tracker<T> {
diff,
)));
}
self.pushed_changes.update((location, time), diff);
self.pushed_changes.update((location, time.clone()), diff);
source_frontier_changes.push((location.node, port_index, time, diff));
}
},
};
}
}

// If logging is enabled, log frontier updates.
if let Some(logger) = &mut self.logger {
logger.log_target_frontier_updates(Box::new(target_frontier_changes));
logger.log_source_frontier_updates(Box::new(source_frontier_changes));
}
}

/// Implications of maintained capabilities projected to each output.
Expand Down Expand Up @@ -846,19 +858,39 @@ pub mod logging {
Self { path, logger }
}

/// Log source update events with additional identifying information.
pub fn log_source_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
/// Log source pointstamp update events with additional identifying information.
pub fn log_source_pointstamp_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
self.logger.log({
SourcePointstampUpdate {
tracker_id: self.path.clone(),
updates,
}
})
}
/// Log target pointstamp update events with additional identifying information.
pub fn log_target_pointstamp_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
self.logger.log({
TargetPointstampUpdate {
tracker_id: self.path.clone(),
updates,
}
})
}

/// Log source frontier update events with additional identifying information.
pub fn log_source_frontier_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
self.logger.log({
SourceUpdate {
SourceFrontierUpdate {
tracker_id: self.path.clone(),
updates,
}
})
}
/// Log target update events with additional identifying information.
pub fn log_target_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {

/// Log target frontier update events with additional identifying information.
pub fn log_target_frontier_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
self.logger.log({
TargetUpdate {
TargetFrontierUpdate {
tracker_id: self.path.clone(),
updates,
}
Expand All @@ -868,34 +900,70 @@ pub mod logging {

/// Events that the tracker may record.
pub enum TrackerEvent {
/// Updates made at a source of data.
SourceUpdate(SourceUpdate),
/// Updates made at a target of data.
TargetUpdate(TargetUpdate),
/// Pointstamp updates made at a source of data.
SourcePointstampUpdate(SourcePointstampUpdate),
/// Pointstamp updates made at a target of data.
TargetPointstampUpdate(TargetPointstampUpdate),
/// Frontier updates made at a source of data.
SourceFrontierUpdate(SourceFrontierUpdate),
/// Frontier updates made at a target of data.
TargetFrontierUpdate(TargetFrontierUpdate),
}

/// An update made at a source of data.
pub struct SourceUpdate {
/// A pointstamp update made at a source of data.
pub struct SourcePointstampUpdate {
/// An identifier for the tracker.
pub tracker_id: Vec<usize>,
/// Updates themselves, as `(node, port, time, diff)`.
pub updates: Box<dyn ProgressEventTimestampVec>,
}

/// An update made at a target of data.
pub struct TargetUpdate {
/// A pointstamp update made at a target of data.
pub struct TargetPointstampUpdate {
/// An identifier for the tracker.
pub tracker_id: Vec<usize>,
/// Updates themselves, as `(node, port, time, diff)`.
pub updates: Box<dyn ProgressEventTimestampVec>,
}

impl From<SourceUpdate> for TrackerEvent {
fn from(v: SourceUpdate) -> TrackerEvent { TrackerEvent::SourceUpdate(v) }
/// A frontier update at a source of data.
pub struct SourceFrontierUpdate {
/// An identifier for the tracker.
pub tracker_id: Vec<usize>,
/// Updates themselves, as `(node, port, time, diff)`.
pub updates: Box<dyn ProgressEventTimestampVec>,
}

impl From<TargetUpdate> for TrackerEvent {
fn from(v: TargetUpdate) -> TrackerEvent { TrackerEvent::TargetUpdate(v) }
/// A frontier update at a target of data.
pub struct TargetFrontierUpdate {
/// An identifier for the tracker.
pub tracker_id: Vec<usize>,
/// Updates themselves, as `(node, port, time, diff)`.
pub updates: Box<dyn ProgressEventTimestampVec>,
}

impl From<SourcePointstampUpdate> for TrackerEvent {
fn from(v: SourcePointstampUpdate) -> Self {
Self::SourcePointstampUpdate(v)
}
}

impl From<TargetPointstampUpdate> for TrackerEvent {
fn from(v: TargetPointstampUpdate) -> Self {
Self::TargetPointstampUpdate(v)
}
}

impl From<SourceFrontierUpdate> for TrackerEvent {
fn from(v: SourceFrontierUpdate) -> Self {
Self::SourceFrontierUpdate(v)
}
}

impl From<TargetFrontierUpdate> for TrackerEvent {
fn from(v: TargetFrontierUpdate) -> Self {
Self::TargetFrontierUpdate(v)
}
}
}

Expand All @@ -914,7 +982,7 @@ impl<T: Timestamp> Drop for Tracker<T> {

// Retract pending data that `propagate_all` would normally log.
for (index, per_operator) in self.per_operator.iter_mut().enumerate() {
let target_changes = per_operator.targets
let target_pointstamp_changes = per_operator.targets
.iter_mut()
.enumerate()
.flat_map(|(port, target)| {
Expand All @@ -923,11 +991,11 @@ impl<T: Timestamp> Drop for Tracker<T> {
.map(move |(time, diff)| (index, port, time.clone(), -diff))
})
.collect::<Vec<_>>();
if !target_changes.is_empty() {
logger.log_target_updates(Box::new(target_changes));
if !target_pointstamp_changes.is_empty() {
logger.log_target_pointstamp_updates(Box::new(target_pointstamp_changes));
}

let source_changes = per_operator.sources
let source_pointstamp_changes = per_operator.sources
.iter_mut()
.enumerate()
.flat_map(|(port, source)| {
Expand All @@ -936,8 +1004,36 @@ impl<T: Timestamp> Drop for Tracker<T> {
.map(move |(time, diff)| (index, port, time.clone(), -diff))
})
.collect::<Vec<_>>();
if !source_changes.is_empty() {
logger.log_source_updates(Box::new(source_changes));
if !source_pointstamp_changes.is_empty() {
logger.log_source_pointstamp_updates(Box::new(source_pointstamp_changes));
}

let target_frontier_changes = per_operator.targets
.iter_mut()
.enumerate()
.flat_map(|(port, target)| {
let frontier = target.implications.frontier().to_owned();
frontier
.into_iter()
.map(move |time| (index, port, time, -1))
})
.collect::<Vec<_>>();
if !target_frontier_changes.is_empty() {
logger.log_target_frontier_updates(Box::new(target_frontier_changes));
}

let source_frontier_changes = per_operator.sources
.iter_mut()
.enumerate()
.flat_map(|(port, source)| {
let frontier = source.implications.frontier().to_owned();
frontier
.into_iter()
.map(move |time| (index, port, time, -1))
})
.collect::<Vec<_>>();
if !source_frontier_changes.is_empty() {
logger.log_source_frontier_updates(Box::new(source_frontier_changes));
}
}
}
Expand Down

0 comments on commit 69285e3

Please sign in to comment.