Skip to content

Commit

Permalink
Rework get_raw_message to return StreamMessage
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Aug 2, 2024
1 parent bfaaf5d commit f35f44e
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 177 deletions.
239 changes: 127 additions & 112 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,31 @@ use std::{
task::Poll,
};

use crate::{HeaderValue, StatusCode};
use crate::HeaderValue;
use bytes::Bytes;
use futures::StreamExt;
use once_cell::sync::Lazy;
use regex::Regex;
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use time::OffsetDateTime;
use tracing::debug;

use crate::error::Error;
use crate::{header, Message};
use crate::header;

use self::bucket::Status;

use super::{
consumer::{push::OrderedError, DeliverPolicy, StreamError, StreamErrorKind},
context::{PublishError, PublishErrorKind},
message::StreamMessage,
stream::{
self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, RawMessage,
Republish, Source, StorageType, Stream,
self, ConsumerError, ConsumerErrorKind, DirectGetError, DirectGetErrorKind, Republish,
Source, StorageType, Stream,
},
};

fn kv_operation_from_stream_message(message: &RawMessage) -> Operation {
match message.headers.as_deref() {
Some(headers) => headers.parse().unwrap_or(Operation::Put),
None => Operation::Put,
}
}

fn kv_operation_from_message(message: &Message) -> Result<Operation, EntryError> {
let headers = message
.headers
.as_ref()
.ok_or_else(|| EntryError::with_source(EntryErrorKind::Other, "missing headers"))?;

if let Some(op) = headers.get(KV_OPERATION) {
fn kv_operation_from_stream_message(message: &StreamMessage) -> Result<Operation, EntryError> {
if let Some(op) = message.headers.get(KV_OPERATION) {
Operation::from_str(op.as_str())
.map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
} else {
Expand All @@ -66,6 +55,18 @@ fn kv_operation_from_message(message: &Message) -> Result<Operation, EntryError>
))
}
}
fn kv_operation_from_message(message: &crate::message::Message) -> Result<Operation, EntryError> {
let headers = match message.headers.as_ref() {
Some(headers) => headers,
None => return Ok(Operation::Put),
};
if let Some(op) = headers.get(KV_OPERATION) {
Operation::from_str(op.as_str())
.map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))
} else {
Ok(Operation::Put)
}
}

static VALID_BUCKET_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap());
static VALID_KEY_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap());
Expand Down Expand Up @@ -312,90 +313,44 @@ impl Store {
Ok(ack.sequence)
}

/// Retrieves the last [Entry] for a given key from a bucket.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
/// .create_key_value(async_nats::jetstream::kv::Config {
/// bucket: "kv".to_string(),
/// history: 10,
/// ..Default::default()
/// })
/// .await?;
/// let status = kv.put("key", "value".into()).await?;
/// let entry = kv.entry("key").await?;
/// println!("entry: {:?}", entry);
/// # Ok(())
/// # }
/// ```
pub async fn entry<T: Into<String>>(&self, key: T) -> Result<Option<Entry>, EntryError> {
async fn entry_maybe_revision<T: Into<String>>(
&self,
key: T,
revision: Option<u64>,
) -> Result<Option<Entry>, EntryError> {
let key: String = key.into();
if !is_valid_key(key.as_ref()) {
return Err(EntryError::new(EntryErrorKind::InvalidKey));
}

let subject = format!("{}{}", self.prefix.as_str(), &key);

let result: Option<(Message, Operation, u64, OffsetDateTime)> = {
let result: Option<(StreamMessage, Operation)> = {
if self.stream.info.config.allow_direct {
let message = self
.stream
.direct_get_last_for_subject(subject.as_str())
.await;
let message = match revision {
Some(revision) => {
let message = self.stream.direct_get(revision).await;
if let Ok(message) = message.as_ref() {
if message.subject.as_str() != subject {
println!("subject mismatch {}", message.subject);
return Ok(None);
}
}
message
}
None => {
self.stream
.direct_get_last_for_subject(subject.as_str())
.await
}
};

match message {
Ok(message) => {
let headers = message.headers.as_ref().ok_or_else(|| {
EntryError::with_source(EntryErrorKind::Other, "missing headers")
})?;

let operation =
kv_operation_from_message(&message).unwrap_or(Operation::Put);

let sequence = headers
.get_last(header::NATS_SEQUENCE)
.ok_or_else(|| {
EntryError::with_source(
EntryErrorKind::Other,
"missing sequence headers",
)
})?
.as_str()
.parse()
.map_err(|err| {
EntryError::with_source(
EntryErrorKind::Other,
format!("failed to parse headers sequence value: {}", err),
)
})?;

let created = headers
.get_last(header::NATS_TIME_STAMP)
.ok_or_else(|| {
EntryError::with_source(
EntryErrorKind::Other,
"did not found timestamp header",
)
})
.and_then(|created| {
OffsetDateTime::parse(created.as_str(), &Rfc3339).map_err(|err| {
EntryError::with_source(
EntryErrorKind::Other,
format!(
"failed to parse headers timestampt value: {}",
err
),
)
})
})?;

Some((message.message, operation, sequence, created))
kv_operation_from_stream_message(&message).unwrap_or(Operation::Put);

Some((message, operation))
}
Err(err) => {
if err.kind() == DirectGetErrorKind::NotFound {
Expand All @@ -406,22 +361,28 @@ impl Store {
}
}
} else {
let raw_message = self
.stream
.get_last_raw_message_by_subject(subject.as_str())
.await;
let raw_message = match revision {
Some(revision) => {
let message = self.stream.get_raw_message(revision).await;
if let Ok(message) = message.as_ref() {
if message.subject.as_str() != subject {
return Ok(None);
}
}
message
}
None => {
self.stream
.get_last_raw_message_by_subject(subject.as_str())
.await
}
};
match raw_message {
Ok(raw_message) => {
let operation = kv_operation_from_stream_message(&raw_message);
let operation = kv_operation_from_stream_message(&raw_message)
.unwrap_or(Operation::Put);
// TODO: unnecessary expensive, cloning whole Message.
let nats_message = Message::try_from(raw_message.clone())
.map_err(|err| EntryError::with_source(EntryErrorKind::Other, err))?;
Some((
nats_message,
operation,
raw_message.sequence,
raw_message.time,
))
Some((raw_message, operation))
}
Err(err) => match err.kind() {
crate::jetstream::stream::LastRawMessageErrorKind::NoMessageFound => None,
Expand All @@ -437,17 +398,13 @@ impl Store {
};

match result {
Some((message, operation, revision, created)) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Ok(None);
}

Some((message, operation)) => {
let entry = Entry {
bucket: self.name.clone(),
key,
value: message.payload,
revision,
created,
revision: message.sequence,
created: message.time,
operation,
delta: 0,
};
Expand All @@ -458,6 +415,63 @@ impl Store {
}
}

/// Retrieves the last [Entry] for a given key from a bucket.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
/// .create_key_value(async_nats::jetstream::kv::Config {
/// bucket: "kv".to_string(),
/// history: 10,
/// ..Default::default()
/// })
/// .await?;
/// let status = kv.put("key", "value".into()).await?;
/// let entry = kv.entry("key").await?;
/// println!("entry: {:?}", entry);
/// # Ok(())
/// # }
/// ```
pub async fn entry<T: Into<String>>(&self, key: T) -> Result<Option<Entry>, EntryError> {
self.entry_maybe_revision(key, None).await
}

/// Retrieves the [Entry] for a given key revision from a bucket.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let kv = jetstream
/// .create_key_value(async_nats::jetstream::kv::Config {
/// bucket: "kv".to_string(),
/// history: 10,
/// ..Default::default()
/// })
/// .await?;
/// let status = kv.put("key", "value".into()).await?;
/// let status = kv.put("key", "value2".into()).await?;
/// let entry = kv.entry_for_revision("key", 2).await?;
/// println!("entry: {:?}", entry);
/// # Ok(())
/// # }
/// ```
pub async fn entry_for_revision<T: Into<String>>(
&self,
key: T,
revision: u64,
) -> Result<Option<Entry>, EntryError> {
self.entry_maybe_revision(key, Some(revision)).await
}

/// Creates a [futures::Stream] over [Entries][Entry] a given key in the bucket, which yields
/// values whenever there are changes for that key.
///
Expand Down Expand Up @@ -1079,7 +1093,8 @@ impl futures::Stream for Watch {
)
})?;

let operation = kv_operation_from_message(&message).unwrap_or(Operation::Put);
let operation =
kv_operation_from_message(&message.message).unwrap_or(Operation::Put);

let key = message
.subject
Expand Down
Loading

0 comments on commit f35f44e

Please sign in to comment.