Skip to content

Commit

Permalink
Switch to crossbeam-queue for events
Browse files Browse the repository at this point in the history
Use a lock-free queue to store the event list
because the event callbacks might be called from
inside a signal handler. Most system calls are not
safe in that context, so we just spin if the queue
is full.
  • Loading branch information
ahomescu committed May 9, 2024
1 parent 9511a4f commit d83f983
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 13 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions analysis/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ bincode = "1.0.1"
once_cell = "1"
enum_dispatch = "0.3"
fs-err = "2"
crossbeam-queue = "0.3"
crossbeam-utils = "0.8"
23 changes: 20 additions & 3 deletions analysis/runtime/src/runtime/backend.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crossbeam_queue::ArrayQueue;
use crossbeam_utils::Backoff;
use enum_dispatch::enum_dispatch;
use fs_err::{File, OpenOptions};
use std::fmt::Debug;
use std::io::{stderr, BufWriter, Write};
use std::sync::Arc;

use bincode;

Expand Down Expand Up @@ -80,18 +83,32 @@ pub enum Backend {
}

impl Backend {
fn write_all(&mut self, events: impl IntoIterator<Item = Event>) {
for event in events {
fn write_all(&mut self, events: Arc<ArrayQueue<Event>>) {
let backoff = Backoff::new();
loop {
let event = match events.pop() {
Some(event) => event,
None => {
// We can't use anything with a futex here since
// the event sender might run inside a signal handler
backoff.snooze();
continue;
}
};

let done = matches!(event.kind, EventKind::Done);
self.write(event);
if done {
break;
}

// Reset the backoff timer since we got an event
backoff.reset();
}
self.flush();
}

pub fn run(&mut self, events: impl IntoIterator<Item = Event>) {
pub fn run(&mut self, events: Arc<ArrayQueue<Event>>) {
let (lock, cvar) = &*FINISHED;
let mut finished = lock.lock().unwrap();
self.write_all(events);
Expand Down
34 changes: 24 additions & 10 deletions analysis/runtime/src/runtime/scoped_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::{
sync::{
mpsc::{self, SyncSender},
Mutex,
},
sync::{Arc, Mutex},
thread,
};

use crossbeam_queue::ArrayQueue;
use crossbeam_utils::Backoff;
use enum_dispatch::enum_dispatch;
use once_cell::sync::OnceCell;

Expand Down Expand Up @@ -117,16 +116,32 @@ impl Runtime for MainThreadRuntime {
}

pub struct BackgroundThreadRuntime {
tx: SyncSender<Event>,
tx: Arc<ArrayQueue<Event>>,
finalized: OnceCell<()>,
}

impl BackgroundThreadRuntime {
fn push_event(&self, mut event: Event, can_sleep: bool) {
let backoff = Backoff::new();
while let Err(event_back) = self.tx.push(event) {
if can_sleep {
backoff.snooze();
} else {
// We have no choice but to spin here because
// we might be inside a signal handler
backoff.spin();
}
event = event_back;
}
}
}

impl ExistingRuntime for BackgroundThreadRuntime {
fn finalize(&self) {
// Only run the finalizer once.
self.finalized.get_or_init(|| {
// Notify the backend that we're done.
self.tx.send(Event::done()).unwrap();
self.push_event(Event::done(), true);

// Wait for the backend thread to finish.
let (lock, cvar) = &*FINISHED;
Expand All @@ -147,9 +162,7 @@ impl ExistingRuntime for BackgroundThreadRuntime {
fn send_event(&self, event: Event) {
match self.finalized.get() {
None => {
// `.unwrap()` as we're in no place to handle an error here,
// unless we should silently drop the [`Event`] instead.
self.tx.send(event).unwrap();
self.push_event(event, false);
}
Some(()) => {
// Silently drop the [`Event`] as the [`BackgroundThreadRuntime`] has already been [`BackgroundThreadRuntime::finalize`]d.
Expand All @@ -172,7 +185,8 @@ impl Runtime for BackgroundThreadRuntime {
/// Initialize the [`BackgroundThreadRuntime`], which includes [`thread::spawn`]ing,
/// so it must be run post-`main`.
fn try_init(mut backend: Backend) -> Result<Self, AnyError> {
let (tx, rx) = mpsc::sync_channel(1024);
let tx = Arc::new(ArrayQueue::new(1 << 20));
let rx = Arc::clone(&tx);
thread::spawn(move || backend.run(rx));
Ok(Self {
tx,
Expand Down

0 comments on commit d83f983

Please sign in to comment.