From 67f29a090eb132030e64b1efe7fa1d1df1fc3ddb Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Thu, 22 Jun 2023 18:53:33 +0300 Subject: [PATCH] Make `Context::publish` return `IntoFuture` builder Co-authored-by: Casper Beyer --- async-nats/src/jetstream/context.rs | 125 +++++++---------- async-nats/tests/jetstream_tests.rs | 209 ++++++++++++---------------- 2 files changed, 143 insertions(+), 191 deletions(-) diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index 239d73a2e..30d666c7e 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -126,13 +126,8 @@ impl Context { /// # Ok(()) /// # } /// ``` - pub async fn publish( - &self, - subject: String, - payload: Bytes, - ) -> Result { - self.send_publish(subject, Publish::build().payload(payload)) - .await + pub fn publish(&self, subject: String, payload: Bytes) -> Publish { + Publish::new(self.clone(), subject, payload) } /// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from @@ -162,67 +157,8 @@ impl Context { headers: crate::header::HeaderMap, payload: Bytes, ) -> Result { - self.send_publish(subject, Publish::build().payload(payload).headers(headers)) - .await - } - - /// Publish a message built by [Publish] and returns an acknowledgment future. - /// - /// If the stream does not exist, `no responders` error will be returned. - /// - /// # Examples - /// - /// ```no_run - /// # use async_nats::jetstream::context::Publish; - /// # #[tokio::main] - /// # async fn main() -> Result<(), async_nats::Error> { - /// let client = async_nats::connect("localhost:4222").await?; - /// let jetstream = async_nats::jetstream::new(client); - /// - /// let ack = jetstream - /// .send_publish( - /// "events".to_string(), - /// Publish::build().payload("data".into()).message_id("uuid"), - /// ) - /// .await?; - /// # Ok(()) - /// # } - /// ``` - pub async fn send_publish( - &self, - subject: String, - publish: Publish, - ) -> Result { - let inbox = self.client.new_inbox(); - let response = self - .client - .subscribe(inbox.clone()) - .await - .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?; - tokio::time::timeout(self.timeout, async { - if let Some(headers) = publish.headers { - self.client - .publish_with_reply_and_headers( - subject, - inbox.clone(), - headers, - publish.payload, - ) - .await - } else { - self.client - .publish_with_reply(subject, inbox.clone(), publish.payload) - .await - } - }) - .map_err(|_| PublishError::new(PublishErrorKind::TimedOut)) - .await? - .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?; - - Ok(PublishAckFuture { - timeout: self.timeout, - subscription: response, - }) + let ack_future = self.publish(subject, payload).headers(headers).await?; + Ok(ack_future) } /// Query the server for account information @@ -1202,15 +1138,23 @@ impl futures::Stream for Streams<'_> { } } /// Used for building customized `publish` message. -#[derive(Default, Clone, Debug)] +#[derive(Clone, Debug)] pub struct Publish { + context: Context, + subject: String, payload: Bytes, headers: Option, } + impl Publish { /// Creates a new custom Publish struct to be used with. - pub fn build() -> Self { - Default::default() + pub(crate) fn new(context: Context, subject: String, payload: Bytes) -> Self { + Publish { + context, + subject, + payload, + headers: None, + } } /// Sets the payload for the message. @@ -1268,6 +1212,45 @@ impl Publish { } } +impl IntoFuture for Publish { + type Output = Result; + type IntoFuture = Pin> + Send>>; + + fn into_future(self) -> Self::IntoFuture { + Box::pin(std::future::IntoFuture::into_future(async move { + let inbox = self.context.client.new_inbox(); + let subscription = self + .context + .client + .subscribe(inbox.clone()) + .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err)) + .await?; + + let mut publish = self + .context + .client + .publish(self.subject, self.payload) + .reply(inbox); + + if let Some(headers) = self.headers { + publish = publish.headers(headers); + } + + let timeout = self.context.timeout; + + tokio::time::timeout(timeout, publish.into_future()) + .map_err(|_| PublishError::new(PublishErrorKind::TimedOut)) + .await? + .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?; + + Ok(PublishAckFuture { + timeout, + subscription, + }) + })) + } +} + #[derive(Debug)] pub struct RequestError { kind: RequestErrorKind, diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index d4c90dd0b..e9ac055b0 100644 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -31,12 +31,12 @@ mod jetstream { use super::*; use async_nats::connection::State; - use async_nats::header::{self, HeaderMap, NATS_MESSAGE_ID}; + use async_nats::header::{self, HeaderMap}; use async_nats::jetstream::consumer::{ self, AckPolicy, DeliverPolicy, Info, OrderedPullConsumer, OrderedPushConsumer, PullConsumer, PushConsumer, }; - use async_nats::jetstream::context::{Publish, PublishErrorKind}; + use async_nats::jetstream::context::PublishErrorKind; use async_nats::jetstream::response::Response; use async_nats::jetstream::stream::{self, DiscardPolicy, StorageType}; use async_nats::jetstream::AckKind; @@ -60,11 +60,10 @@ mod jetstream { } #[tokio::test] - async fn publish_with_headers() { + async fn publish_headers() { let server = nats_server::run_server("tests/configs/jetstream.conf"); let client = async_nats::connect(server.client_url()).await.unwrap(); let context = async_nats::jetstream::new(client); - let _stream = context .create_stream(stream::Config { name: "TEST".to_string(), @@ -73,28 +72,27 @@ mod jetstream { }) .await .unwrap(); - let headers = HeaderMap::new(); let payload = b"Hello JetStream"; let ack = context - .publish_with_headers("foo".into(), headers, payload.as_ref().into()) + .publish("foo".into(), payload.as_ref().into()) + .headers(headers) .await .unwrap() .await .unwrap(); - assert_eq!(ack.stream, "TEST"); assert_eq!(ack.sequence, 1); } #[tokio::test] - async fn publish_async() { + async fn publish_with_headers() { let server = nats_server::run_server("tests/configs/jetstream.conf"); let client = async_nats::connect(server.client_url()).await.unwrap(); let context = async_nats::jetstream::new(client); - context + let _stream = context .create_stream(stream::Config { name: "TEST".to_string(), subjects: vec!["foo".into(), "bar".into(), "baz".into()], @@ -103,174 +101,145 @@ mod jetstream { .await .unwrap(); + let headers = HeaderMap::new(); + let payload = b"Hello JetStream"; + let ack = context - .publish("foo".to_string(), "payload".into()) + .publish_with_headers("foo".into(), headers, payload.as_ref().into()) .await - .unwrap(); - assert!(ack.await.is_ok()); - let ack = context - .publish("not_stream".to_string(), "payload".into()) + .unwrap() .await .unwrap(); - assert!(ack.await.is_err()); + + assert_eq!(ack.stream, "TEST"); + assert_eq!(ack.sequence, 1); } #[tokio::test] - async fn send_publish() { + async fn publish_control() { let server = nats_server::run_server("tests/configs/jetstream.conf"); let client = async_nats::connect(server.client_url()).await.unwrap(); let context = async_nats::jetstream::new(client); - let mut stream = context .create_stream(stream::Config { name: "TEST".to_string(), subjects: vec!["foo".into(), "bar".into(), "baz".into()], - allow_direct: true, ..Default::default() }) .await .unwrap(); let id = "UUID".to_string(); - // Publish first message - context - .send_publish( - "foo".to_string(), - Publish::build() - .message_id(id.clone()) - .payload("data".into()), - ) - .await - .unwrap() - .await - .unwrap(); - // Publish second message, a duplicate. + + // Publish duplicate messages + for _ in 0..3 { + context + .publish("foo".to_string(), "data".into()) + .message_id(id.clone()) + .await + .unwrap() + .await + .unwrap(); + } + + let info = stream.info().await.unwrap(); + assert_eq!(1, info.state.messages); + context - .send_publish("foo".to_string(), Publish::build().message_id(id.clone())) + .publish("foo".to_string(), "data".into()) + .expected_last_message_id(id.clone()) .await .unwrap() .await .unwrap(); - // Check if we still have one message. + let info = stream.info().await.unwrap(); - assert_eq!(1, info.state.messages); - let message = stream - .direct_get_last_for_subject("foo".to_string()) - .await - .unwrap(); - assert_eq!(message.payload, bytes::Bytes::from("data")); + assert_eq!(2, info.state.messages); - // Publish message with different ID and expect error. - let err = context - .send_publish( - "foo".to_string(), - Publish::build().expected_last_message_id("BAD_ID"), - ) - .await - .unwrap() - .await - .unwrap_err() - .kind(); - assert_eq!(err, PublishErrorKind::WrongLastMessageId); - // Publish a new message with expected ID. context - .send_publish( - "foo".to_string(), - Publish::build().expected_last_message_id(id.clone()), - ) + .publish("foo".to_string(), "data".into()) + .expected_last_message_id("invalid") .await .unwrap() .await - .unwrap(); + .unwrap_err(); + + let info = stream.info().await.unwrap(); + assert_eq!(2, info.state.messages); - // We should have now two messages. Check it. context - .send_publish( - "foo".to_string(), - Publish::build().expected_last_sequence(2), - ) + .publish("foo".to_string(), "data".into()) + .expected_last_sequence(2) .await .unwrap() .await .unwrap(); - // 3 messages should be there, so this should error. - assert_eq!( - context - .send_publish( - "foo".to_string(), - Publish::build().expected_last_sequence(2), - ) - .await - .unwrap() - .await - .unwrap_err() - .kind(), - PublishErrorKind::WrongLastSequence - ); - // 3 messages there, should be ok for this subject too. + + let info = stream.info().await.unwrap(); + assert_eq!(3, info.state.messages); + context - .send_publish( - "foo".to_string(), - Publish::build().expected_last_subject_sequence(3), - ) + .publish("bar".to_string(), "data".into()) + .expected_last_sequence(3) .await .unwrap() .await .unwrap(); - // 4 messages there, should error. - assert_eq!( - context - .send_publish( - "foo".to_string(), - Publish::build().expected_last_subject_sequence(3), - ) - .await - .unwrap() - .await - .unwrap_err() - .kind(), - PublishErrorKind::WrongLastSequence - ); - // Check if it works for the other subjects in the stream. + let info = stream.info().await.unwrap(); + assert_eq!(4, info.state.messages); + context - .send_publish( - "bar".to_string(), - Publish::build().expected_last_subject_sequence(0), - ) + .publish("foo".to_string(), "data".into()) + .expected_last_subject_sequence(3) .await .unwrap() .await .unwrap(); - // Sequence is now 1, so this should fail. + + let info = stream.info().await.unwrap(); + assert_eq!(5, info.state.messages); + context - .send_publish( - "bar".to_string(), - Publish::build().expected_last_subject_sequence(0), - ) + .publish("foo".to_string(), "data".into()) + .expected_last_sequence(1) .await .unwrap() .await .unwrap_err(); - // test header shorthand - assert_eq!(stream.info().await.unwrap().state.messages, 5); - context - .send_publish( - "foo".to_string(), - Publish::build().header(NATS_MESSAGE_ID, id.as_str()), - ) - .await - .unwrap() - .await - .unwrap(); - // above message should be ignored. - assert_eq!(stream.info().await.unwrap().state.messages, 5); + + let info = stream.info().await.unwrap(); + assert_eq!(5, info.state.messages); + context - .send_publish("bar".to_string(), Publish::build().expected_stream("TEST")) + .publish("foo".to_string(), "data".into()) + .expected_last_subject_sequence(1) .await .unwrap() .await - .unwrap(); + .unwrap_err(); + + let info = stream.info().await.unwrap(); + assert_eq!(5, info.state.messages); + + let subjects = ["foo", "bar", "baz"]; + for subject in subjects { + context + .publish(subject.into(), "data".into()) + .expected_stream("TEST") + .await + .unwrap() + .await + .unwrap(); + + context + .publish(subject.into(), "data".into()) + .expected_stream("INVALID") + .await + .unwrap() + .await + .unwrap_err(); + } } #[tokio::test]