Skip to content

Commit

Permalink
Add benchmark for logprocessor and its cloning costs (open-telemetry#…
Browse files Browse the repository at this point in the history
…2026)

Co-authored-by: Lalit Kumar Bhasin <[email protected]>
  • Loading branch information
cijothomas and lalitb authored Aug 14, 2024
1 parent 286cb6c commit cb1ffb5
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 0 deletions.
4 changes: 4 additions & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ name = "trace"
harness = false
required-features = ["testing"]

[[bench]]
name = "log_processor"
harness = false

[[bench]]
name = "batch_span_processor"
harness = false
Expand Down
172 changes: 172 additions & 0 deletions opentelemetry-sdk/benches/log_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
The benchmark results:
criterion = "0.5.1"
OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2)
Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
RAM: 64.0 GB
| Test | Average time|
|---------------------------------------------|-------------|
| log_noop_processor | 134 ns |
| log_cloning_processor | 236 ns |
| log_clone_and_send_to_channel_processor | 403 ns |
*/

use std::{
sync::{Arc, Mutex},
thread::sleep,
time::SystemTime,
};

use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity};
use opentelemetry_sdk::{
export::logs::LogData,
logs::{LogProcessor, LogRecord, Logger, LoggerProvider},
};

// Run this benchmark with:
// cargo bench --bench log_processor

fn create_log_record(logger: &Logger) -> LogRecord {
let mut log_record = logger.create_log_record();
let now = SystemTime::now();
log_record.set_observed_timestamp(now);
log_record.set_target("my-target".to_string());
log_record.set_event_name("CheckoutFailed");
log_record.set_severity_number(Severity::Warn);
log_record.set_severity_text("WARN");
log_record.add_attribute("book_id", "12345");
log_record.add_attribute("book_title", "Rust Programming Adventures");
log_record.add_attribute("message", "Unable to process checkout.");
log_record
}

#[derive(Debug)]
struct NoopProcessor;

impl LogProcessor for NoopProcessor {
fn emit(&self, _data: &mut LogData) {}

fn force_flush(&self) -> LogResult<()> {
Ok(())
}

fn shutdown(&self) -> LogResult<()> {
Ok(())
}
}

#[derive(Debug)]
struct CloningProcessor;

impl LogProcessor for CloningProcessor {
fn emit(&self, data: &mut LogData) {
let _data_cloned = data.clone();
}

fn force_flush(&self) -> LogResult<()> {
Ok(())
}

fn shutdown(&self) -> LogResult<()> {
Ok(())
}
}

#[derive(Debug)]
struct SendToChannelProcessor {
sender: std::sync::mpsc::Sender<LogData>,
receiver: Arc<Mutex<std::sync::mpsc::Receiver<LogData>>>,
}

impl SendToChannelProcessor {
fn new() -> Self {
let (sender, receiver) = std::sync::mpsc::channel();
let s = Self {
sender,
receiver: Arc::new(Mutex::new(receiver)),
};
let receiver_cloned = s.receiver.clone();
let _ = std::thread::spawn(move || loop {
sleep(std::time::Duration::from_millis(10));
let data = receiver_cloned.lock().unwrap().recv();
if data.is_err() {
println!(
"Error receiving log data from channel {0}",
data.err().unwrap()
);
break;
}
});
s
}
}

impl LogProcessor for SendToChannelProcessor {
fn emit(&self, data: &mut LogData) {
let data_cloned = data.clone();
let res = self.sender.send(data_cloned);
if res.is_err() {
println!("Error sending log data to channel {0}", res.err().unwrap());
}
}

fn force_flush(&self) -> LogResult<()> {
Ok(())
}

fn shutdown(&self) -> LogResult<()> {
Ok(())
}
}

fn criterion_benchmark(c: &mut Criterion) {
log_noop_processor(c);
log_cloning_processor(c);
log_cloning_and_send_to_channel_processor(c);
}

fn log_noop_processor(c: &mut Criterion) {
let provider = LoggerProvider::builder()
.with_log_processor(NoopProcessor {})
.build();
let logger = provider.logger("benchmark");

c.bench_function("log_noop_processor", |b| {
b.iter(|| {
let log_record = create_log_record(&logger);
logger.emit(log_record);
});
});
}

fn log_cloning_processor(c: &mut Criterion) {
let provider = LoggerProvider::builder()
.with_log_processor(CloningProcessor {})
.build();
let logger = provider.logger("benchmark");

c.bench_function("log_cloning_processor", |b| {
b.iter(|| {
let log_record = create_log_record(&logger);
logger.emit(log_record);
});
});
}

fn log_cloning_and_send_to_channel_processor(c: &mut Criterion) {
let provider = LoggerProvider::builder()
.with_log_processor(SendToChannelProcessor::new())
.build();
let logger = provider.logger("benchmark");

c.bench_function("log_clone_and_send_to_channel_processor", |b| {
b.iter(|| {
let log_record = create_log_record(&logger);
logger.emit(log_record);
});
});
}
criterion_group!(benches, criterion_benchmark);

criterion_main!(benches);

0 comments on commit cb1ffb5

Please sign in to comment.