Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JetStream: SampleFrequency is not set on Pull Consumers #1299

Closed
bengsparks opened this issue Aug 6, 2024 · 3 comments · Fixed by #1300
Closed

JetStream: SampleFrequency is not set on Pull Consumers #1299

bengsparks opened this issue Aug 6, 2024 · 3 comments · Fixed by #1300
Labels
defect Suspected defect such as a bug or regression

Comments

@bengsparks
Copy link
Contributor

Observed behavior

Create a pull consumer with sample_frequency set to anything fails.
Note that I have not checked if the same occurs for push consumers.
See example below.

Expected behavior

The created consumer should not contain SampleFrequency: ""

Server and client version

nats.rs

[package]
name = "async_nats_consumer_sample_frequency"
version = "0.1.0"
edition = "2021"

[dependencies]
async-nats = "0.35.1"
tokio = { version = "1.39", features = ["macros", "rt-multi-thread"] }

nats-server

$ docker run -p 4222:4222 nats:latest --version     
nats-server: v2.10.18

nats-cli

$ nats --version
(devel)

Host environment

No response

Steps to reproduce

Consumer Creation (with Cargo.toml above)
use async_nats::jetstream::{consumer::pull, stream};

#[tokio::main]
async fn main() {
    let connection = async_nats::connect("localhost:4222").await.unwrap();
    let js = async_nats::jetstream::new(connection);

    let stream = js
        .create_stream(stream::Config {
            name: "StreamWithSampledConsumer".into(),
            ..Default::default()
        })
        .await
        .unwrap();

    let mut consumer = stream
        .create_consumer(pull::Config {
            name: Some("SampledConsumer".into()),
            durable_name: Some("SampledConsumer".into()),
            description: Some("See below to check that Ack Sampling has been set to 100%!".to_string()),
            sample_frequency: 100, // <--- sample all the messages 
            ..Default::default() 
        })
        .await.unwrap();

    // Read using provided config
    // Output: 0
    println!("in-memory sample frequency {}", consumer.cached_info().config.sample_frequency);

    // Updating does nothing
    let new_config = consumer.info().await.unwrap();
    // Output: 0
    println!("fetched sample frequency {}", new_config.config.sample_frequency);
}
Double-check consumer by looking up information with nats-cli
[nix-shell:~/monitor]$ nats consumer info
[localhost] ? Select a Stream StreamWithSampledConsumer
[localhost] ? Select a Consumer SampledConsumer
Information for Consumer StreamWithSampledConsumer > SampledConsumer created 2024-08-06T16:26:26+02:00

Configuration:

                    Name: SampledConsumer
             Description: See below to check that Ack Sampling has been set to 100%!
               Pull Mode: true
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

State:

  Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512
Triple-check consumer by manually setting the sample frequency with nats-cli
[nix-shell:~/monitor]$ nats consumer edit --sample 100
[localhost] ? Select a Stream StreamWithSampledConsumer
[localhost] ? Select a Consumer SampledConsumer
Differences (-old +new):
  api.ConsumerConfig{
        ... // 18 identical fields
        RateLimit:       0,
        ReplayPolicy:    s"Instant",
-       SampleFrequency: "",
+       SampleFrequency: "100",
        HeadersOnly:     false,
        MaxRequestBatch: 0,
        ... // 7 identical fields
  }
[localhost] ? Really edit Consumer StreamWithSampledConsumer > SampledConsumer Yes
Information for Consumer StreamWithSampledConsumer > SampledConsumer created 2024-08-06T16:26:26+02:00

Configuration:

                    Name: SampledConsumer
             Description: See below to check that Ack Sampling has been set to 100%!
               Pull Mode: true
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
           Sampling Rate: 100
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

State:

  Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512
@bengsparks bengsparks added the defect Suspected defect such as a bug or regression label Aug 6, 2024
@Jarema
Copy link
Member

Jarema commented Aug 7, 2024

Hey!

Thanks for reporting this one.
This is because the actual field is sample_freq, not sample_frequency.

Are you interested in contributing and fixing the serde part?:)

@bengsparks
Copy link
Contributor Author

Certainly I can :)

@bengsparks
Copy link
Contributor Author

PR #1300 should fix this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
defect Suspected defect such as a bug or regression
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants