diff --git a/Cargo.lock b/Cargo.lock index bcb5900042..6951a19c47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2597,6 +2597,7 @@ dependencies = [ "iroh-quinn", "iroh-test", "num_cpus", + "oneshot", "parking_lot", "pin-project", "postcard", @@ -3601,6 +3602,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "oneshot" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e296cf87e61c9cfc1a61c3c63a0f7f286ed4554e0e22be84e8a38e1d264a2a29" + [[package]] name = "oorandom" version = "11.1.4" diff --git a/iroh-blobs/Cargo.toml b/iroh-blobs/Cargo.toml index aa6ecf67d4..44129dd473 100644 --- a/iroh-blobs/Cargo.toml +++ b/iroh-blobs/Cargo.toml @@ -32,6 +32,7 @@ iroh-io = { version = "0.6.0", features = ["stats"] } iroh-metrics = { version = "0.22.0", path = "../iroh-metrics", default-features = false } iroh-net = { version = "0.22.0", path = "../iroh-net" } num_cpus = "1.15.0" +oneshot = "0.1.8" parking_lot = { version = "0.12.1", optional = true } pin-project = "1.1.5" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } diff --git a/iroh-blobs/src/store/fs.rs b/iroh-blobs/src/store/fs.rs index 0f79f17001..fb6596ef02 100644 --- a/iroh-blobs/src/store/fs.rs +++ b/iroh-blobs/src/store/fs.rs @@ -83,7 +83,7 @@ use iroh_io::AsyncSliceReader; use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError}; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; -use tokio::{io::AsyncWriteExt, sync::oneshot}; +use tokio::io::AsyncWriteExt; use tracing::trace_span; mod import_flat_store; @@ -534,25 +534,25 @@ pub(crate) enum ActorMessage { /// Query method: get the rough entry status for a hash. Just complete, partial or not found. EntryStatus { hash: Hash, - tx: async_channel::Sender>, + tx: oneshot::Sender>, }, #[cfg(test)] /// Query method: get the full entry state for a hash, both in memory and in redb. /// This is everything we got about the entry, including the actual inline outboard and data. EntryState { hash: Hash, - tx: async_channel::Sender>, + tx: oneshot::Sender>, }, /// Query method: get the full entry state for a hash. GetFullEntryState { hash: Hash, - tx: async_channel::Sender>>, + tx: oneshot::Sender>>, }, /// Modification method: set the full entry state for a hash. SetFullEntryState { hash: Hash, entry: Option, - tx: async_channel::Sender>, + tx: oneshot::Sender>, }, /// Modification method: get or create a file handle for a hash. /// @@ -575,7 +575,7 @@ pub(crate) enum ActorMessage { /// At this point the size, hash and outboard must already be known. Import { cmd: Import, - tx: async_channel::Sender>, + tx: oneshot::Sender>, }, /// Modification method: export data from a redb store /// @@ -921,18 +921,18 @@ impl StoreInner { } async fn entry_status(&self, hash: &Hash) -> OuterResult { - let (tx, rx) = async_channel::bounded(1); + let (tx, rx) = oneshot::channel(); self.tx .send(ActorMessage::EntryStatus { hash: *hash, tx }) .await?; - Ok(rx.recv().await??) + Ok(rx.await??) } fn entry_status_sync(&self, hash: &Hash) -> OuterResult { - let (tx, rx) = async_channel::bounded(1); + let (tx, rx) = oneshot::channel(); self.tx .send_blocking(ActorMessage::EntryStatus { hash: *hash, tx })?; - Ok(rx.recv_blocking()??) + Ok(rx.recv()??) } async fn complete(&self, entry: Entry) -> OuterResult<()> { @@ -1128,7 +1128,7 @@ impl StoreInner { let tag = self.temp.temp_tag(HashAndFormat { hash, format }); let hash = *tag.hash(); // blocking send for the import - let (tx, rx) = async_channel::bounded(1); + let (tx, rx) = oneshot::channel(); self.tx.send_blocking(ActorMessage::Import { cmd: Import { content_id: HashAndFormat { hash, format }, @@ -1138,7 +1138,7 @@ impl StoreInner { }, tx, })?; - Ok(rx.recv_blocking()??) + Ok(rx.recv()??) } fn temp_file_name(&self) -> PathBuf { @@ -1231,18 +1231,24 @@ pub(crate) type ActorResult = std::result::Result; pub(crate) enum OuterError { #[error("inner error: {0}")] Inner(#[from] ActorError), - #[error("send error: {0}")] - Send(#[from] async_channel::SendError), + #[error("send error")] + Send, #[error("progress send error: {0}")] ProgressSend(#[from] ProgressSendError), #[error("recv error: {0}")] - Recv(#[from] oneshot::error::RecvError), + Recv(#[from] oneshot::RecvError), #[error("recv error: {0}")] AsyncChannelRecv(#[from] async_channel::RecvError), #[error("join error: {0}")] JoinTask(#[from] tokio::task::JoinError), } +impl From> for OuterError { + fn from(_e: async_channel::SendError) -> Self { + OuterError::Send + } +} + /// Result type for calling the redb actor from the store. /// /// See [`OuterError`] for what can go wrong. @@ -2236,7 +2242,7 @@ impl ActorState { } ActorMessage::EntryStatus { hash, tx } => { let res = self.entry_status(tables, hash); - tx.send_blocking(res).ok(); + tx.send(res).ok(); } ActorMessage::Blobs { filter, tx } => { let res = self.blobs(tables, filter); @@ -2256,11 +2262,11 @@ impl ActorState { } #[cfg(test)] ActorMessage::EntryState { hash, tx } => { - tx.send_blocking(self.entry_state(tables, hash)).ok(); + tx.send(self.entry_state(tables, hash)).ok(); } ActorMessage::GetFullEntryState { hash, tx } => { let res = self.get_full_entry_state(tables, hash); - tx.send_blocking(res).ok(); + tx.send(res).ok(); } x => return Ok(Err(x)), } @@ -2275,7 +2281,7 @@ impl ActorState { match msg { ActorMessage::Import { cmd, tx } => { let res = self.import(tables, cmd); - tx.send_blocking(res).ok(); + tx.send(res).ok(); } ActorMessage::SetTag { tag, value, tx } => { let res = self.set_tag(tables, tag, value); @@ -2306,7 +2312,7 @@ impl ActorState { } ActorMessage::SetFullEntryState { hash, entry, tx } => { let res = self.set_full_entry_state(tables, hash, entry); - tx.send_blocking(res).ok(); + tx.send(res).ok(); } msg => { // try to handle it as readonly diff --git a/iroh-blobs/src/store/fs/test_support.rs b/iroh-blobs/src/store/fs/test_support.rs index 733cba146f..8b11bb2609 100644 --- a/iroh-blobs/src/store/fs/test_support.rs +++ b/iroh-blobs/src/store/fs/test_support.rs @@ -7,8 +7,6 @@ use std::{ path::{Path, PathBuf}, }; -use tokio::sync::oneshot; - use super::{ tables::{ReadableTables, Tables}, ActorError, ActorMessage, ActorResult, ActorState, DataLocation, EntryState, FilterPredicate, @@ -106,25 +104,25 @@ impl Store { impl StoreInner { #[cfg(test)] async fn entry_state(&self, hash: Hash) -> OuterResult { - let (tx, rx) = async_channel::bounded(1); + let (tx, rx) = oneshot::channel(); self.tx.send(ActorMessage::EntryState { hash, tx }).await?; - Ok(rx.recv().await??) + Ok(rx.await??) } async fn set_full_entry_state(&self, hash: Hash, entry: Option) -> OuterResult<()> { - let (tx, rx) = async_channel::bounded(1); + let (tx, rx) = oneshot::channel(); self.tx .send(ActorMessage::SetFullEntryState { hash, entry, tx }) .await?; - Ok(rx.recv().await??) + Ok(rx.await??) } async fn get_full_entry_state(&self, hash: Hash) -> OuterResult> { - let (tx, rx) = async_channel::bounded(1); + let (tx, rx) = oneshot::channel(); self.tx .send(ActorMessage::GetFullEntryState { hash, tx }) .await?; - Ok(rx.recv().await??) + Ok(rx.await??) } async fn all_blobs(&self) -> OuterResult>> {