Skip to content

Commit

Permalink
Make Whatever errors thread-safe
Browse files Browse the repository at this point in the history
Fixes #43
  • Loading branch information
nightkr committed May 8, 2024
1 parent cf2df9f commit a3d8530
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 60 deletions.
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
use snafu::Snafu;

/// Errors returned by tokio-zookeeper (rather than by the ZooKeeper server)
// Largely a copy of snafu::Whatever, adapted to provide Send+Sync
#[derive(Debug, Snafu)]
#[snafu(whatever, display("{message}"))]
pub struct Error {
#[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, Some)))]
source: Option<Box<dyn std::error::Error + Send + Sync>>,
message: String,
}

/// Errors that may cause a delete request to fail.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Snafu)]
#[snafu(module)]
Expand Down
59 changes: 26 additions & 33 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,9 @@
#![deny(missing_debug_implementations)]
#![deny(missing_copy_implementations)]

use error::Error;
use futures::{channel::oneshot, Stream};
use snafu::{whatever as bail, ResultExt, Whatever};
use snafu::{whatever as bail, ResultExt};
use std::borrow::Cow;
use std::net::SocketAddr;
use std::time;
Expand All @@ -212,7 +213,7 @@ pub use crate::types::{

macro_rules! format_err {
($($x:tt)*) => {
<snafu::Whatever as snafu::FromString>::without_source(format!($($x)*))
<crate::error::Error as snafu::FromString>::without_source(format!($($x)*))
};
}
pub(crate) use format_err;
Expand Down Expand Up @@ -272,7 +273,7 @@ impl ZooKeeperBuilder {
pub async fn connect(
self,
addr: &SocketAddr,
) -> Result<(ZooKeeper, impl Stream<Item = WatchedEvent>), Whatever> {
) -> Result<(ZooKeeper, impl Stream<Item = WatchedEvent>), Error> {
let (tx, rx) = futures::channel::mpsc::unbounded();
let stream = tokio::net::TcpStream::connect(addr)
.await
Expand All @@ -293,7 +294,7 @@ impl ZooKeeperBuilder {
addr: SocketAddr,
stream: tokio::net::TcpStream,
default_watcher: futures::channel::mpsc::UnboundedSender<WatchedEvent>,
) -> Result<ZooKeeper, Whatever> {
) -> Result<ZooKeeper, Error> {
let request = proto::Request::Connect {
protocol_version: 0,
last_zxid_seen: 0,
Expand Down Expand Up @@ -321,7 +322,7 @@ impl ZooKeeper {
/// See [`ZooKeeperBuilder::connect`].
pub async fn connect(
addr: &SocketAddr,
) -> Result<(Self, impl Stream<Item = WatchedEvent>), Whatever> {
) -> Result<(Self, impl Stream<Item = WatchedEvent>), Error> {
ZooKeeperBuilder::default().connect(addr).await
}

Expand Down Expand Up @@ -361,7 +362,7 @@ impl ZooKeeper {
data: D,
acl: A,
mode: CreateMode,
) -> Result<Result<String, error::Create>, Whatever>
) -> Result<Result<String, error::Create>, Error>
where
D: Into<Cow<'static, [u8]>>,
A: Into<Cow<'static, [Acl]>>,
Expand Down Expand Up @@ -395,7 +396,7 @@ impl ZooKeeper {
path: &str,
version: Option<i32>,
data: D,
) -> Result<Result<Stat, error::SetData>, Whatever>
) -> Result<Result<Stat, error::SetData>, Error>
where
D: Into<Cow<'static, [u8]>>,
{
Expand Down Expand Up @@ -425,7 +426,7 @@ impl ZooKeeper {
&self,
path: &str,
version: Option<i32>,
) -> Result<Result<(), error::Delete>, Whatever> {
) -> Result<Result<(), error::Delete>, Error> {
let version = version.unwrap_or(-1);
self.connection
.enqueue(proto::Request::Delete {
Expand All @@ -445,7 +446,7 @@ impl ZooKeeper {
pub async fn get_acl(
&self,
path: &str,
) -> Result<Result<(Vec<Acl>, Stat), error::GetAcl>, Whatever> {
) -> Result<Result<(Vec<Acl>, Stat), error::GetAcl>, Error> {
self.connection
.enqueue(proto::Request::GetAcl {
path: path.to_string(),
Expand All @@ -469,7 +470,7 @@ impl ZooKeeper {
path: &str,
acl: A,
version: Option<i32>,
) -> Result<Result<Stat, error::SetAcl>, Whatever>
) -> Result<Result<Stat, error::SetAcl>, Error>
where
A: Into<Cow<'static, [Acl]>>,
{
Expand Down Expand Up @@ -498,7 +499,7 @@ impl ZooKeeper {
}

#[instrument(name = "exists")]
async fn exists_w(&self, path: &str, watch: Watch) -> Result<Option<Stat>, Whatever> {
async fn exists_w(&self, path: &str, watch: Watch) -> Result<Option<Stat>, Error> {
self.connection
.enqueue(proto::Request::Exists {
path: path.to_string(),
Expand All @@ -509,16 +510,12 @@ impl ZooKeeper {
}

/// Return the [`Stat`] of the node of the given `path`, or `None` if the node does not exist.
pub async fn exists(&self, path: &str) -> Result<Option<Stat>, Whatever> {
pub async fn exists(&self, path: &str) -> Result<Option<Stat>, Error> {
self.exists_w(path, Watch::None).await
}

#[instrument]
async fn get_children_w(
&self,
path: &str,
watch: Watch,
) -> Result<Option<Vec<String>>, Whatever> {
async fn get_children_w(&self, path: &str, watch: Watch) -> Result<Option<Vec<String>>, Error> {
self.connection
.enqueue(proto::Request::GetChildren {
path: path.to_string(),
Expand All @@ -533,16 +530,12 @@ impl ZooKeeper {
///
/// The returned list of children is not sorted and no guarantee is provided as to its natural
/// or lexical order.
pub async fn get_children(&self, path: &str) -> Result<Option<Vec<String>>, Whatever> {
pub async fn get_children(&self, path: &str) -> Result<Option<Vec<String>>, Error> {
self.get_children_w(path, Watch::None).await
}

#[instrument]
async fn get_data_w(
&self,
path: &str,
watch: Watch,
) -> Result<Option<(Vec<u8>, Stat)>, Whatever> {
async fn get_data_w(&self, path: &str, watch: Watch) -> Result<Option<(Vec<u8>, Stat)>, Error> {
self.connection
.enqueue(proto::Request::GetData {
path: path.to_string(),
Expand All @@ -554,7 +547,7 @@ impl ZooKeeper {

/// Return the data and the [`Stat`] of the node at the given `path`, or `None` if it does not
/// exist.
pub async fn get_data(&self, path: &str) -> Result<Option<(Vec<u8>, Stat)>, Whatever> {
pub async fn get_data(&self, path: &str) -> Result<Option<(Vec<u8>, Stat)>, Error> {
self.get_data_w(path, Watch::None).await
}

Expand All @@ -580,7 +573,7 @@ impl<'a> WatchGlobally<'a> {
/// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered
/// by any successful operation that creates or deletes the node, or sets the node's data. When
/// the watch triggers, an event is sent to the global watcher stream.
pub async fn exists(&self, path: &str) -> Result<Option<Stat>, Whatever> {
pub async fn exists(&self, path: &str) -> Result<Option<Stat>, Error> {
self.0.exists_w(path, Watch::Global).await
}

Expand All @@ -594,7 +587,7 @@ impl<'a> WatchGlobally<'a> {
/// by any successful operation that deletes the node at the given `path`, or creates or
/// deletes a child of that node. When the watch triggers, an event is sent to the global
/// watcher stream.
pub async fn get_children(&self, path: &str) -> Result<Option<Vec<String>>, Whatever> {
pub async fn get_children(&self, path: &str) -> Result<Option<Vec<String>>, Error> {
self.0.get_children_w(path, Watch::Global).await
}

Expand All @@ -604,7 +597,7 @@ impl<'a> WatchGlobally<'a> {
/// If no errors occur, a watch is left on the node at the given `path`. The watch is triggered
/// by any successful operation that sets the node's data, or deletes it. When the watch
/// triggers, an event is sent to the global watcher stream.
pub async fn get_data(&self, path: &str) -> Result<Option<(Vec<u8>, Stat)>, Whatever> {
pub async fn get_data(&self, path: &str) -> Result<Option<(Vec<u8>, Stat)>, Error> {
self.0.get_data_w(path, Watch::Global).await
}
}
Expand All @@ -625,7 +618,7 @@ impl<'a> WithWatcher<'a> {
pub async fn exists(
&self,
path: &str,
) -> Result<(oneshot::Receiver<WatchedEvent>, Option<Stat>), Whatever> {
) -> Result<(oneshot::Receiver<WatchedEvent>, Option<Stat>), Error> {
let (tx, rx) = oneshot::channel();
self.0
.exists_w(path, Watch::Custom(tx))
Expand All @@ -646,7 +639,7 @@ impl<'a> WithWatcher<'a> {
pub async fn get_children(
&self,
path: &str,
) -> Result<Option<(oneshot::Receiver<WatchedEvent>, Vec<String>)>, Whatever> {
) -> Result<Option<(oneshot::Receiver<WatchedEvent>, Vec<String>)>, Error> {
let (tx, rx) = oneshot::channel();
self.0
.get_children_w(path, Watch::Custom(tx))
Expand All @@ -663,7 +656,7 @@ impl<'a> WithWatcher<'a> {
pub async fn get_data(
&self,
path: &str,
) -> Result<Option<(oneshot::Receiver<WatchedEvent>, Vec<u8>, Stat)>, Whatever> {
) -> Result<Option<(oneshot::Receiver<WatchedEvent>, Vec<u8>, Stat)>, Error> {
let (tx, rx) = oneshot::channel();
self.0
.get_data_w(path, Watch::Custom(tx))
Expand Down Expand Up @@ -736,7 +729,7 @@ impl<'a> MultiBuilder<'a> {
}

/// Run executes the attached requests in one atomic unit.
pub async fn run(self) -> Result<Vec<Result<MultiResponse, error::Multi>>, Whatever> {
pub async fn run(self) -> Result<Vec<Result<MultiResponse, error::Multi>>, Error> {
let (zk, requests) = (self.zk, self.requests);
let reqs_lite: Vec<transform::RequestMarker> = requests.iter().map(|r| r.into()).collect();
zk.connection
Expand Down Expand Up @@ -1022,13 +1015,13 @@ mod tests {
init_tracing_subscriber();
let builder = ZooKeeperBuilder::default();

async fn check_exists(zk: &ZooKeeper, paths: &[&str]) -> Result<Vec<bool>, Whatever> {
async fn check_exists(zk: &ZooKeeper, paths: &[&str]) -> Result<Vec<bool>, Error> {
let mut res = Vec::new();
for p in paths {
let exists = zk.exists(p).await?;
res.push(exists.is_some());
}
Result::<_, Whatever>::Ok(res)
Result::<_, Error>::Ok(res)
}

let (zk, _) = builder
Expand Down
6 changes: 3 additions & 3 deletions src/proto/active_packetizer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use super::{request, watch::WatchType, Request, Response};
use crate::{WatchedEvent, WatchedEventType, ZkError};
use crate::{error::Error as DynError, WatchedEvent, WatchedEventType, ZkError};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use futures::{
channel::{mpsc, oneshot},
ready,
};
use pin_project::pin_project;
use snafu::{Snafu, Whatever};
use snafu::Snafu;
use std::collections::HashMap;
use std::{
future::Future,
Expand All @@ -23,7 +23,7 @@ pub enum Error {
#[snafu(transparent)]
Io { source: std::io::Error },
#[snafu(transparent)]
Whatever { source: Whatever },
Whatever { source: DynError },

#[snafu(display("connection closed with {len} bytes left in buffer: {buf:x?}", len = buf.len()))]
ConnectionClosed { buf: Vec<u8> },
Expand Down
2 changes: 1 addition & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(crate) use self::watch::Watch;
#[async_trait]
pub trait ZooKeeperTransport: AsyncRead + AsyncWrite + Sized + Send + 'static {
type Addr: Send + Clone;
type ConnectError: Error + 'static;
type ConnectError: Error + Send + Sync + 'static;
async fn connect(addr: Self::Addr) -> Result<Self, Self::ConnectError>;
}

Expand Down
12 changes: 6 additions & 6 deletions src/proto/packetizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use super::{
active_packetizer::ActivePacketizer, request, watch::WatchType, Request, Response,
ZooKeeperTransport,
};
use crate::{format_err, Watch, WatchedEvent, ZkError};
use crate::{error::Error, format_err, Watch, WatchedEvent, ZkError};
use byteorder::{BigEndian, WriteBytesExt};
use futures::{
channel::{mpsc, oneshot},
future::Either,
ready, FutureExt, StreamExt, TryFutureExt,
};
use pin_project::pin_project;
use snafu::{ResultExt, Whatever};
use snafu::ResultExt;
use std::{
future::{self, Future},
mem,
Expand Down Expand Up @@ -83,7 +83,7 @@ where
enum PacketizerState<S> {
Connected(#[pin] ActivePacketizer<S>),
Reconnecting(
Pin<Box<dyn Future<Output = Result<ActivePacketizer<S>, Whatever>> + Send + 'static>>,
Pin<Box<dyn Future<Output = Result<ActivePacketizer<S>, Error>> + Send + 'static>>,
),
}

Expand All @@ -96,7 +96,7 @@ where
cx: &mut Context,
exiting: bool,
default_watcher: &mut mpsc::UnboundedSender<WatchedEvent>,
) -> Poll<Result<(), Whatever>> {
) -> Poll<Result<(), Error>> {
let ap = match self.as_mut().project() {
PacketizerStateProj::Connected(ref mut ap) => {
return ap
Expand Down Expand Up @@ -176,7 +176,7 @@ impl<S> Future for Packetizer<S>
where
S: ZooKeeperTransport,
{
type Output = Result<(), Whatever>;
type Output = Result<(), Error>;

#[instrument(skip(self, cx))]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Expand Down Expand Up @@ -295,7 +295,7 @@ impl Enqueuer {
pub(crate) fn enqueue(
&self,
request: Request,
) -> impl Future<Output = Result<Result<Response, ZkError>, Whatever>> {
) -> impl Future<Output = Result<Result<Response, ZkError>, Error>> {
let (tx, rx) = oneshot::channel();
match self.0.unbounded_send((request, tx)) {
Ok(()) => {
Expand Down
Loading

0 comments on commit a3d8530

Please sign in to comment.