From 5003a23cb35a94063bec8c600cc753513bf9ac9a Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 10 Jul 2023 14:33:23 +0200 Subject: [PATCH] Refactor Errors names to be less redundant Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/consumer/pull.rs | 67 ++++++++--------- async-nats/src/jetstream/consumer/push.rs | 31 ++++---- async-nats/src/jetstream/context.rs | 87 +++++++++++------------ async-nats/src/jetstream/kv/mod.rs | 24 +++---- async-nats/src/jetstream/stream.rs | 54 +++++++------- 5 files changed, 120 insertions(+), 143 deletions(-) diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index 9c581bc91..8ac1db0d3 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -462,22 +462,17 @@ impl<'a> futures::Stream for Sequence<'a> { self.next = Some(Box::pin(async move { let inbox = context.client.new_inbox(); - let subscriber = - context - .client - .subscribe(inbox.clone()) - .await - .map_err(|err| { - MessagesError::with_source(MessagesErrorKind::PullFailed, err) - })?; + let subscriber = context + .client + .subscribe(inbox.clone()) + .await + .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?; context .client .publish_with_reply(subject, inbox, request) .await - .map_err(|err| { - MessagesError::with_source(MessagesErrorKind::PullFailed, err) - })?; + .map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?; // TODO(tp): Add timeout config and defaults. Ok(Batch { @@ -493,7 +488,7 @@ impl<'a> futures::Stream for Sequence<'a> { Poll::Ready(result) => { self.next = None; Poll::Ready(Some(result.map_err(|err| { - MessagesError::with_source(MessagesErrorKind::PullFailed, err) + MessagesError::with_source(MessagesErrorKind::Pull, err) }))) } Poll::Pending => Poll::Pending, @@ -504,7 +499,7 @@ impl<'a> futures::Stream for Sequence<'a> { Poll::Ready(result) => { self.next = None; Poll::Ready(Some(result.map_err(|err| { - MessagesError::with_source(MessagesErrorKind::PullFailed, err) + MessagesError::with_source(MessagesErrorKind::Pull, err) }))) } Poll::Pending => Poll::Pending, @@ -822,7 +817,7 @@ impl<'a> futures::Stream for Ordered<'a> { } Err(err) => { return Poll::Ready(Some(Err(OrderedError::with_source( - OrderedErrorKind::RecreationFailed, + OrderedErrorKind::Recreate, err, )))) } @@ -1013,12 +1008,12 @@ impl std::fmt::Display for OrderedError { match &self.kind() { OrderedErrorKind::MissingHeartbeat => write!(f, "missed idle heartbeat"), OrderedErrorKind::ConsumerDeleted => write!(f, "consumer deleted"), - OrderedErrorKind::PullFailed => { + OrderedErrorKind::Pull => { write!(f, "pull request failed: {}", self.format_source()) } OrderedErrorKind::Other => write!(f, "error: {}", self.format_source()), OrderedErrorKind::PushBasedConsumer => write!(f, "cannot use with push consumer"), - OrderedErrorKind::RecreationFailed => write!(f, "consumer recreation failed"), + OrderedErrorKind::Recreate => write!(f, "consumer recreation failed"), } } } @@ -1034,8 +1029,8 @@ impl From for OrderedError { MessagesErrorKind::ConsumerDeleted => { OrderedError::new(OrderedErrorKind::ConsumerDeleted) } - MessagesErrorKind::PullFailed => OrderedError { - kind: OrderedErrorKind::PullFailed, + MessagesErrorKind::Pull => OrderedError { + kind: OrderedErrorKind::Pull, source: err.source, }, MessagesErrorKind::PushBasedConsumer => { @@ -1053,9 +1048,9 @@ impl From for OrderedError { pub enum OrderedErrorKind { MissingHeartbeat, ConsumerDeleted, - PullFailed, + Pull, PushBasedConsumer, - RecreationFailed, + Recreate, Other, } @@ -1070,7 +1065,7 @@ impl std::fmt::Display for MessagesError { match &self.kind() { MessagesErrorKind::MissingHeartbeat => write!(f, "missed idle heartbeat"), MessagesErrorKind::ConsumerDeleted => write!(f, "consumer deleted"), - MessagesErrorKind::PullFailed => { + MessagesErrorKind::Pull => { write!(f, "pull request failed: {}", self.format_source()) } MessagesErrorKind::Other => write!(f, "error: {}", self.format_source()), @@ -1085,7 +1080,7 @@ crate::error_impls!(MessagesError, MessagesErrorKind); pub enum MessagesErrorKind { MissingHeartbeat, ConsumerDeleted, - PullFailed, + Pull, PushBasedConsumer, Other, } @@ -1151,7 +1146,7 @@ impl futures::Stream for Stream { } Err(err) => { return Poll::Ready(Some(Err(MessagesError::with_source( - MessagesErrorKind::PullFailed, + MessagesErrorKind::Pull, err, )))) } @@ -2256,7 +2251,7 @@ crate::error_impls!(BatchError, BatchErrorKind); impl std::fmt::Display for BatchError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match &self.kind() { - BatchErrorKind::PullFailed => { + BatchErrorKind::Pull => { write!(f, "pull request failed: {}", self.format_source()) } BatchErrorKind::Flush => { @@ -2278,14 +2273,14 @@ impl From for BatchError { impl From for BatchError { fn from(err: BatchRequestError) -> Self { - BatchError::with_source(BatchErrorKind::PullFailed, err) + BatchError::with_source(BatchErrorKind::Pull, err) } } #[derive(Debug, Clone, Copy, PartialEq)] pub enum BatchErrorKind { Subscribe, - PullFailed, + Pull, Flush, Serialize, } @@ -2301,10 +2296,10 @@ crate::error_impls!(ConsumerRecreateError, ConsumerRecreateErrorKind); impl std::fmt::Display for ConsumerRecreateError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match &self.kind() { - ConsumerRecreateErrorKind::StreamGetFailed => { + ConsumerRecreateErrorKind::GetStream => { write!(f, "error getting stream: {}", self.format_source()) } - ConsumerRecreateErrorKind::RecreationFailed => { + ConsumerRecreateErrorKind::Recreate => { write!(f, "consumer creation failed: {}", self.format_source()) } ConsumerRecreateErrorKind::TimedOut => write!(f, "timed out"), @@ -2314,8 +2309,8 @@ impl std::fmt::Display for ConsumerRecreateError { #[derive(Debug, Clone, PartialEq, Copy)] pub enum ConsumerRecreateErrorKind { - StreamGetFailed, - RecreationFailed, + GetStream, + Recreate, TimedOut, } @@ -2331,13 +2326,13 @@ async fn recreate_consumer_stream( .get_stream(stream_name.clone()) .await .map_err(|err| { - ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::StreamGetFailed, err) + ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err) })?; stream .delete_consumer(&consumer_name) .await .map_err(|err| { - ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::RecreationFailed, err) + ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err) })?; let deliver_policy = { @@ -2358,12 +2353,8 @@ async fn recreate_consumer_stream( ) .await .map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))? - .map_err(|err| { - ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::RecreationFailed, err) - })? + .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))? .messages() - .map_err(|err| { - ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::RecreationFailed, err) - }) + .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err)) .await } diff --git a/async-nats/src/jetstream/consumer/push.rs b/async-nats/src/jetstream/consumer/push.rs index c93991ed0..f7916be25 100644 --- a/async-nats/src/jetstream/consumer/push.rs +++ b/async-nats/src/jetstream/consumer/push.rs @@ -591,10 +591,7 @@ impl<'a> futures::Stream for Ordered<'a> { Poll::Ready(subscriber) => { self.subscriber_future = None; self.subscriber = Some(subscriber.map_err(|err| { - OrderedError::with_source( - OrderedErrorKind::RecreationFailed, - err, - ) + OrderedError::with_source(OrderedErrorKind::Recreate, err) })?); } Poll::Pending => { @@ -607,7 +604,7 @@ impl<'a> futures::Stream for Ordered<'a> { self.subscriber_future = None; self.consumer_sequence.store(0, Ordering::Relaxed); self.subscriber = Some(subscriber.map_err(|err| { - OrderedError::with_source(OrderedErrorKind::RecreationFailed, err) + OrderedError::with_source(OrderedErrorKind::Recreate, err) })?); } Poll::Pending => { @@ -727,7 +724,7 @@ impl std::fmt::Display for OrderedError { OrderedErrorKind::ConsumerDeleted => write!(f, "consumer deleted"), OrderedErrorKind::Other => write!(f, "error: {}", self.format_source()), OrderedErrorKind::PullBasedConsumer => write!(f, "cannot use with push consumer"), - OrderedErrorKind::RecreationFailed => write!(f, "consumer recreation failed"), + OrderedErrorKind::Recreate => write!(f, "consumer recreation failed"), } } } @@ -759,7 +756,7 @@ pub enum OrderedErrorKind { MissingHeartbeat, ConsumerDeleted, PullBasedConsumer, - RecreationFailed, + Recreate, Other, } @@ -801,23 +798,23 @@ crate::error_impls!(ConsumerRecreateError, ConsumerRecreateErrorKind); impl std::fmt::Display for ConsumerRecreateError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match &self.kind() { - ConsumerRecreateErrorKind::StreamGetFailed => { + ConsumerRecreateErrorKind::GetStream => { write!(f, "error getting stream: {}", self.format_source()) } - ConsumerRecreateErrorKind::RecreationFailed => { + ConsumerRecreateErrorKind::Recreate => { write!(f, "consumer creation failed: {}", self.format_source()) } ConsumerRecreateErrorKind::TimedOut => write!(f, "timed out"), - ConsumerRecreateErrorKind::SubscriptionFailed => write!(f, "failed to resubscribe"), + ConsumerRecreateErrorKind::Subscription => write!(f, "failed to resubscribe"), } } } #[derive(Debug, Clone, PartialEq, Copy)] pub enum ConsumerRecreateErrorKind { - StreamGetFailed, - SubscriptionFailed, - RecreationFailed, + GetStream, + Subscription, + Recreate, TimedOut, } @@ -832,7 +829,7 @@ async fn recreate_consumer_and_subscription( .subscribe(config.deliver_subject.clone()) .await .map_err(|err| { - ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::SubscriptionFailed, err) + ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Subscription, err) })?; recreate_ephemeral_consumer(context, config, stream_name, sequence).await?; @@ -848,7 +845,7 @@ async fn recreate_ephemeral_consumer( .get_stream(stream_name.clone()) .await .map_err(|err| { - ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::StreamGetFailed, err) + ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err) })?; let deliver_policy = { @@ -869,8 +866,6 @@ async fn recreate_ephemeral_consumer( ) .await .map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))? - .map_err(|err| { - ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::RecreationFailed, err) - })?; + .map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?; Ok(()) } diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index 2a7f43630..fb6ebbdf2 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -340,11 +340,11 @@ impl Context { let request: Response = self .request(subject, &()) .await - .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::RequestError, err))?; + .map_err(|err| GetStreamError::with_source(GetStreamErrorKind::Request, err))?; match request { - Response::Err { error } => Err(GetStreamError::new( - GetStreamErrorKind::JetStreamError(error), - )), + Response::Err { error } => { + Err(GetStreamError::new(GetStreamErrorKind::JetStream(error))) + } Response::Ok(info) => Ok(Stream { context: self.clone(), info, @@ -420,11 +420,13 @@ impl Context { return Err(DeleteStreamError::new(DeleteStreamErrorKind::EmptyName)); } let subject = format!("STREAM.DELETE.{stream}"); - match self.request(subject, &json!({})).await.map_err(|err| { - DeleteStreamError::with_source(DeleteStreamErrorKind::RequestError, err) - })? { + match self + .request(subject, &json!({})) + .await + .map_err(|err| DeleteStreamError::with_source(DeleteStreamErrorKind::Request, err))? + { Response::Err { error } => Err(DeleteStreamError::new( - DeleteStreamErrorKind::JetStreamError(error), + DeleteStreamErrorKind::JetStream(error), )), Response::Ok(delete_response) => Ok(delete_response), } @@ -541,7 +543,7 @@ impl Context { let stream_name = format!("KV_{}", &bucket); let stream = self .get_stream(stream_name.clone()) - .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::FailedGetBucket, err)) + .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::GetBucket, err)) .await?; if stream.info.config.max_messages_per_subject < 1 { @@ -660,10 +662,7 @@ impl Context { if err.kind() == CreateStreamErrorKind::TimedOut { CreateKeyValueError::with_source(CreateKeyValueErrorKind::TimedOut, err) } else { - CreateKeyValueError::with_source( - CreateKeyValueErrorKind::BucketCreationFailed, - err, - ) + CreateKeyValueError::with_source(CreateKeyValueErrorKind::BucketCreate, err) } })?; @@ -720,7 +719,7 @@ impl Context { let stream_name = format!("KV_{}", bucket.as_ref()); self.delete_stream(stream_name) - .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStreamError, err)) + .map_err(|err| KeyValueError::with_source(KeyValueErrorKind::JetStream, err)) .await } @@ -885,10 +884,7 @@ impl Context { }) .await .map_err(|err| { - CreateObjectStoreError::with_source( - CreateKeyValueErrorKind::BucketCreationFailed, - err, - ) + CreateObjectStoreError::with_source(CreateKeyValueErrorKind::BucketCreate, err) })?; Ok(ObjectStore { @@ -921,9 +917,10 @@ impl Context { )); } let stream_name = format!("OBJ_{bucket_name}"); - let stream = self.get_stream(stream_name).await.map_err(|err| { - ObjectStoreError::with_source(ObjectStoreErrorKind::FailedGetStore, err) - })?; + let stream = self + .get_stream(stream_name) + .await + .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?; Ok(ObjectStore { name: bucket_name.to_string(), @@ -949,9 +946,9 @@ impl Context { bucket_name: T, ) -> Result<(), DeleteObjectStore> { let stream_name = format!("OBJ_{}", bucket_name.as_ref()); - self.delete_stream(stream_name).await.map_err(|err| { - ObjectStoreError::with_source(ObjectStoreErrorKind::FailedGetStore, err) - })?; + self.delete_stream(stream_name) + .await + .map_err(|err| ObjectStoreError::with_source(ObjectStoreErrorKind::GetStore, err))?; Ok(()) } } @@ -1341,13 +1338,13 @@ impl Display for CreateStreamError { CreateStreamErrorKind::DomainAndExternalSet => { write!(f, "domain and external are both set") } - CreateStreamErrorKind::JetStreamError(err) => { + CreateStreamErrorKind::JetStream(err) => { write!(f, "jetstream error: {}", err) } CreateStreamErrorKind::TimedOut => write!(f, "jetstream request timed out"), CreateStreamErrorKind::JetStreamUnavailable => write!(f, "jetstream unavailable"), CreateStreamErrorKind::ResponseParse => write!(f, "failed to parse server response"), - CreateStreamErrorKind::ResponseError => { + CreateStreamErrorKind::Response => { write!(f, "response error: {}", self.format_source()) } } @@ -1356,7 +1353,7 @@ impl Display for CreateStreamError { impl From for CreateStreamError { fn from(error: super::errors::Error) -> Self { - CreateStreamError::new(CreateStreamErrorKind::JetStreamError(error)) + CreateStreamError::new(CreateStreamErrorKind::JetStream(error)) } } @@ -1368,10 +1365,10 @@ impl From for CreateStreamError { } RequestErrorKind::TimedOut => CreateStreamError::new(CreateStreamErrorKind::TimedOut), RequestErrorKind::Other => { - CreateStreamError::with_source(CreateStreamErrorKind::ResponseError, error) + CreateStreamError::with_source(CreateStreamErrorKind::Response, error) } RequestErrorKind::JetStream(err) => { - CreateStreamError::new(CreateStreamErrorKind::JetStreamError(err)) + CreateStreamError::new(CreateStreamErrorKind::JetStream(err)) } } } @@ -1383,9 +1380,9 @@ pub enum CreateStreamErrorKind { InvalidStreamName, DomainAndExternalSet, JetStreamUnavailable, - JetStreamError(super::errors::Error), + JetStream(super::errors::Error), TimedOut, - ResponseError, + Response, ResponseParse, } @@ -1401,10 +1398,10 @@ impl Display for GetStreamError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self.kind() { GetStreamErrorKind::EmptyName => write!(f, "empty name cannot be empty"), - GetStreamErrorKind::RequestError => { + GetStreamErrorKind::Request => { write!(f, "request error: {}", self.format_source()) } - GetStreamErrorKind::JetStreamError(err) => write!(f, "jetstream error: {}", err), + GetStreamErrorKind::JetStream(err) => write!(f, "jetstream error: {}", err), } } } @@ -1412,8 +1409,8 @@ impl Display for GetStreamError { #[derive(Debug, Clone, PartialEq)] pub enum GetStreamErrorKind { EmptyName, - RequestError, - JetStreamError(super::errors::Error), + Request, + JetStream(super::errors::Error), } pub type UpdateStreamError = CreateStreamError; @@ -1430,8 +1427,8 @@ pub struct KeyValueError { #[derive(Debug, Clone, Copy, PartialEq)] pub enum KeyValueErrorKind { InvalidStoreName, - FailedGetBucket, - JetStreamError, + GetBucket, + JetStream, } crate::error_impls!(KeyValueError, KeyValueErrorKind); @@ -1440,8 +1437,8 @@ impl Display for KeyValueError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self.kind() { KeyValueErrorKind::InvalidStoreName => write!(f, "invalid Key Value Store name"), - KeyValueErrorKind::FailedGetBucket => write!(f, "failed to get the bucket"), - KeyValueErrorKind::JetStreamError => { + KeyValueErrorKind::GetBucket => write!(f, "failed to get the bucket"), + KeyValueErrorKind::JetStream => { write!(f, "JetStream error: {}", self.format_source()) } } @@ -1458,8 +1455,8 @@ pub struct CreateKeyValueError { pub enum CreateKeyValueErrorKind { InvalidStoreName, TooLongHistory, - JetStreamError, - BucketCreationFailed, + JetStream, + BucketCreate, TimedOut, } @@ -1471,10 +1468,10 @@ impl Display for CreateKeyValueError { match self.kind() { CreateKeyValueErrorKind::InvalidStoreName => write!(f, "invalid Key Value Store name"), CreateKeyValueErrorKind::TooLongHistory => write!(f, "too long history"), - CreateKeyValueErrorKind::JetStreamError => { + CreateKeyValueErrorKind::JetStream => { write!(f, "JetStream error: {}", source) } - CreateKeyValueErrorKind::BucketCreationFailed => { + CreateKeyValueErrorKind::BucketCreate => { write!(f, "bucket creation failed: {}", source) } CreateKeyValueErrorKind::TimedOut => write!(f, "timed out"), @@ -1495,7 +1492,7 @@ crate::error_impls!(ObjectStoreError, ObjectStoreErrorKind); #[derive(Debug, Clone, Copy, PartialEq)] pub enum ObjectStoreErrorKind { InvalidBucketName, - FailedGetStore, + GetStore, } impl Display for ObjectStoreError { @@ -1504,7 +1501,7 @@ impl Display for ObjectStoreError { ObjectStoreErrorKind::InvalidBucketName => { write!(f, "invalid Object Store bucket name") } - ObjectStoreErrorKind::FailedGetStore => write!(f, "failed to get Object Store"), + ObjectStoreErrorKind::GetStore => write!(f, "failed to get Object Store"), } } } diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 8dedb4f63..619e3ec6c 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -208,10 +208,10 @@ impl Store { .context .publish(subject, value) .await - .map_err(|err| PutError::with_source(PutErrorKind::PublishError, err))?; + .map_err(|err| PutError::with_source(PutErrorKind::Publish, err))?; let ack = publish_ack .await - .map_err(|err| PutError::with_source(PutErrorKind::AckError, err))?; + .map_err(|err| PutError::with_source(PutErrorKind::Ack, err))?; Ok(ack.sequence) } @@ -354,7 +354,7 @@ impl Store { Err(err) => match err.kind() { crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None, crate::jetstream::stream::LastRawMessageErrorKind::Other - | crate::jetstream::stream::LastRawMessageErrorKind::JetStreamError => { + | crate::jetstream::stream::LastRawMessageErrorKind::JetStream => { return Err(EntryError::with_source(EntryErrorKind::Other, err)) } }, @@ -1026,7 +1026,7 @@ pub struct StatusError { #[derive(Debug, PartialEq, Clone)] pub enum StatusErrorKind { - JetStreamError(crate::jetstream::Error), + JetStream(crate::jetstream::Error), TimedOut, } @@ -1035,7 +1035,7 @@ crate::error_impls!(StatusError, StatusErrorKind); impl Display for StatusError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self.kind.clone() { - StatusErrorKind::JetStreamError(err) => { + StatusErrorKind::JetStream(err) => { write!(f, "jetstream request failed: {}", err) } StatusErrorKind::TimedOut => write!(f, "timed out"), @@ -1052,8 +1052,8 @@ pub struct PutError { #[derive(Debug, PartialEq, Clone)] pub enum PutErrorKind { InvalidKey, - PublishError, - AckError, + Publish, + Ack, } crate::error_impls!(PutError, PutErrorKind); @@ -1061,10 +1061,10 @@ crate::error_impls!(PutError, PutErrorKind); impl Display for PutError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self.kind { - PutErrorKind::PublishError => { + PutErrorKind::Publish => { write!(f, "failed to put key into store: {}", self.format_source()) } - PutErrorKind::AckError => write!(f, "ack error: {}", self.format_source()), + PutErrorKind::Ack => write!(f, "ack error: {}", self.format_source()), PutErrorKind::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"), } } @@ -1170,14 +1170,14 @@ pub struct WatcherError { #[derive(Clone, Debug, PartialEq)] pub enum WatcherErrorKind { - ConsumerError, + Consumer, Other, } impl Display for WatcherError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self.kind { - WatcherErrorKind::ConsumerError => { + WatcherErrorKind::Consumer => { write!(f, "watcher consumer error: {}", self.format_source()) } WatcherErrorKind::Other => write!(f, "watcher error: {}", self.format_source()), @@ -1189,7 +1189,7 @@ crate::error_impls!(WatcherError, WatcherErrorKind); impl From for WatcherError { fn from(err: OrderedError) -> Self { - WatcherError::with_source(WatcherErrorKind::ConsumerError, err) + WatcherError::with_source(WatcherErrorKind::Consumer, err) } } diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index 4885ff134..b4858c2ab 100644 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -57,7 +57,7 @@ pub enum DirectGetErrorKind { NotFound, InvalidSubject, TimedOut, - FailedRequest, + Request, ErrorResponse(StatusCode, String), Other, } @@ -75,7 +75,7 @@ impl Display for DirectGetError { write!(f, "error getting message: {}", source) } DirectGetErrorKind::TimedOut => write!(f, "timed out"), - DirectGetErrorKind::FailedRequest => write!(f, "request failed: {}", source), + DirectGetErrorKind::Request => write!(f, "request failed: {}", source), } } } @@ -106,9 +106,9 @@ pub struct DeleteMessageError { #[derive(Debug, Clone, PartialEq)] pub enum DeleteMessageErrorKind { - FailedRequest, + Request, TimedOut, - JetStreamError(super::errors::Error), + JetStream(super::errors::Error), } crate::error_impls!(DeleteMessageError, DeleteMessageErrorKind); @@ -116,9 +116,9 @@ impl Display for DeleteMessageError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let source = self.format_source(); match &self.kind { - DeleteMessageErrorKind::FailedRequest => write!(f, "request failed: {}", source), + DeleteMessageErrorKind::Request => write!(f, "request failed: {}", source), DeleteMessageErrorKind::TimedOut => write!(f, "timed out"), - DeleteMessageErrorKind::JetStreamError(err) => write!(f, "JetStream error: {}", err), + DeleteMessageErrorKind::JetStream(err) => write!(f, "JetStream error: {}", err), } } } @@ -599,9 +599,7 @@ impl Stream { LastRawMessageErrorKind::NoMessageFound, )) } else { - Err(LastRawMessageError::new( - LastRawMessageErrorKind::JetStreamError, - )) + Err(LastRawMessageError::new(LastRawMessageErrorKind::JetStream)) } } Response::Ok(value) => Ok(value.message), @@ -644,13 +642,13 @@ impl Stream { RequestErrorKind::TimedOut => { DeleteMessageError::new(DeleteMessageErrorKind::TimedOut) } - _ => DeleteMessageError::with_source(DeleteMessageErrorKind::FailedRequest, err), + _ => DeleteMessageError::with_source(DeleteMessageErrorKind::Request, err), }) .await?; match response { Response::Err { error } => Err(DeleteMessageError::new( - DeleteMessageErrorKind::JetStreamError(error), + DeleteMessageErrorKind::JetStream(error), )), Response::Ok(value) => Ok(value.success), } @@ -771,9 +769,7 @@ impl Stream { ) .await? { - Response::Err { error } => { - Err(ConsumerError::new(ConsumerErrorKind::JetStreamError(error))) - } + Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))), Response::Ok::(info) => Ok(Consumer::new( FromConsumer::try_from_consumer_config(info.clone().config) .map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?, @@ -1546,9 +1542,9 @@ pub struct PurgeError { #[derive(Debug, Clone, PartialEq)] pub enum PurgeErrorKind { - FailedRequest, + Request, TimedOut, - JetStreamError(super::errors::Error), + JetStream(super::errors::Error), } crate::error_impls!(PurgeError, PurgeErrorKind); @@ -1556,9 +1552,9 @@ impl Display for PurgeError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let source = self.format_source(); match &self.kind { - PurgeErrorKind::FailedRequest => write!(f, "request failed: {}", source), + PurgeErrorKind::Request => write!(f, "request failed: {}", source), PurgeErrorKind::TimedOut => write!(f, "timed out"), - PurgeErrorKind::JetStreamError(err) => write!(f, "JetStream error: {}", err), + PurgeErrorKind::JetStream(err) => write!(f, "JetStream error: {}", err), } } } @@ -1581,14 +1577,12 @@ where .request(request_subject, &self.inner) .map_err(|err| match err.kind() { RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut), - _ => PurgeError::with_source(PurgeErrorKind::FailedRequest, err), + _ => PurgeError::with_source(PurgeErrorKind::Request, err), }) .await?; match response { - Response::Err { error } => { - Err(PurgeError::new(PurgeErrorKind::JetStreamError(error))) - } + Response::Err { error } => Err(PurgeError::new(PurgeErrorKind::JetStream(error))), Response::Ok(response) => Ok(response), } })) @@ -1784,7 +1778,7 @@ impl fmt::Display for LastRawMessageError { "failed to get last raw message: {}", self.format_source() ), - LastRawMessageErrorKind::JetStreamError => { + LastRawMessageErrorKind::JetStream => { write!( f, "JetStream error: {}", @@ -1800,7 +1794,7 @@ impl fmt::Display for LastRawMessageError { #[derive(Debug, PartialEq, Clone, Copy)] pub enum LastRawMessageErrorKind { NoMessageFound, - JetStreamError, + JetStream, Other, } @@ -1817,8 +1811,8 @@ impl Display for ConsumerError { let source = self.format_source(); match &self.kind() { ConsumerErrorKind::TimedOut => write!(f, "timed out"), - ConsumerErrorKind::RequestFailed => write!(f, "request failed: {}", source), - ConsumerErrorKind::JetStreamError(err) => write!(f, "JetStream error: {}", err), + ConsumerErrorKind::Request => write!(f, "request failed: {}", source), + ConsumerErrorKind::JetStream(err) => write!(f, "JetStream error: {}", err), ConsumerErrorKind::Other => write!(f, "consumer error: {}", source), ConsumerErrorKind::InvalidConsumerType => { write!(f, "invalid consumer type: {}", source) @@ -1831,14 +1825,14 @@ impl From for ConsumerError { fn from(err: super::context::RequestError) -> Self { match err.kind() { RequestErrorKind::TimedOut => ConsumerError::new(ConsumerErrorKind::TimedOut), - _ => ConsumerError::with_source(ConsumerErrorKind::RequestFailed, err), + _ => ConsumerError::with_source(ConsumerErrorKind::Request, err), } } } impl From for ConsumerError { fn from(err: super::errors::Error) -> Self { - ConsumerError::new(ConsumerErrorKind::JetStreamError(err)) + ConsumerError::new(ConsumerErrorKind::JetStream(err)) } } @@ -1846,8 +1840,8 @@ impl From for ConsumerError { pub enum ConsumerErrorKind { //TODO: get last should have timeout, which should be mapped here. TimedOut, - RequestFailed, + Request, InvalidConsumerType, - JetStreamError(super::errors::Error), + JetStream(super::errors::Error), Other, }