Skip to content

Commit

Permalink
Make Context::publish return IntoFuture builder
Browse files Browse the repository at this point in the history
Co-authored-by: Casper Beyer <[email protected]>
  • Loading branch information
n1ghtmare and caspervonb committed Jul 17, 2023
1 parent 94a4e6b commit 67f29a0
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 191 deletions.
125 changes: 54 additions & 71 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,8 @@ impl Context {
/// # Ok(())
/// # }
/// ```
pub async fn publish(
&self,
subject: String,
payload: Bytes,
) -> Result<PublishAckFuture, PublishError> {
self.send_publish(subject, Publish::build().payload(payload))
.await
pub fn publish(&self, subject: String, payload: Bytes) -> Publish {
Publish::new(self.clone(), subject, payload)
}

/// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from
Expand Down Expand Up @@ -162,67 +157,8 @@ impl Context {
headers: crate::header::HeaderMap,
payload: Bytes,
) -> Result<PublishAckFuture, PublishError> {
self.send_publish(subject, Publish::build().payload(payload).headers(headers))
.await
}

/// Publish a message built by [Publish] and returns an acknowledgment future.
///
/// If the stream does not exist, `no responders` error will be returned.
///
/// # Examples
///
/// ```no_run
/// # use async_nats::jetstream::context::Publish;
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let ack = jetstream
/// .send_publish(
/// "events".to_string(),
/// Publish::build().payload("data".into()).message_id("uuid"),
/// )
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn send_publish(
&self,
subject: String,
publish: Publish,
) -> Result<PublishAckFuture, PublishError> {
let inbox = self.client.new_inbox();
let response = self
.client
.subscribe(inbox.clone())
.await
.map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;
tokio::time::timeout(self.timeout, async {
if let Some(headers) = publish.headers {
self.client
.publish_with_reply_and_headers(
subject,
inbox.clone(),
headers,
publish.payload,
)
.await
} else {
self.client
.publish_with_reply(subject, inbox.clone(), publish.payload)
.await
}
})
.map_err(|_| PublishError::new(PublishErrorKind::TimedOut))
.await?
.map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?;

Ok(PublishAckFuture {
timeout: self.timeout,
subscription: response,
})
let ack_future = self.publish(subject, payload).headers(headers).await?;
Ok(ack_future)
}

/// Query the server for account information
Expand Down Expand Up @@ -1202,15 +1138,23 @@ impl futures::Stream for Streams<'_> {
}
}
/// Used for building customized `publish` message.
#[derive(Default, Clone, Debug)]
#[derive(Clone, Debug)]
pub struct Publish {
context: Context,
subject: String,
payload: Bytes,
headers: Option<header::HeaderMap>,
}

impl Publish {
/// Creates a new custom Publish struct to be used with.
pub fn build() -> Self {
Default::default()
pub(crate) fn new(context: Context, subject: String, payload: Bytes) -> Self {
Publish {
context,
subject,
payload,
headers: None,
}
}

/// Sets the payload for the message.
Expand Down Expand Up @@ -1268,6 +1212,45 @@ impl Publish {
}
}

impl IntoFuture for Publish {
type Output = Result<PublishAckFuture, PublishError>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAckFuture, PublishError>> + Send>>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(std::future::IntoFuture::into_future(async move {
let inbox = self.context.client.new_inbox();
let subscription = self
.context
.client
.subscribe(inbox.clone())
.map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))
.await?;

let mut publish = self
.context
.client
.publish(self.subject, self.payload)
.reply(inbox);

if let Some(headers) = self.headers {
publish = publish.headers(headers);
}

let timeout = self.context.timeout;

tokio::time::timeout(timeout, publish.into_future())
.map_err(|_| PublishError::new(PublishErrorKind::TimedOut))
.await?
.map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?;

Ok(PublishAckFuture {
timeout,
subscription,
})
}))
}
}

#[derive(Debug)]
pub struct RequestError {
kind: RequestErrorKind,
Expand Down
Loading

0 comments on commit 67f29a0

Please sign in to comment.