Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make std::time::Instant optional #577

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ impl Allocate for Generic {
fn receive(&mut self) { self.receive(); }
fn release(&mut self) { self.release(); }
fn events(&self) -> &Rc<RefCell<Vec<usize>>> { self.events() }
fn await_events(&self, _duration: Option<std::time::Duration>) {
match self {
Generic::Thread(t) => t.await_events(_duration),
Generic::Process(p) => p.await_events(_duration),
Generic::ProcessBinary(pb) => pb.await_events(_duration),
Generic::ZeroCopy(z) => z.await_events(_duration),
}
}
}


Expand Down
9 changes: 0 additions & 9 deletions communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use std::rc::Rc;
use std::cell::RefCell;
use std::time::Duration;

pub use self::thread::Thread;
pub use self::process::Process;
Expand Down Expand Up @@ -51,14 +50,6 @@ pub trait Allocate {
/// into a performance problem.
fn events(&self) -> &Rc<RefCell<Vec<usize>>>;

/// Awaits communication events.
///
/// This method may park the current thread, for at most `duration`,
/// until new events arrive.
/// The method is not guaranteed to wait for any amount of time, but
/// good implementations should use this as a hint to park the thread.
fn await_events(&self, _duration: Option<Duration>) { }

/// Ensure that received messages are surfaced in each channel.
///
/// This method should be called to ensure that received messages are
Expand Down
5 changes: 0 additions & 5 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::rc::Rc;
use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use std::any::Any;
use std::time::Duration;
use std::collections::{HashMap};
use crossbeam_channel::{Sender, Receiver};

Expand Down Expand Up @@ -178,10 +177,6 @@ impl Allocate for Process {
self.inner.events()
}

fn await_events(&self, duration: Option<Duration>) {
self.inner.await_events(duration);
}

fn receive(&mut self) {
let mut events = self.inner.events().borrow_mut();
while let Ok(index) = self.counters_recv.try_recv() {
Expand Down
11 changes: 0 additions & 11 deletions communication/src/allocator/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use std::rc::Rc;
use std::cell::RefCell;
use std::time::Duration;
use std::collections::VecDeque;

use crate::allocator::{Allocate, AllocateBuilder};
Expand Down Expand Up @@ -35,16 +34,6 @@ impl Allocate for Thread {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
&self.events
}
fn await_events(&self, duration: Option<Duration>) {
if self.events.borrow().is_empty() {
if let Some(duration) = duration {
std::thread::park_timeout(duration);
}
else {
std::thread::park();
}
}
}
}

/// Thread-local counting channel push endpoint.
Expand Down
3 changes: 0 additions & 3 deletions communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,4 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
self.inner.events()
}
fn await_events(&self, duration: Option<std::time::Duration>) {
self.inner.await_events(duration);
}
}
10 changes: 0 additions & 10 deletions communication/src/allocator/zero_copy/allocator_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,4 @@ impl Allocate for ProcessAllocator {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
&self.events
}
fn await_events(&self, duration: Option<std::time::Duration>) {
if self.events.borrow().is_empty() {
if let Some(duration) = duration {
std::thread::park_timeout(duration);
}
else {
std::thread::park();
}
}
}
}
10 changes: 5 additions & 5 deletions timely/examples/logging-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ fn main() {
let mut probe = ProbeHandle::new();

// Register timely worker logging.
worker.log_register().insert::<TimelyEvent,_>("timely", |_time, data|
worker.log_register().unwrap().insert::<TimelyEvent,_>("timely", |_time, data|
data.iter().for_each(|x| println!("LOG1: {:?}", x))
);

// Register timely progress logging.
// Less generally useful: intended for debugging advanced custom operators or timely
// internals.
worker.log_register().insert::<TimelyProgressEvent,_>("timely/progress", |_time, data|
worker.log_register().unwrap().insert::<TimelyProgressEvent,_>("timely/progress", |_time, data|
data.iter().for_each(|x| {
println!("PROGRESS: {:?}", x);
let (_, _, ev) = x;
Expand All @@ -50,7 +50,7 @@ fn main() {
});

// Register timely worker logging.
worker.log_register().insert::<TimelyEvent,_>("timely", |_time, data|
worker.log_register().unwrap().insert::<TimelyEvent,_>("timely", |_time, data|
data.iter().for_each(|x| println!("LOG2: {:?}", x))
);

Expand All @@ -63,13 +63,13 @@ fn main() {
});

// Register user-level logging.
worker.log_register().insert::<(),_>("input", |_time, data|
worker.log_register().unwrap().insert::<(),_>("input", |_time, data|
for element in data.iter() {
println!("Round tick at: {:?}", element.0);
}
);

let input_logger = worker.log_register().get::<()>("input").expect("Input logger absent");
let input_logger = worker.log_register().unwrap().get::<()>("input").expect("Input logger absent");

let timer = std::time::Instant::now();

Expand Down
2 changes: 1 addition & 1 deletion timely/examples/threadless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() {

// create a naked single-threaded worker.
let allocator = timely::communication::allocator::Thread::new();
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator);
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);

// create input and probe handles.
let mut input = InputHandle::new();
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ where
fn new_identifier(&mut self) -> usize {
self.parent.new_identifier()
}
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>> {
self.parent.log_register()
}
}
Expand Down
4 changes: 2 additions & 2 deletions timely/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ where
F: FnOnce(&mut Worker<crate::communication::allocator::thread::Thread>)->T+Send+Sync+'static
{
let alloc = crate::communication::allocator::thread::Thread::new();
let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc);
let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc, Some(std::time::Instant::now()));
let result = func(&mut worker);
while worker.has_dataflows() {
worker.step_or_park(None);
Expand Down Expand Up @@ -320,7 +320,7 @@ where
T: Send+'static,
F: Fn(&mut Worker<<A as AllocateBuilder>::Allocator>)->T+Send+Sync+'static {
initialize_from(builders, others, move |allocator| {
let mut worker = Worker::new(worker_config.clone(), allocator);
let mut worker = Worker::new(worker_config.clone(), allocator, Some(std::time::Instant::now()));
let result = func(&mut worker);
while worker.has_dataflows() {
worker.step_or_park(None);
Expand Down
7 changes: 5 additions & 2 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,11 @@ where
let path = self.path.clone();
let reachability_logging =
worker.log_register()
.get::<reachability::logging::TrackerEvent>("timely/reachability")
.map(|logger| reachability::logging::TrackerLogger::new(path, logger));
.as_ref()
.and_then(|l|
l.get::<reachability::logging::TrackerEvent>("timely/reachability")
.map(|logger| reachability::logging::TrackerLogger::new(path, logger))
);
let (tracker, scope_summary) = builder.build(reachability_logging);

let progcaster = Progcaster::new(worker, &self.path, self.logging.clone(), self.progress_logging.clone());
Expand Down
115 changes: 83 additions & 32 deletions timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,38 @@ use std::time::{Duration, Instant};
use std::cmp::Reverse;
use crossbeam_channel::{Sender, Receiver};

/// Methods required to act as a timely scheduler.
/// Methods required to act as a scheduler for timely operators.
///
/// The core methods are the activation of "paths", sequences of integers, and
/// the enumeration of active paths by prefix. A scheduler may delay the report
/// of a path indefinitely, but it should report at least one extension for the
/// empty path `&[]` or risk parking the worker thread without a certain unpark.
/// Operators are described by "paths" of integers, indicating the path along
/// a tree of regions, arriving at the the operator. Each path is either "idle"
/// or "active", where the latter indicates that someone has requested that the
/// operator be scheduled by the worker. Operators go from idle to active when
/// the `activate(path)` method is called, and from active to idle when the path
/// is returned through a call to `extensions(path, _)`.
///
/// There is no known harm to "spurious wake-ups" where a not-active path is
/// returned through `extensions()`.
/// The worker will continually probe for extensions to the root empty path `[]`,
/// and then follow all returned addresses, recursively. A scheduler need not
/// schedule all active paths, but it should return *some* active path when the
/// worker probes the empty path, or the worker may put the thread to sleep.
///
/// There is no known harm to scheduling an idle path.
/// The worker may speculatively schedule paths of its own accord.
pub trait Scheduler {
/// Mark a path as immediately scheduleable.
///
/// The scheduler is not required to immediately schedule the path, but it
/// should not signal that it has no work until the path has been scheduled.
fn activate(&mut self, path: &[usize]);
/// Populates `dest` with next identifiers on active extensions of `path`.
///
/// This method is where a scheduler is allowed to exercise some discretion,
/// in that it does not need to present *all* extensions, but it can instead
/// present only those that the runtime should schedule.
/// present only those that the runtime should immediately schedule.
///
/// The worker will schedule to all extensions before probing new prefixes.
/// The scheduler is invited to rely on this, and to schedule in "batches",
/// where the next time the worker probes for extensions to the empty path
/// then all addresses in the batch have certainly been scheduled.
fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>);
}

Expand All @@ -48,14 +63,14 @@ pub struct Activations {
rx: Receiver<Vec<usize>>,

// Delayed activations.
timer: Instant,
timer: Option<Instant>,
queue: BinaryHeap<Reverse<(Duration, Vec<usize>)>>,
}

impl Activations {

/// Creates a new activation tracker.
pub fn new(timer: Instant) -> Self {
pub fn new(timer: Option<Instant>) -> Self {
let (tx, rx) = crossbeam_channel::unbounded();
Self {
clean: 0,
Expand All @@ -77,29 +92,36 @@ impl Activations {

/// Schedules a future activation for the task addressed by `path`.
pub fn activate_after(&mut self, path: &[usize], delay: Duration) {
// TODO: We could have a minimum delay and immediately schedule anything less than that delay.
if delay == Duration::new(0, 0) {
self.activate(path);
}
if let Some(timer) = self.timer {
// TODO: We could have a minimum delay and immediately schedule anything less than that delay.
if delay == Duration::new(0, 0) {
self.activate(path);
}
else {
let moment = timer.elapsed() + delay;
self.queue.push(Reverse((moment, path.to_vec())));
}
}
else {
let moment = self.timer.elapsed() + delay;
self.queue.push(Reverse((moment, path.to_vec())));
self.activate(path);
}
}

/// Discards the current active set and presents the next active set.
pub fn advance(&mut self) {
fn advance(&mut self) {

// Drain inter-thread activations.
while let Ok(path) = self.rx.try_recv() {
self.activate(&path[..])
}

// Drain timer-based activations.
let now = self.timer.elapsed();
while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
let Reverse((_time, path)) = self.queue.pop().unwrap();
self.activate(&path[..]);
if let Some(timer) = self.timer {
let now = timer.elapsed();
while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
let Reverse((_time, path)) = self.queue.pop().unwrap();
self.activate(&path[..]);
}
}

self.bounds.drain(.. self.clean);
Expand All @@ -121,15 +143,15 @@ impl Activations {
self.clean = self.bounds.len();
}

/// Maps a function across activated paths.
pub fn map_active(&self, logic: impl Fn(&[usize])) {
for (offset, length) in self.bounds.iter() {
logic(&self.slices[*offset .. (*offset + *length)]);
}
}

/// Sets as active any symbols that follow `path`.
pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) {
pub fn for_extensions(&mut self, path: &[usize], mut action: impl FnMut(usize)) {

// Each call for the root path is a moment where the worker has reset.
// This relies on a worker implementation that follows the scheduling
// instructions perfectly; if any offered paths are not explored, oops.
if path.is_empty() {
self.advance();
}

let position =
self.bounds[..self.clean]
Expand Down Expand Up @@ -170,18 +192,47 @@ impl Activations {
/// This method should be used before putting a worker thread to sleep, as it
/// indicates the amount of time before the thread should be unparked for the
/// next scheduled activation.
pub fn empty_for(&self) -> Option<Duration> {
if !self.bounds.is_empty() {
fn empty_for(&self) -> Option<Duration> {
if !self.bounds.is_empty() || self.timer.is_none() {
Some(Duration::new(0,0))
}
else {
self.queue.peek().map(|Reverse((t,_a))| {
let elapsed = self.timer.elapsed();
let elapsed = self.timer.unwrap().elapsed();
if t < &elapsed { Duration::new(0,0) }
else { *t - elapsed }
})
}
}

/// Indicates that there is nothing to do for `timeout`, and that the scheduler
/// can allow the thread to sleep until then.
///
/// The method does not *need* to park the thread, and indeed it may elect to
/// unpark earlier if there are deferred activations.
pub fn park_timeout(&self, timeout: Option<Duration>) {
let empty_for = self.empty_for();
let timeout = match (timeout, empty_for) {
(Some(x), Some(y)) => Some(std::cmp::min(x,y)),
(x, y) => x.or(y),
};

if let Some(timeout) = timeout {
std::thread::park_timeout(timeout);
}
else {
std::thread::park();
}
}
}

impl Scheduler for Activations {
fn activate(&mut self, path: &[usize]) {
self.activate(path);
}
fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>) {
self.for_extensions(path, |index| dest.push(index));
}
}

/// A thread-safe handle to an `Activations`.
Expand Down
Loading
Loading