diff --git a/communication/src/allocator/generic.rs b/communication/src/allocator/generic.rs index 49f014072..6d95a4ef3 100644 --- a/communication/src/allocator/generic.rs +++ b/communication/src/allocator/generic.rs @@ -93,14 +93,6 @@ impl Allocate for Generic { fn receive(&mut self) { self.receive(); } fn release(&mut self) { self.release(); } fn events(&self) -> &Rc>> { self.events() } - fn await_events(&self, _duration: Option) { - match self { - Generic::Thread(t) => t.await_events(_duration), - Generic::Process(p) => p.await_events(_duration), - Generic::ProcessBinary(pb) => pb.await_events(_duration), - Generic::ZeroCopy(z) => z.await_events(_duration), - } - } } diff --git a/communication/src/allocator/mod.rs b/communication/src/allocator/mod.rs index e5b858f69..7c48f58bc 100644 --- a/communication/src/allocator/mod.rs +++ b/communication/src/allocator/mod.rs @@ -2,7 +2,6 @@ use std::rc::Rc; use std::cell::RefCell; -use std::time::Duration; pub use self::thread::Thread; pub use self::process::Process; @@ -51,14 +50,6 @@ pub trait Allocate { /// into a performance problem. fn events(&self) -> &Rc>>; - /// Awaits communication events. - /// - /// This method may park the current thread, for at most `duration`, - /// until new events arrive. - /// The method is not guaranteed to wait for any amount of time, but - /// good implementations should use this as a hint to park the thread. - fn await_events(&self, _duration: Option) { } - /// Ensure that received messages are surfaced in each channel. /// /// This method should be called to ensure that received messages are diff --git a/communication/src/allocator/process.rs b/communication/src/allocator/process.rs index 07d793684..96519aa52 100644 --- a/communication/src/allocator/process.rs +++ b/communication/src/allocator/process.rs @@ -4,7 +4,6 @@ use std::rc::Rc; use std::cell::RefCell; use std::sync::{Arc, Mutex}; use std::any::Any; -use std::time::Duration; use std::collections::{HashMap}; use crossbeam_channel::{Sender, Receiver}; @@ -178,10 +177,6 @@ impl Allocate for Process { self.inner.events() } - fn await_events(&self, duration: Option) { - self.inner.await_events(duration); - } - fn receive(&mut self) { let mut events = self.inner.events().borrow_mut(); while let Ok(index) = self.counters_recv.try_recv() { diff --git a/communication/src/allocator/thread.rs b/communication/src/allocator/thread.rs index f46e3532b..3a5ae3a6a 100644 --- a/communication/src/allocator/thread.rs +++ b/communication/src/allocator/thread.rs @@ -2,7 +2,6 @@ use std::rc::Rc; use std::cell::RefCell; -use std::time::Duration; use std::collections::VecDeque; use crate::allocator::{Allocate, AllocateBuilder}; @@ -35,16 +34,6 @@ impl Allocate for Thread { fn events(&self) -> &Rc>> { &self.events } - fn await_events(&self, duration: Option) { - if self.events.borrow().is_empty() { - if let Some(duration) = duration { - std::thread::park_timeout(duration); - } - else { - std::thread::park(); - } - } - } } /// Thread-local counting channel push endpoint. diff --git a/communication/src/allocator/zero_copy/allocator.rs b/communication/src/allocator/zero_copy/allocator.rs index 6ef9ef647..6badd8777 100644 --- a/communication/src/allocator/zero_copy/allocator.rs +++ b/communication/src/allocator/zero_copy/allocator.rs @@ -271,7 +271,4 @@ impl Allocate for TcpAllocator { fn events(&self) -> &Rc>> { self.inner.events() } - fn await_events(&self, duration: Option) { - self.inner.await_events(duration); - } } \ No newline at end of file diff --git a/communication/src/allocator/zero_copy/allocator_process.rs b/communication/src/allocator/zero_copy/allocator_process.rs index 74056ac29..c9f71176d 100644 --- a/communication/src/allocator/zero_copy/allocator_process.rs +++ b/communication/src/allocator/zero_copy/allocator_process.rs @@ -240,14 +240,4 @@ impl Allocate for ProcessAllocator { fn events(&self) -> &Rc>> { &self.events } - fn await_events(&self, duration: Option) { - if self.events.borrow().is_empty() { - if let Some(duration) = duration { - std::thread::park_timeout(duration); - } - else { - std::thread::park(); - } - } - } } \ No newline at end of file diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs index 9f9d3e54c..658d473db 100644 --- a/timely/src/scheduling/activate.rs +++ b/timely/src/scheduling/activate.rs @@ -177,7 +177,7 @@ impl Activations { /// This method should be used before putting a worker thread to sleep, as it /// indicates the amount of time before the thread should be unparked for the /// next scheduled activation. - pub fn empty_for(&self) -> Option { + fn empty_for(&self) -> Option { if !self.bounds.is_empty() || self.timer.is_none() { Some(Duration::new(0,0)) } @@ -189,6 +189,34 @@ impl Activations { }) } } + + /// Indicates that there is nothing to do for `timeout`, and that the scheduler + /// can allow the thread to sleep until then. + /// + /// The method does not *need* to park the thread, and indeed it may elect to + /// unpark earlier if there are deferred activations. + pub fn park_timeout(&self, timeout: Option) { + let empty_for = self.empty_for(); + let timeout = match (timeout, empty_for) { + (Some(x), Some(y)) => Some(std::cmp::min(x,y)), + (x, y) => x.or(y), + }; + + if let Some(timeout) = timeout { + std::thread::park_timeout(timeout); + } + else { + std::thread::park(); + } + } + + /// True iff there are no immediate activations. + /// + /// Used by others to guard work done in anticipation of potentially parking. + /// An alternate method name could be `would_park`. + pub fn is_idle(&self) -> bool { + self.bounds.is_empty() || self.timer.is_none() + } } /// A thread-safe handle to an `Activations`. diff --git a/timely/src/worker.rs b/timely/src/worker.rs index ee1f5482a..7bbfdbfa9 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -333,7 +333,7 @@ impl Worker { /// worker.step_or_park(Some(Duration::from_secs(1))); /// }); /// ``` - pub fn step_or_park(&mut self, duration: Option) -> bool { + pub fn step_or_park(&mut self, timeout: Option) -> bool { { // Process channel events. Activate responders. let mut allocator = self.allocator.borrow_mut(); @@ -362,28 +362,23 @@ impl Worker { .borrow_mut() .advance(); - // Consider parking only if we have no pending events, some dataflows, and a non-zero duration. - let empty_for = self.activations.borrow().empty_for(); - // Determine the minimum park duration, where `None` are an absence of a constraint. - let delay = match (duration, empty_for) { - (Some(x), Some(y)) => Some(std::cmp::min(x,y)), - (x, y) => x.or(y), - }; + if self.activations.borrow().is_idle() { + // If the timeout is zero, don't bother trying to park. + // More generally, we could put some threshold in here. + if timeout != Some(Duration::new(0, 0)) { + // Log parking and flush log. + if let Some(l) = self.logging().as_mut() { + l.log(crate::logging::ParkEvent::park(timeout)); + l.flush(); + } - if delay != Some(Duration::new(0,0)) { + // We have just drained `allocator.events()` up above; + // otherwise we should first check it for emptiness. + self.activations.borrow().park_timeout(timeout); - // Log parking and flush log. - if let Some(l) = self.logging().as_mut() { - l.log(crate::logging::ParkEvent::park(delay)); - l.flush(); + // Log return from unpark. + self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark())); } - - self.allocator - .borrow() - .await_events(delay); - - // Log return from unpark. - self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark())); } else { // Schedule active dataflows.