Skip to content

Commit

Permalink
Refactor Errors names to be less redundant
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Jul 11, 2023
1 parent 6390b64 commit 5003a23
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 143 deletions.
67 changes: 29 additions & 38 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,22 +462,17 @@ impl<'a> futures::Stream for Sequence<'a> {

self.next = Some(Box::pin(async move {
let inbox = context.client.new_inbox();
let subscriber =
context
.client
.subscribe(inbox.clone())
.await
.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::PullFailed, err)
})?;
let subscriber = context
.client
.subscribe(inbox.clone())
.await
.map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;

context
.client
.publish_with_reply(subject, inbox, request)
.await
.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::PullFailed, err)
})?;
.map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;

// TODO(tp): Add timeout config and defaults.
Ok(Batch {
Expand All @@ -493,7 +488,7 @@ impl<'a> futures::Stream for Sequence<'a> {
Poll::Ready(result) => {
self.next = None;
Poll::Ready(Some(result.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::PullFailed, err)
MessagesError::with_source(MessagesErrorKind::Pull, err)
})))
}
Poll::Pending => Poll::Pending,
Expand All @@ -504,7 +499,7 @@ impl<'a> futures::Stream for Sequence<'a> {
Poll::Ready(result) => {
self.next = None;
Poll::Ready(Some(result.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::PullFailed, err)
MessagesError::with_source(MessagesErrorKind::Pull, err)
})))
}
Poll::Pending => Poll::Pending,
Expand Down Expand Up @@ -822,7 +817,7 @@ impl<'a> futures::Stream for Ordered<'a> {
}
Err(err) => {
return Poll::Ready(Some(Err(OrderedError::with_source(
OrderedErrorKind::RecreationFailed,
OrderedErrorKind::Recreate,
err,
))))
}
Expand Down Expand Up @@ -1013,12 +1008,12 @@ impl std::fmt::Display for OrderedError {
match &self.kind() {
OrderedErrorKind::MissingHeartbeat => write!(f, "missed idle heartbeat"),
OrderedErrorKind::ConsumerDeleted => write!(f, "consumer deleted"),
OrderedErrorKind::PullFailed => {
OrderedErrorKind::Pull => {
write!(f, "pull request failed: {}", self.format_source())
}
OrderedErrorKind::Other => write!(f, "error: {}", self.format_source()),
OrderedErrorKind::PushBasedConsumer => write!(f, "cannot use with push consumer"),
OrderedErrorKind::RecreationFailed => write!(f, "consumer recreation failed"),
OrderedErrorKind::Recreate => write!(f, "consumer recreation failed"),
}
}
}
Expand All @@ -1034,8 +1029,8 @@ impl From<MessagesError> for OrderedError {
MessagesErrorKind::ConsumerDeleted => {
OrderedError::new(OrderedErrorKind::ConsumerDeleted)
}
MessagesErrorKind::PullFailed => OrderedError {
kind: OrderedErrorKind::PullFailed,
MessagesErrorKind::Pull => OrderedError {
kind: OrderedErrorKind::Pull,
source: err.source,
},
MessagesErrorKind::PushBasedConsumer => {
Expand All @@ -1053,9 +1048,9 @@ impl From<MessagesError> for OrderedError {
pub enum OrderedErrorKind {
MissingHeartbeat,
ConsumerDeleted,
PullFailed,
Pull,
PushBasedConsumer,
RecreationFailed,
Recreate,
Other,
}

Expand All @@ -1070,7 +1065,7 @@ impl std::fmt::Display for MessagesError {
match &self.kind() {
MessagesErrorKind::MissingHeartbeat => write!(f, "missed idle heartbeat"),
MessagesErrorKind::ConsumerDeleted => write!(f, "consumer deleted"),
MessagesErrorKind::PullFailed => {
MessagesErrorKind::Pull => {
write!(f, "pull request failed: {}", self.format_source())
}
MessagesErrorKind::Other => write!(f, "error: {}", self.format_source()),
Expand All @@ -1085,7 +1080,7 @@ crate::error_impls!(MessagesError, MessagesErrorKind);
pub enum MessagesErrorKind {
MissingHeartbeat,
ConsumerDeleted,
PullFailed,
Pull,
PushBasedConsumer,
Other,
}
Expand Down Expand Up @@ -1151,7 +1146,7 @@ impl futures::Stream for Stream {
}
Err(err) => {
return Poll::Ready(Some(Err(MessagesError::with_source(
MessagesErrorKind::PullFailed,
MessagesErrorKind::Pull,
err,
))))
}
Expand Down Expand Up @@ -2256,7 +2251,7 @@ crate::error_impls!(BatchError, BatchErrorKind);
impl std::fmt::Display for BatchError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.kind() {
BatchErrorKind::PullFailed => {
BatchErrorKind::Pull => {
write!(f, "pull request failed: {}", self.format_source())
}
BatchErrorKind::Flush => {
Expand All @@ -2278,14 +2273,14 @@ impl From<SubscribeError> for BatchError {

impl From<BatchRequestError> for BatchError {
fn from(err: BatchRequestError) -> Self {
BatchError::with_source(BatchErrorKind::PullFailed, err)
BatchError::with_source(BatchErrorKind::Pull, err)
}
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BatchErrorKind {
Subscribe,
PullFailed,
Pull,
Flush,
Serialize,
}
Expand All @@ -2301,10 +2296,10 @@ crate::error_impls!(ConsumerRecreateError, ConsumerRecreateErrorKind);
impl std::fmt::Display for ConsumerRecreateError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.kind() {
ConsumerRecreateErrorKind::StreamGetFailed => {
ConsumerRecreateErrorKind::GetStream => {
write!(f, "error getting stream: {}", self.format_source())
}
ConsumerRecreateErrorKind::RecreationFailed => {
ConsumerRecreateErrorKind::Recreate => {
write!(f, "consumer creation failed: {}", self.format_source())
}
ConsumerRecreateErrorKind::TimedOut => write!(f, "timed out"),
Expand All @@ -2314,8 +2309,8 @@ impl std::fmt::Display for ConsumerRecreateError {

#[derive(Debug, Clone, PartialEq, Copy)]
pub enum ConsumerRecreateErrorKind {
StreamGetFailed,
RecreationFailed,
GetStream,
Recreate,
TimedOut,
}

Expand All @@ -2331,13 +2326,13 @@ async fn recreate_consumer_stream(
.get_stream(stream_name.clone())
.await
.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::StreamGetFailed, err)
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err)
})?;
stream
.delete_consumer(&consumer_name)
.await
.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::RecreationFailed, err)
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err)
})?;

let deliver_policy = {
Expand All @@ -2358,12 +2353,8 @@ async fn recreate_consumer_stream(
)
.await
.map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))?
.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::RecreationFailed, err)
})?
.map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?
.messages()
.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::RecreationFailed, err)
})
.map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))
.await
}
31 changes: 13 additions & 18 deletions async-nats/src/jetstream/consumer/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,10 +591,7 @@ impl<'a> futures::Stream for Ordered<'a> {
Poll::Ready(subscriber) => {
self.subscriber_future = None;
self.subscriber = Some(subscriber.map_err(|err| {
OrderedError::with_source(
OrderedErrorKind::RecreationFailed,
err,
)
OrderedError::with_source(OrderedErrorKind::Recreate, err)
})?);
}
Poll::Pending => {
Expand All @@ -607,7 +604,7 @@ impl<'a> futures::Stream for Ordered<'a> {
self.subscriber_future = None;
self.consumer_sequence.store(0, Ordering::Relaxed);
self.subscriber = Some(subscriber.map_err(|err| {
OrderedError::with_source(OrderedErrorKind::RecreationFailed, err)
OrderedError::with_source(OrderedErrorKind::Recreate, err)
})?);
}
Poll::Pending => {
Expand Down Expand Up @@ -727,7 +724,7 @@ impl std::fmt::Display for OrderedError {
OrderedErrorKind::ConsumerDeleted => write!(f, "consumer deleted"),
OrderedErrorKind::Other => write!(f, "error: {}", self.format_source()),
OrderedErrorKind::PullBasedConsumer => write!(f, "cannot use with push consumer"),
OrderedErrorKind::RecreationFailed => write!(f, "consumer recreation failed"),
OrderedErrorKind::Recreate => write!(f, "consumer recreation failed"),
}
}
}
Expand Down Expand Up @@ -759,7 +756,7 @@ pub enum OrderedErrorKind {
MissingHeartbeat,
ConsumerDeleted,
PullBasedConsumer,
RecreationFailed,
Recreate,
Other,
}

Expand Down Expand Up @@ -801,23 +798,23 @@ crate::error_impls!(ConsumerRecreateError, ConsumerRecreateErrorKind);
impl std::fmt::Display for ConsumerRecreateError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.kind() {
ConsumerRecreateErrorKind::StreamGetFailed => {
ConsumerRecreateErrorKind::GetStream => {
write!(f, "error getting stream: {}", self.format_source())
}
ConsumerRecreateErrorKind::RecreationFailed => {
ConsumerRecreateErrorKind::Recreate => {
write!(f, "consumer creation failed: {}", self.format_source())
}
ConsumerRecreateErrorKind::TimedOut => write!(f, "timed out"),
ConsumerRecreateErrorKind::SubscriptionFailed => write!(f, "failed to resubscribe"),
ConsumerRecreateErrorKind::Subscription => write!(f, "failed to resubscribe"),
}
}
}

#[derive(Debug, Clone, PartialEq, Copy)]
pub enum ConsumerRecreateErrorKind {
StreamGetFailed,
SubscriptionFailed,
RecreationFailed,
GetStream,
Subscription,
Recreate,
TimedOut,
}

Expand All @@ -832,7 +829,7 @@ async fn recreate_consumer_and_subscription(
.subscribe(config.deliver_subject.clone())
.await
.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::SubscriptionFailed, err)
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Subscription, err)
})?;

recreate_ephemeral_consumer(context, config, stream_name, sequence).await?;
Expand All @@ -848,7 +845,7 @@ async fn recreate_ephemeral_consumer(
.get_stream(stream_name.clone())
.await
.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::StreamGetFailed, err)
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err)
})?;

let deliver_policy = {
Expand All @@ -869,8 +866,6 @@ async fn recreate_ephemeral_consumer(
)
.await
.map_err(|_| ConsumerRecreateError::new(ConsumerRecreateErrorKind::TimedOut))?
.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::RecreationFailed, err)
})?;
.map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err))?;
Ok(())
}
Loading

0 comments on commit 5003a23

Please sign in to comment.