From a2c8c7dea196c80183d3be6ce6fa80b3fb939be9 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 26 Aug 2024 10:57:23 -0700 Subject: [PATCH] Simplified stdout exporter for logs and traces (#2040) Co-authored-by: Lalit Kumar Bhasin Co-authored-by: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> --- examples/logs-basic/src/main.rs | 6 +- examples/tracing-grpc/src/client.rs | 12 +- examples/tracing-grpc/src/server.rs | 12 +- opentelemetry-appender-log/src/lib.rs | 4 +- opentelemetry-stdout/CHANGELOG.md | 4 + opentelemetry-stdout/examples/basic.rs | 132 +++++- opentelemetry-stdout/src/lib.rs | 5 +- opentelemetry-stdout/src/logs/exporter.rs | 173 +++----- opentelemetry-stdout/src/logs/mod.rs | 9 +- opentelemetry-stdout/src/logs/transform.rs | 157 ------- opentelemetry-stdout/src/metrics/exporter.rs | 233 +++++++--- opentelemetry-stdout/src/metrics/mod.rs | 2 - opentelemetry-stdout/src/metrics/transform.rs | 415 ------------------ opentelemetry-stdout/src/trace/exporter.rs | 193 ++++---- opentelemetry-stdout/src/trace/mod.rs | 2 - opentelemetry-stdout/src/trace/transform.rs | 232 ---------- 16 files changed, 473 insertions(+), 1118 deletions(-) delete mode 100644 opentelemetry-stdout/src/logs/transform.rs delete mode 100644 opentelemetry-stdout/src/metrics/transform.rs delete mode 100644 opentelemetry-stdout/src/trace/transform.rs diff --git a/examples/logs-basic/src/main.rs b/examples/logs-basic/src/main.rs index 86eb3873c3..8ebd092c80 100644 --- a/examples/logs-basic/src/main.rs +++ b/examples/logs-basic/src/main.rs @@ -7,11 +7,7 @@ use opentelemetry_semantic_conventions::resource::SERVICE_NAME; fn main() { // Setup LoggerProvider with a stdout exporter - let exporter = opentelemetry_stdout::LogExporterBuilder::default() - // uncomment the below lines to pretty print output. - // .with_encoder(|writer, data| - // Ok(serde_json::to_writer_pretty(writer, &data).unwrap())) - .build(); + let exporter = opentelemetry_stdout::LogExporter::default(); let logger_provider = LoggerProvider::builder() .with_resource(Resource::new([KeyValue::new( SERVICE_NAME, diff --git a/examples/tracing-grpc/src/client.rs b/examples/tracing-grpc/src/client.rs index 38a7c93f92..0f24e23710 100644 --- a/examples/tracing-grpc/src/client.rs +++ b/examples/tracing-grpc/src/client.rs @@ -4,7 +4,7 @@ use opentelemetry::{global, propagation::Injector}; use opentelemetry_sdk::{ propagation::TraceContextPropagator, runtime::Tokio, trace::TracerProvider, }; -use opentelemetry_stdout::SpanExporterBuilder; +use opentelemetry_stdout::SpanExporter; use opentelemetry::{ trace::{SpanKind, TraceContextExt, Tracer}, @@ -15,15 +15,7 @@ fn init_tracer() { global::set_text_map_propagator(TraceContextPropagator::new()); // Install stdout exporter pipeline to be able to retrieve the collected spans. let provider = TracerProvider::builder() - .with_batch_exporter( - SpanExporterBuilder::default() - .with_encoder(|writer, data| { - serde_json::to_writer_pretty(writer, &data).unwrap(); - Ok(()) - }) - .build(), - Tokio, - ) + .with_batch_exporter(SpanExporter::default(), Tokio) .build(); global::set_tracer_provider(provider); diff --git a/examples/tracing-grpc/src/server.rs b/examples/tracing-grpc/src/server.rs index c3ffe44472..3831907cf0 100644 --- a/examples/tracing-grpc/src/server.rs +++ b/examples/tracing-grpc/src/server.rs @@ -8,22 +8,14 @@ use opentelemetry::{ use opentelemetry_sdk::{ propagation::TraceContextPropagator, runtime::Tokio, trace::TracerProvider, }; -use opentelemetry_stdout::SpanExporterBuilder; +use opentelemetry_stdout::SpanExporter; use tonic::{transport::Server, Request, Response, Status}; fn init_tracer() { global::set_text_map_propagator(TraceContextPropagator::new()); // Install stdout exporter pipeline to be able to retrieve the collected spans. let provider = TracerProvider::builder() - .with_batch_exporter( - SpanExporterBuilder::default() - .with_encoder(|writer, data| { - serde_json::to_writer_pretty(writer, &data).unwrap(); - Ok(()) - }) - .build(), - Tokio, - ) + .with_batch_exporter(SpanExporter::default(), Tokio) .build(); global::set_tracer_provider(provider); diff --git a/opentelemetry-appender-log/src/lib.rs b/opentelemetry-appender-log/src/lib.rs index 291f888488..7aff449410 100644 --- a/opentelemetry-appender-log/src/lib.rs +++ b/opentelemetry-appender-log/src/lib.rs @@ -12,7 +12,7 @@ //! # #[tokio::main] async fn main() { //! # use opentelemetry_sdk::logs::{BatchLogProcessor, LoggerProvider}; //! # use opentelemetry_sdk::runtime; -//! let exporter = opentelemetry_stdout::LogExporterBuilder::default().build(); +//! let exporter = opentelemetry_stdout::LogExporter::default(); //! //! let logger_provider = LoggerProvider::builder() //! .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build()) @@ -27,7 +27,7 @@ //! # use opentelemetry_sdk::logs::{BatchLogProcessor, LoggerProvider}; //! # use opentelemetry_sdk::runtime; //! # use opentelemetry_appender_log::OpenTelemetryLogBridge; -//! # let exporter = opentelemetry_stdout::LogExporterBuilder::default().build(); +//! # let exporter = opentelemetry_stdout::LogExporter::default(); //! # let logger_provider = LoggerProvider::builder() //! # .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build()) //! # .build(); diff --git a/opentelemetry-stdout/CHANGELOG.md b/opentelemetry-stdout/CHANGELOG.md index e3701221ee..29b3becdf3 100644 --- a/opentelemetry-stdout/CHANGELOG.md +++ b/opentelemetry-stdout/CHANGELOG.md @@ -4,6 +4,10 @@ - **Breaking** [1994](https://github.com/open-telemetry/opentelemetry-rust/pull/1994) The logrecord event-name is added as attribute with key `name` only if the feature flag `populate-logs-event-name` is enabled. +- **Breaking** [2040](https://github.com/open-telemetry/opentelemetry-rust/pull/2040) Simplified stdout exporter: + - Now only supports writing to stdout, removing ability to send telemetry to other streams. + - Output format improved for better human readability. + - Note: This exporter is intended for learning and debugging purposes only. Not recommended for production use or automated parsing. ## v0.5.0 diff --git a/opentelemetry-stdout/examples/basic.rs b/opentelemetry-stdout/examples/basic.rs index baa2b41d18..4289c74ec4 100644 --- a/opentelemetry-stdout/examples/basic.rs +++ b/opentelemetry-stdout/examples/basic.rs @@ -64,21 +64,129 @@ fn init_logs() -> opentelemetry_sdk::logs::LoggerProvider { #[cfg(feature = "trace")] fn emit_span() { - let tracer = global::tracer("stdout-test"); - let mut span = tracer.start("test_span"); - span.set_attribute(KeyValue::new("test_key", "test_value")); + use opentelemetry::trace::{ + SpanContext, SpanId, TraceFlags, TraceId, TraceState, TracerProvider, + }; + + let tracer = global::tracer_provider() + .tracer_builder("stdout-example") + .with_version("v1") + .with_schema_url("schema_url") + .with_attributes([KeyValue::new("scope_key", "scope_value")]) + .build(); + let mut span = tracer.start("example-span"); + span.set_attribute(KeyValue::new("attribute_key1", "attribute_value1")); + span.set_attribute(KeyValue::new("attribute_key2", "attribute_value2")); span.add_event( - "test_event", - vec![KeyValue::new("test_event_key", "test_event_value")], + "example-event-name", + vec![KeyValue::new("event_attribute1", "event_value1")], + ); + span.add_link( + SpanContext::new( + TraceId::from_hex("58406520a006649127e371903a2de979").expect("invalid"), + SpanId::from_hex("b6d7d7f6d7d6d7f6").expect("invalid"), + TraceFlags::default(), + false, + TraceState::NONE, + ), + vec![ + KeyValue::new("link_attribute1", "link_value1"), + KeyValue::new("link_attribute2", "link_value2"), + ], + ); + + span.add_link( + SpanContext::new( + TraceId::from_hex("23401120a001249127e371903f2de971").expect("invalid"), + SpanId::from_hex("cd37d765d743d7f6").expect("invalid"), + TraceFlags::default(), + false, + TraceState::NONE, + ), + vec![ + KeyValue::new("link_attribute1", "link_value1"), + KeyValue::new("link_attribute2", "link_value2"), + ], ); span.end(); } #[cfg(feature = "metrics")] fn emit_metrics() { - let meter = global::meter("stdout-test"); - let c = meter.u64_counter("test_counter").init(); - c.add(1, &[KeyValue::new("test_key", "test_value")]); + let meter = global::meter("stdout-example"); + let c = meter.u64_counter("example_counter").init(); + c.add( + 1, + &[ + KeyValue::new("name", "apple"), + KeyValue::new("color", "green"), + ], + ); + c.add( + 1, + &[ + KeyValue::new("name", "apple"), + KeyValue::new("color", "green"), + ], + ); + c.add( + 2, + &[ + KeyValue::new("name", "apple"), + KeyValue::new("color", "red"), + ], + ); + c.add( + 1, + &[ + KeyValue::new("name", "banana"), + KeyValue::new("color", "yellow"), + ], + ); + c.add( + 11, + &[ + KeyValue::new("name", "banana"), + KeyValue::new("color", "yellow"), + ], + ); + + let h = meter.f64_histogram("example_histogram").init(); + h.record( + 1.0, + &[ + KeyValue::new("name", "apple"), + KeyValue::new("color", "green"), + ], + ); + h.record( + 1.0, + &[ + KeyValue::new("name", "apple"), + KeyValue::new("color", "green"), + ], + ); + h.record( + 2.0, + &[ + KeyValue::new("name", "apple"), + KeyValue::new("color", "red"), + ], + ); + h.record( + 1.0, + &[ + KeyValue::new("name", "banana"), + KeyValue::new("color", "yellow"), + ], + ); + h.record( + 11.0, + &[ + KeyValue::new("name", "banana"), + KeyValue::new("color", "yellow"), + ], + ); } #[cfg(feature = "logs")] @@ -101,17 +209,9 @@ async fn main() -> Result<(), Box> { #[cfg(feature = "logs")] emit_log(); - println!( - "======================================================================================" - ); - #[cfg(feature = "trace")] emit_span(); - println!( - "======================================================================================" - ); - #[cfg(feature = "metrics")] emit_metrics(); diff --git a/opentelemetry-stdout/src/lib.rs b/opentelemetry-stdout/src/lib.rs index 4ab74a82be..d34ee3996d 100644 --- a/opentelemetry-stdout/src/lib.rs +++ b/opentelemetry-stdout/src/lib.rs @@ -7,7 +7,7 @@ //! # Feature Flags //! The following feature flags can enable exporters for different telemetry signals: //! -//! * `trace`: Includes the trace exporters (enabled by default). +//! * `trace`: Includes the trace exporters. //! * `metrics`: Includes the metrics exporters. //! * `logs`: Includes the logs exporters. //! @@ -56,9 +56,6 @@ //! //! // recorded traces, metrics and logs will now be sent to stdout: //! -//! // {"resourceMetrics":{"resource":{"attributes":[{"key":"service.name","value":{"str.. -//! // {"resourceSpans":[{"resource":{"attributes":[{"key":"service.name","value":{"stri.. -//! // {"resourceLogs": [{"resource": {"attributes": [{"key": "service.name", "value": {"str.. //! # } //! ``` #![warn(missing_debug_implementations, missing_docs)] diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index fd59701c0b..959d66d049 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -1,38 +1,26 @@ use async_trait::async_trait; +use chrono::{DateTime, Utc}; use core::fmt; +use opentelemetry::logs::LogResult; use opentelemetry::InstrumentationLibrary; -use opentelemetry::{ - logs::{LogError, LogResult}, - ExportError, -}; use opentelemetry_sdk::logs::LogRecord; use opentelemetry_sdk::Resource; -use std::io::{stdout, Write}; +use std::sync::atomic; -type Encoder = - Box LogResult<()> + Send + Sync>; - -/// A [`LogExporter`] that writes to [`Stdout`] or other configured [`Write`]. -/// -/// [`LogExporter`]: opentelemetry_sdk::export::logs::LogExporter -/// [`Write`]: std::io::Write -/// [`Stdout`]: std::io::Stdout +/// An OpenTelemetry exporter that writes Logs to stdout on export. pub struct LogExporter { - writer: Option>, - encoder: Encoder, resource: Resource, -} - -impl LogExporter { - /// Create a builder to configure this exporter. - pub fn builder() -> LogExporterBuilder { - Default::default() - } + is_shutdown: atomic::AtomicBool, + resource_emitted: bool, } impl Default for LogExporter { fn default() -> Self { - LogExporterBuilder::default().build() + LogExporter { + resource: Resource::default(), + is_shutdown: atomic::AtomicBool::new(false), + resource_emitted: false, + } } } @@ -46,16 +34,31 @@ impl fmt::Debug for LogExporter { impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { - if let Some(writer) = &mut self.writer { - let result = (self.encoder)(writer, (batch, &self.resource).into()) as LogResult<()>; - result.and_then(|_| writer.write_all(b"\n").map_err(|e| Error(e).into())) + if self.is_shutdown.load(atomic::Ordering::SeqCst) { + return Err("exporter is shut down".into()); } else { - Err("exporter is shut down".into()) + println!("Logs"); + if self.resource_emitted { + print_logs(batch); + } else { + self.resource_emitted = true; + println!("Resource"); + if let Some(schema_url) = self.resource.schema_url() { + println!("\t Resource SchemaUrl: {:?}", schema_url); + } + self.resource.iter().for_each(|(k, v)| { + println!("\t -> {}={:?}", k, v); + }); + + print_logs(batch); + } + + Ok(()) } } fn shutdown(&mut self) { - self.writer.take(); + self.is_shutdown.store(true, atomic::Ordering::SeqCst); } fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { @@ -63,84 +66,44 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { } } -/// Stdout exporter's error -#[derive(thiserror::Error, Debug)] -#[error(transparent)] -struct Error(#[from] std::io::Error); - -impl ExportError for Error { - fn exporter_name(&self) -> &'static str { - "stdout" - } -} - -/// Configuration for the stdout log exporter -#[derive(Default)] -pub struct LogExporterBuilder { - writer: Option>, - encoder: Option, -} - -impl fmt::Debug for LogExporterBuilder { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("LogExporterBuilder") - } -} - -impl LogExporterBuilder { - /// Set the writer that the exporter will write to - /// - /// # Examples - /// - /// ``` - /// use opentelemetry_stdout::LogExporterBuilder; - /// - /// let buffer = Vec::new(); // Any type that implements `Write` - /// let exporter = LogExporterBuilder::default().with_writer(buffer).build(); - /// ``` - pub fn with_writer(mut self, writer: W) -> Self - where - W: Write + Send + Sync + 'static, - { - self.writer = Some(Box::new(writer)); - self - } - - /// Set the encoder that the exporter will use. - /// - /// # Examples - /// - /// ``` - /// use opentelemetry_stdout::LogExporterBuilder; - /// use serde_json; - /// - /// let exporter = LogExporterBuilder::default() - /// .with_encoder(|writer, data| - /// Ok(serde_json::to_writer_pretty(writer, &data).unwrap())) - /// .build(); - /// ``` - pub fn with_encoder(mut self, encoder: E) -> Self - where - E: Fn(&mut dyn Write, crate::logs::transform::LogData) -> LogResult<()> - + Send - + Sync - + 'static, - { - self.encoder = Some(Box::new(encoder)); - self - } +fn print_logs(batch: Vec<(&LogRecord, &InstrumentationLibrary)>) { + for (i, log) in batch.into_iter().enumerate() { + println!("Log #{}", i); + let (record, _library) = log; + if let Some(event_name) = record.event_name { + println!("\t EventName: {:?}", event_name); + } + if let Some(target) = &record.target { + println!("\t Target (Scope): {:?}", target); + } + if let Some(trace_context) = &record.trace_context { + println!("\t TraceId: {:?}", trace_context.trace_id); + println!("\t SpanId: {:?}", trace_context.span_id); + } + if let Some(timestamp) = record.timestamp { + let datetime: DateTime = timestamp.into(); + println!("\t Timestamp: {}", datetime.format("%Y-%m-%d %H:%M:%S%.6f")); + } + if let Some(timestamp) = record.observed_timestamp { + let datetime: DateTime = timestamp.into(); + println!( + "\t Observed Timestamp: {}", + datetime.format("%Y-%m-%d %H:%M:%S%.6f") + ); + } + if let Some(severity) = record.severity_text { + println!("\t SeverityText: {:?}", severity); + } + if let Some(severity) = record.severity_number { + println!("\t SeverityNumber: {:?}", severity); + } + if let Some(body) = &record.body { + println!("\t Body: {:?}", body); + } - /// Create a log exporter with the current configuration. - pub fn build(self) -> LogExporter { - LogExporter { - writer: Some(self.writer.unwrap_or_else(|| Box::new(stdout()))), - resource: Resource::default(), - encoder: self.encoder.unwrap_or_else(|| { - Box::new(|writer, logs| { - serde_json::to_writer(writer, &logs) - .map_err(|err| LogError::Other(Box::new(err))) - }) - }), + println!("\t Attributes:"); + for (k, v) in record.attributes_iter() { + println!("\t\t -> {}: {:?}", k, v); } } } diff --git a/opentelemetry-stdout/src/logs/mod.rs b/opentelemetry-stdout/src/logs/mod.rs index e83d73da67..76a8b1debe 100644 --- a/opentelemetry-stdout/src/logs/mod.rs +++ b/opentelemetry-stdout/src/logs/mod.rs @@ -1,15 +1,8 @@ //! # Stdout Log Exporter //! -//! The stdout [`LogExporter`] writes debug printed [`LogRecord`]s to its configured -//! [`Write`] instance. By default it will write to [`Stdout`]. +//! The stdout [`LogExporter`] writes debug printed [`LogRecord`]s to Stdout. //! //! [`LogExporter`]: opentelemetry_sdk::export::logs::LogExporter //! [`LogRecord`]: opentelemetry::logs::LogRecord -//! [`Write`]: std::io::Write -//! [`Stdout`]: std::io::Stdout -// TODO: Add an example for using this exporter. mod exporter; -mod transform; - pub use exporter::*; -pub use transform::*; diff --git a/opentelemetry-stdout/src/logs/transform.rs b/opentelemetry-stdout/src/logs/transform.rs deleted file mode 100644 index 84e864f469..0000000000 --- a/opentelemetry-stdout/src/logs/transform.rs +++ /dev/null @@ -1,157 +0,0 @@ -use std::{borrow::Cow, collections::HashMap, time::SystemTime}; - -use crate::common::{ - as_human_readable, as_opt_human_readable, as_opt_unix_nano, as_unix_nano, AttributeSet, - KeyValue, Resource, Scope, Value, -}; -use serde::Serialize; - -/// Transformed logs data that can be serialized. -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct LogData { - #[serde(skip_serializing_if = "Vec::is_empty")] - resource_logs: Vec, -} - -impl - From<( - Vec<( - &opentelemetry_sdk::logs::LogRecord, - &opentelemetry::InstrumentationLibrary, - )>, - &opentelemetry_sdk::Resource, - )> for LogData -{ - fn from( - (sdk_logs, sdk_resource): ( - Vec<( - &opentelemetry_sdk::logs::LogRecord, - &opentelemetry::InstrumentationLibrary, - )>, - &opentelemetry_sdk::Resource, - ), - ) -> Self { - let mut resource_logs = HashMap::::new(); - - for sdk_log in sdk_logs { - let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into()); - let schema_url = sdk_log.1.schema_url.clone(); - let scope: Scope = sdk_log.1.clone().into(); - let resource: Resource = sdk_resource.into(); - - let rl = resource_logs - .entry(sdk_resource.into()) - .or_insert_with(move || ResourceLogs { - resource, - scope_logs: Vec::with_capacity(1), - schema_url: resource_schema_url, - }); - - match rl.scope_logs.iter_mut().find(|sl| sl.scope == scope) { - Some(sl) => sl.log_records.push(sdk_log.0.into()), - None => rl.scope_logs.push(ScopeLogs { - scope, - log_records: vec![sdk_log.0.into()], - schema_url, - }), - } - } - - LogData { - resource_logs: resource_logs.into_values().collect(), - } - } -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct ResourceLogs { - resource: Resource, - #[serde(skip_serializing_if = "Vec::is_empty")] - scope_logs: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - schema_url: Option>, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct ScopeLogs { - scope: Scope, - #[serde(skip_serializing_if = "Vec::is_empty")] - log_records: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - schema_url: Option>, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct LogRecord { - #[serde(serialize_with = "as_opt_unix_nano")] - time_unix_nano: Option, - #[serde(serialize_with = "as_opt_human_readable")] - time: Option, - #[serde(serialize_with = "as_unix_nano")] - observed_time_unix_nano: SystemTime, - #[serde(serialize_with = "as_human_readable")] - observed_time: SystemTime, - severity_number: u32, - #[serde(skip_serializing_if = "Option::is_none")] - severity_text: Option<&'static str>, - #[serde(skip_serializing_if = "Option::is_none")] - body: Option, - attributes: Vec, - dropped_attributes_count: u32, - #[serde(skip_serializing_if = "Option::is_none")] - flags: Option, - #[serde(skip_serializing_if = "Option::is_none")] - span_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - trace_id: Option, -} - -impl From<&opentelemetry_sdk::logs::LogRecord> for LogRecord { - fn from(record: &opentelemetry_sdk::logs::LogRecord) -> Self { - LogRecord { - attributes: { - let attributes = record - .attributes_iter() - .map(|(k, v)| KeyValue::from((k.clone(), v.clone()))) // Map each pair to a KeyValue - .collect::>(); // Collect into a Vecs - - #[cfg(feature = "populate-logs-event-name")] - if let Some(event_name) = record.event_name { - let mut attributes_with_name = attributes; - attributes_with_name.push(KeyValue::from(( - "name".into(), - opentelemetry::Value::String(event_name.into()), - ))); - attributes_with_name - } else { - attributes - } - - #[cfg(not(feature = "populate-logs-event-name"))] - attributes - }, - trace_id: record - .trace_context - .as_ref() - .map(|c| c.trace_id.to_string()), - span_id: record.trace_context.as_ref().map(|c| c.span_id.to_string()), - flags: record - .trace_context - .as_ref() - .map(|c| c.trace_flags.map(|f| f.to_u8())) - .unwrap_or_default(), - time_unix_nano: record.timestamp, - time: record.timestamp, - observed_time_unix_nano: record.observed_timestamp.unwrap(), - observed_time: record.observed_timestamp.unwrap(), - severity_number: record.severity_number.map(|u| u as u32).unwrap_or_default(), - dropped_attributes_count: 0, - severity_text: record.severity_text, - body: record.body.clone().map(|a| a.into()), - } - } -} diff --git a/opentelemetry-stdout/src/metrics/exporter.rs b/opentelemetry-stdout/src/metrics/exporter.rs index 31d67f6c45..a961222d46 100644 --- a/opentelemetry-stdout/src/metrics/exporter.rs +++ b/opentelemetry-stdout/src/metrics/exporter.rs @@ -1,8 +1,9 @@ use async_trait::async_trait; -use core::fmt; +use chrono::{DateTime, Utc}; +use core::{f64, fmt}; use opentelemetry::metrics::{MetricsError, Result}; use opentelemetry_sdk::metrics::{ - data, + data::{self, ScopeMetrics}, exporter::PushMetricsExporter, reader::{ AggregationSelector, DefaultAggregationSelector, DefaultTemporalitySelector, @@ -10,19 +11,12 @@ use opentelemetry_sdk::metrics::{ }, Aggregation, InstrumentKind, }; -use std::{ - io::{stdout, Write}, - sync::Mutex, -}; - -use crate::MetricsData; - -type Encoder = Box Result<()> + Send + Sync>; +use std::fmt::Debug; +use std::sync::atomic; /// An OpenTelemetry exporter that writes to stdout on export. pub struct MetricsExporter { - writer: Mutex>>, - encoder: Encoder, + is_shutdown: atomic::AtomicBool, temporality_selector: Box, aggregation_selector: Box, } @@ -59,14 +53,22 @@ impl AggregationSelector for MetricsExporter { #[async_trait] impl PushMetricsExporter for MetricsExporter { + /// Write Metrics to stdout async fn export(&self, metrics: &mut data::ResourceMetrics) -> Result<()> { - if let Some(writer) = self.writer.lock()?.as_mut() { - (self.encoder)(writer, crate::metrics::MetricsData::from(metrics))?; - writer - .write_all(b"\n") - .map_err(|err| MetricsError::Other(err.to_string())) - } else { + if self.is_shutdown.load(atomic::Ordering::SeqCst) { Err(MetricsError::Other("exporter is shut down".into())) + } else { + println!("Metrics"); + println!("Resource"); + if let Some(schema_url) = metrics.resource.schema_url() { + println!("\tResource SchemaUrl: {:?}", schema_url); + } + + metrics.resource.iter().for_each(|(k, v)| { + println!("\t -> {}={:?}", k, v); + }); + print_metrics(&metrics.scope_metrics); + Ok(()) } } @@ -76,57 +78,166 @@ impl PushMetricsExporter for MetricsExporter { } fn shutdown(&self) -> Result<()> { - self.writer.lock()?.take(); + self.is_shutdown.store(true, atomic::Ordering::SeqCst); Ok(()) } } +fn print_metrics(metrics: &[ScopeMetrics]) { + for (i, metric) in metrics.iter().enumerate() { + println!("\tInstrumentation Scope #{}", i); + println!("\t\tName : {}", &metric.scope.name); + if let Some(version) = &metric.scope.version { + println!("\t\tVersion : {:?}", version); + } + if let Some(schema_url) = &metric.scope.schema_url { + println!("\t\tSchemaUrl: {:?}", schema_url); + } + metric + .scope + .attributes + .iter() + .enumerate() + .for_each(|(index, kv)| { + if index == 0 { + println!("\t\tScope Attributes:"); + } + println!("\t\t\t -> {}: {}", kv.key, kv.value); + }); + + metric.metrics.iter().enumerate().for_each(|(i, metric)| { + println!("Metric #{}", i); + println!("\t\tName : {}", &metric.name); + println!("\t\tDescription : {}", &metric.description); + println!("\t\tUnit : {}", &metric.unit); + + let data = metric.data.as_any(); + if let Some(hist) = data.downcast_ref::>() { + println!("\t\tType : Histogram"); + print_histogram(hist); + } else if let Some(hist) = data.downcast_ref::>() { + println!("\t\tType : Histogram"); + print_histogram(hist); + } else if let Some(_hist) = data.downcast_ref::>() { + println!("\t\tType : Exponential Histogram"); + // TODO + } else if let Some(_hist) = data.downcast_ref::>() { + println!("\t\tType : Exponential Histogram"); + // TODO + } else if let Some(sum) = data.downcast_ref::>() { + println!("\t\tType : Sum"); + print_sum(sum); + } else if let Some(sum) = data.downcast_ref::>() { + println!("\t\tType : Sum"); + print_sum(sum); + } else if let Some(sum) = data.downcast_ref::>() { + println!("\t\tType : Sum"); + print_sum(sum); + } else if let Some(gauge) = data.downcast_ref::>() { + println!("\t\tType : Gauge"); + print_gauge(gauge); + } else if let Some(gauge) = data.downcast_ref::>() { + println!("\t\tType : Gauge"); + print_gauge(gauge); + } else if let Some(gauge) = data.downcast_ref::>() { + println!("\t\tType : Gauge"); + print_gauge(gauge); + } else { + println!("Unsupported data type"); + } + }); + } +} + +fn print_sum(sum: &data::Sum) { + println!("\t\tSum DataPoints"); + println!("\t\tMonotonic : {}", sum.is_monotonic); + if sum.temporality == data::Temporality::Cumulative { + println!("\t\tTemporality : Cumulative"); + } else { + println!("\t\tTemporality : Delta"); + } + print_data_points(&sum.data_points); +} + +fn print_gauge(gauge: &data::Gauge) { + println!("\t\tGauge DataPoints"); + print_data_points(&gauge.data_points); +} + +fn print_histogram(histogram: &data::Histogram) { + if histogram.temporality == data::Temporality::Cumulative { + println!("\t\tTemporality : Cumulative"); + } else { + println!("\t\tTemporality : Delta"); + } + println!("\t\tHistogram DataPoints"); + print_hist_data_points(&histogram.data_points); +} + +fn print_data_points(data_points: &[data::DataPoint]) { + for (i, data_point) in data_points.iter().enumerate() { + println!("\t\tDataPoint #{}", i); + if let Some(start_time) = data_point.start_time { + let datetime: DateTime = start_time.into(); + println!( + "\t\t\tStartTime : {}", + datetime.format("%Y-%m-%d %H:%M:%S%.6f") + ); + } + if let Some(end_time) = data_point.time { + let datetime: DateTime = end_time.into(); + println!( + "\t\t\tEndTime : {}", + datetime.format("%Y-%m-%d %H:%M:%S%.6f") + ); + } + println!("\t\t\tValue : {:#?}", data_point.value); + println!("\t\t\tAttributes :"); + for kv in data_point.attributes.iter() { + println!("\t\t\t\t -> {}: {}", kv.key, kv.value.as_str()); + } + } +} + +fn print_hist_data_points(data_points: &[data::HistogramDataPoint]) { + for (i, data_point) in data_points.iter().enumerate() { + println!("\t\tDataPoint #{}", i); + let datetime: DateTime = data_point.start_time.into(); + println!( + "\t\t\tStartTime : {}", + datetime.format("%Y-%m-%d %H:%M:%S%.6f") + ); + let datetime: DateTime = data_point.time.into(); + println!( + "\t\t\tEndTime : {}", + datetime.format("%Y-%m-%d %H:%M:%S%.6f") + ); + println!("\t\t\tCount : {}", data_point.count); + println!("\t\t\tSum : {:?}", data_point.sum); + if let Some(min) = &data_point.min { + println!("\t\t\tMin : {:?}", min); + } + + if let Some(max) = &data_point.max { + println!("\t\t\tMax : {:?}", max); + } + + println!("\t\t\tAttributes :"); + for kv in data_point.attributes.iter() { + println!("\t\t\t\t -> {}: {}", kv.key, kv.value.as_str()); + } + } +} + /// Configuration for the stdout metrics exporter #[derive(Default)] pub struct MetricsExporterBuilder { - writer: Option>, - encoder: Option, temporality_selector: Option>, aggregation_selector: Option>, } impl MetricsExporterBuilder { - /// Set the writer that the exporter will write to - /// - /// # Examples - /// - /// ``` - /// use opentelemetry_stdout::MetricsExporterBuilder; - /// - /// let buffer = Vec::new(); // Any type that implements `Write` - /// let exporter = MetricsExporterBuilder::default().with_writer(buffer).build(); - /// ``` - pub fn with_writer(mut self, writer: impl Write + Send + Sync + 'static) -> Self { - self.writer = Some(Box::new(writer)); - self - } - - /// Set the encoder that this exporter will use - /// - /// # Examples - /// - /// ``` - /// use opentelemetry_stdout::MetricsExporterBuilder; - /// - /// let exporter = MetricsExporterBuilder::default() - /// // Can be any function that can write formatted data - /// // serde ecosystem crates for example provide such functions - /// .with_encoder(|writer, data| Ok(serde_json::to_writer_pretty(writer, &data).unwrap())) - /// .build(); - /// ``` - pub fn with_encoder( - mut self, - encoder: impl Fn(&mut dyn Write, MetricsData) -> Result<()> + Send + Sync + 'static, - ) -> Self { - self.encoder = Some(Box::new(encoder)); - self - } - /// Set the temporality exporter for the exporter pub fn with_temporality_selector( mut self, @@ -148,19 +259,13 @@ impl MetricsExporterBuilder { /// Create a metrics exporter with the current configuration pub fn build(self) -> MetricsExporter { MetricsExporter { - writer: Mutex::new(Some(self.writer.unwrap_or_else(|| Box::new(stdout())))), - encoder: self.encoder.unwrap_or_else(|| { - Box::new(|writer, metrics| { - serde_json::to_writer(writer, &metrics) - .map_err(|err| MetricsError::Other(err.to_string())) - }) - }), temporality_selector: self .temporality_selector .unwrap_or_else(|| Box::new(DefaultTemporalitySelector::new())), aggregation_selector: self .aggregation_selector .unwrap_or_else(|| Box::new(DefaultAggregationSelector::new())), + is_shutdown: atomic::AtomicBool::new(false), } } } diff --git a/opentelemetry-stdout/src/metrics/mod.rs b/opentelemetry-stdout/src/metrics/mod.rs index 418c5c2904..cf224ca510 100644 --- a/opentelemetry-stdout/src/metrics/mod.rs +++ b/opentelemetry-stdout/src/metrics/mod.rs @@ -1,5 +1,3 @@ mod exporter; -mod transform; pub use exporter::*; -pub use transform::*; diff --git a/opentelemetry-stdout/src/metrics/transform.rs b/opentelemetry-stdout/src/metrics/transform.rs deleted file mode 100644 index 9df88ff7da..0000000000 --- a/opentelemetry-stdout/src/metrics/transform.rs +++ /dev/null @@ -1,415 +0,0 @@ -use crate::common::{KeyValue, Resource, Scope}; -use opentelemetry::{global, metrics::MetricsError}; -use opentelemetry_sdk::metrics::data; -use serde::{Serialize, Serializer}; -use std::{any::Any, borrow::Cow, time::SystemTime}; - -use crate::common::{as_human_readable, as_opt_human_readable, as_opt_unix_nano, as_unix_nano}; - -/// Transformed metrics data that can be serialized -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -pub struct MetricsData { - resource_metrics: ResourceMetrics, -} - -impl From<&mut data::ResourceMetrics> for MetricsData { - fn from(value: &mut data::ResourceMetrics) -> Self { - MetricsData { - resource_metrics: value.into(), - } - } -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -pub(crate) struct ResourceMetrics { - resource: Resource, - scope_metrics: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - schema_url: Option, -} - -impl From<&mut data::ResourceMetrics> for ResourceMetrics { - fn from(value: &mut data::ResourceMetrics) -> Self { - ResourceMetrics { - resource: Resource::from(&value.resource), - scope_metrics: value.scope_metrics.drain(..).map(Into::into).collect(), - schema_url: value.resource.schema_url().map(Into::into), - } - } -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -struct ScopeMetrics { - scope: Scope, - metrics: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - schema_url: Option>, -} - -impl From for ScopeMetrics { - fn from(value: data::ScopeMetrics) -> Self { - let schema_url = value.scope.schema_url.clone(); - ScopeMetrics { - scope: value.scope.into(), - metrics: value.metrics.into_iter().map(Into::into).collect(), - schema_url, - } - } -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -struct Metric { - name: Cow<'static, str>, - #[serde(skip_serializing_if = "str::is_empty")] - description: Cow<'static, str>, - #[serde(skip_serializing_if = "str::is_empty")] - unit: Cow<'static, str>, - #[serde(flatten)] - data: Option, -} - -impl From for Metric { - fn from(value: data::Metric) -> Self { - Metric { - name: value.name, - description: value.description, - unit: value.unit, - data: map_data(value.data.as_any()), - } - } -} - -fn map_data(data: &dyn Any) -> Option { - if let Some(hist) = data.downcast_ref::>() { - Some(MetricData::Histogram(hist.into())) - } else if let Some(hist) = data.downcast_ref::>() { - Some(MetricData::Histogram(hist.into())) - } else if let Some(hist) = data.downcast_ref::>() { - Some(MetricData::Histogram(hist.into())) - } else if let Some(hist) = data.downcast_ref::>() { - Some(MetricData::ExponentialHistogram(hist.into())) - } else if let Some(hist) = data.downcast_ref::>() { - Some(MetricData::ExponentialHistogram(hist.into())) - } else if let Some(hist) = data.downcast_ref::>() { - Some(MetricData::ExponentialHistogram(hist.into())) - } else if let Some(sum) = data.downcast_ref::>() { - Some(MetricData::Sum(sum.into())) - } else if let Some(sum) = data.downcast_ref::>() { - Some(MetricData::Sum(sum.into())) - } else if let Some(sum) = data.downcast_ref::>() { - Some(MetricData::Sum(sum.into())) - } else if let Some(gauge) = data.downcast_ref::>() { - Some(MetricData::Gauge(gauge.into())) - } else if let Some(gauge) = data.downcast_ref::>() { - Some(MetricData::Gauge(gauge.into())) - } else if let Some(gauge) = data.downcast_ref::>() { - Some(MetricData::Gauge(gauge.into())) - } else { - global::handle_error(MetricsError::Other("unknown aggregator".into())); - None - } -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -enum MetricData { - Gauge(Gauge), - Sum(Sum), - Histogram(Histogram), - ExponentialHistogram(ExponentialHistogram), -} - -#[derive(Serialize, Debug, Clone)] -#[serde(untagged)] -enum DataValue { - F64(f64), - I64(i64), - U64(u64), -} - -impl From for DataValue { - fn from(value: f64) -> Self { - DataValue::F64(value) - } -} - -impl From for DataValue { - fn from(value: i64) -> Self { - DataValue::I64(value) - } -} - -impl From for DataValue { - fn from(value: u64) -> Self { - DataValue::U64(value) - } -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -struct Gauge { - data_points: Vec, -} - -impl + Copy> From<&data::Gauge> for Gauge { - fn from(value: &data::Gauge) -> Self { - Gauge { - data_points: value.data_points.iter().map(Into::into).collect(), - } - } -} - -#[derive(Debug, Clone, Copy)] -enum Temporality { - #[allow(dead_code)] - Unspecified = 0, // explicitly never used - Delta = 1, - Cumulative = 2, -} - -impl Serialize for Temporality { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - match &self { - Temporality::Cumulative => serializer.serialize_str("Cumulative"), - Temporality::Delta => serializer.serialize_str("Delta"), - Temporality::Unspecified => serializer.serialize_str("Unspecified"), - } - } -} - -impl From for Temporality { - fn from(value: data::Temporality) -> Self { - match value { - data::Temporality::Cumulative => Temporality::Cumulative, - data::Temporality::Delta => Temporality::Delta, - _ => panic!("unexpected temporality"), - } - } -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -struct Sum { - data_points: Vec, - aggregation_temporality: Temporality, - is_monotonic: bool, -} - -impl + Copy> From<&data::Sum> for Sum { - fn from(value: &data::Sum) -> Self { - Sum { - data_points: value.data_points.iter().map(Into::into).collect(), - aggregation_temporality: value.temporality.into(), - is_monotonic: value.is_monotonic, - } - } -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -struct DataPoint { - attributes: Vec, - #[serde(serialize_with = "as_opt_human_readable")] - start_time: Option, - #[serde(serialize_with = "as_opt_human_readable")] - time: Option, - #[serde(serialize_with = "as_opt_unix_nano")] - start_time_unix_nano: Option, - #[serde(serialize_with = "as_opt_unix_nano")] - time_unix_nano: Option, - value: DataValue, - #[serde(skip_serializing_if = "Vec::is_empty")] - exemplars: Vec, - #[serde(skip_serializing_if = "is_zero_u8")] - flags: u8, -} - -fn is_zero_u8(v: &u8) -> bool { - *v == 0 -} - -impl + Copy> From<&data::DataPoint> for DataPoint { - fn from(value: &data::DataPoint) -> Self { - DataPoint { - attributes: value.attributes.iter().map(Into::into).collect(), - start_time_unix_nano: value.start_time, - time_unix_nano: value.time, - start_time: value.start_time, - time: value.time, - value: value.value.into(), - exemplars: value.exemplars.iter().map(Into::into).collect(), - flags: 0, - } - } -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -struct Histogram { - data_points: Vec, - aggregation_temporality: Temporality, -} - -impl + Copy> From<&data::Histogram> for Histogram { - fn from(value: &data::Histogram) -> Self { - Histogram { - data_points: value.data_points.iter().map(Into::into).collect(), - aggregation_temporality: value.temporality.into(), - } - } -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -struct HistogramDataPoint { - attributes: Vec, - #[serde(serialize_with = "as_unix_nano")] - start_time_unix_nano: SystemTime, - #[serde(serialize_with = "as_unix_nano")] - time_unix_nano: SystemTime, - #[serde(serialize_with = "as_human_readable")] - start_time: SystemTime, - #[serde(serialize_with = "as_human_readable")] - time: SystemTime, - count: u64, - explicit_bounds: Vec, - bucket_counts: Vec, - min: Option, - max: Option, - sum: DataValue, - exemplars: Vec, - flags: u8, -} - -impl + Copy> From<&data::HistogramDataPoint> for HistogramDataPoint { - fn from(value: &data::HistogramDataPoint) -> Self { - HistogramDataPoint { - attributes: value.attributes.iter().map(Into::into).collect(), - start_time_unix_nano: value.start_time, - time_unix_nano: value.time, - start_time: value.start_time, - time: value.time, - count: value.count, - explicit_bounds: value.bounds.clone(), - bucket_counts: value.bucket_counts.clone(), - min: value.min.map(Into::into), - max: value.max.map(Into::into), - sum: value.sum.into(), - exemplars: value.exemplars.iter().map(Into::into).collect(), - flags: 0, - } - } -} -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -struct ExponentialHistogram { - data_points: Vec, - aggregation_temporality: Temporality, -} - -impl + Copy> From<&data::ExponentialHistogram> for ExponentialHistogram { - fn from(value: &data::ExponentialHistogram) -> Self { - ExponentialHistogram { - data_points: value.data_points.iter().map(Into::into).collect(), - aggregation_temporality: value.temporality.into(), - } - } -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -struct ExponentialHistogramDataPoint { - attributes: Vec, - #[serde(serialize_with = "as_unix_nano")] - start_time_unix_nano: SystemTime, - #[serde(serialize_with = "as_unix_nano")] - time_unix_nano: SystemTime, - #[serde(serialize_with = "as_human_readable")] - start_time: SystemTime, - #[serde(serialize_with = "as_human_readable")] - time: SystemTime, - count: usize, - min: Option, - max: Option, - sum: DataValue, - scale: i8, - zero_count: u64, - positive: ExponentialBucket, - negative: ExponentialBucket, - zero_threshold: f64, - exemplars: Vec, - flags: u8, -} - -impl + Copy> From<&data::ExponentialHistogramDataPoint> - for ExponentialHistogramDataPoint -{ - fn from(value: &data::ExponentialHistogramDataPoint) -> Self { - ExponentialHistogramDataPoint { - attributes: value.attributes.iter().map(Into::into).collect(), - start_time_unix_nano: value.start_time, - time_unix_nano: value.time, - start_time: value.start_time, - time: value.time, - count: value.count, - min: value.min.map(Into::into), - max: value.max.map(Into::into), - sum: value.sum.into(), - scale: value.scale, - zero_count: value.zero_count, - positive: (&value.positive_bucket).into(), - negative: (&value.negative_bucket).into(), - zero_threshold: value.zero_threshold, - exemplars: value.exemplars.iter().map(Into::into).collect(), - flags: 0, - } - } -} - -impl From<&data::ExponentialBucket> for ExponentialBucket { - fn from(b: &data::ExponentialBucket) -> Self { - ExponentialBucket { - offset: b.offset, - bucket_counts: b.counts.clone(), - } - } -} - -#[derive(Serialize, Debug, Clone)] -struct ExponentialBucket { - offset: i32, - bucket_counts: Vec, -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -struct Exemplar { - filtered_attributes: Vec, - #[serde(serialize_with = "as_unix_nano")] - time_unix_nano: SystemTime, - #[serde(serialize_with = "as_human_readable")] - time: SystemTime, - value: DataValue, - span_id: String, - trace_id: String, -} - -impl + Copy> From<&data::Exemplar> for Exemplar { - fn from(value: &data::Exemplar) -> Self { - Exemplar { - filtered_attributes: value.filtered_attributes.iter().map(Into::into).collect(), - time_unix_nano: value.time, - time: value.time, - value: value.value.into(), - span_id: format!("{:016x}", u64::from_be_bytes(value.span_id)), - trace_id: format!("{:032x}", u128::from_be_bytes(value.trace_id)), - } - } -} diff --git a/opentelemetry-stdout/src/trace/exporter.rs b/opentelemetry-stdout/src/trace/exporter.rs index c4d319ff31..a5c057ca46 100644 --- a/opentelemetry-stdout/src/trace/exporter.rs +++ b/opentelemetry-stdout/src/trace/exporter.rs @@ -1,19 +1,17 @@ +use chrono::{DateTime, Utc}; use core::fmt; use futures_util::future::BoxFuture; -use opentelemetry::trace::{TraceError, TraceResult}; +use opentelemetry::trace::TraceError; use opentelemetry_sdk::export::{self, trace::ExportResult}; -use std::io::{stdout, Write}; +use std::sync::atomic; -use crate::trace::transform::SpanData; use opentelemetry_sdk::resource::Resource; -type Encoder = Box TraceResult<()> + Send + Sync>; - -/// An OpenTelemetry exporter that writes to stdout on export. +/// An OpenTelemetry exporter that writes Spans to stdout on export. pub struct SpanExporter { - writer: Option>, - encoder: Encoder, resource: Resource, + is_shutdown: atomic::AtomicBool, + resource_emitted: bool, } impl fmt::Debug for SpanExporter { @@ -22,38 +20,47 @@ impl fmt::Debug for SpanExporter { } } -impl SpanExporter { - /// Create a builder to configure this exporter. - pub fn builder() -> SpanExporterBuilder { - SpanExporterBuilder::default() - } -} - impl Default for SpanExporter { fn default() -> Self { - SpanExporterBuilder::default().build() + SpanExporter { + resource: Resource::default(), + is_shutdown: atomic::AtomicBool::new(false), + resource_emitted: false, + } } } impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter { + /// Write Spans to stdout fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { - let res = if let Some(writer) = &mut self.writer { - (self.encoder)(writer, crate::trace::SpanData::new(batch, &self.resource)).and_then( - |_| { - writer - .write_all(b"\n") - .map_err(|err| TraceError::Other(Box::new(err))) - }, - ) + if self.is_shutdown.load(atomic::Ordering::SeqCst) { + Box::pin(std::future::ready(Err(TraceError::from( + "exporter is shut down", + )))) } else { - Err("exporter is shut down".into()) - }; + println!("Spans"); + if self.resource_emitted { + print_spans(batch); + } else { + self.resource_emitted = true; + println!("Resource"); + if let Some(schema_url) = self.resource.schema_url() { + println!("\tResource SchemaUrl: {:?}", schema_url); + } - Box::pin(std::future::ready(res)) + self.resource.iter().for_each(|(k, v)| { + println!("\t -> {}={:?}", k, v); + }); + + print_spans(batch); + } + + Box::pin(std::future::ready(Ok(()))) + } } fn shutdown(&mut self) { - self.writer.take(); + self.is_shutdown.store(true, atomic::Ordering::SeqCst); } fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { @@ -61,67 +68,81 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter { } } -/// Configuration for the stdout trace exporter -#[derive(Default)] -pub struct SpanExporterBuilder { - writer: Option>, - encoder: Option, -} - -impl fmt::Debug for SpanExporterBuilder { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("SpanExporterBuilder") - } -} +fn print_spans(batch: Vec) { + for (i, span) in batch.into_iter().enumerate() { + println!("Span #{}", i); + println!("\tInstrumentation Scope"); + println!("\t\tName : {:?}", &span.instrumentation_lib.name); + if let Some(version) = &span.instrumentation_lib.version { + println!("\t\tVersion : {:?}", version); + } + if let Some(schema_url) = &span.instrumentation_lib.schema_url { + println!("\t\tSchemaUrl: {:?}", schema_url); + } + span.instrumentation_lib + .attributes + .iter() + .enumerate() + .for_each(|(index, kv)| { + if index == 0 { + println!("\t\tScope Attributes:"); + } + println!("\t\t\t -> {}: {}", kv.key, kv.value); + }); -impl SpanExporterBuilder { - /// Set the writer that the exporter will write to - /// - /// # Examples - /// - /// ``` - /// use opentelemetry_stdout::SpanExporterBuilder; - /// - /// let buffer = Vec::new(); // Any type that implements `Write` - /// let exporter = SpanExporterBuilder::default().with_writer(buffer).build(); - /// ``` - pub fn with_writer(mut self, writer: impl Write + Send + Sync + 'static) -> Self { - self.writer = Some(Box::new(writer)); - self - } + println!(); + println!("\tName : {}", &span.name); + println!("\tTraceId : {}", &span.span_context.trace_id()); + println!("\tSpanId : {}", &span.span_context.span_id()); + println!("\tParentSpanId: {}", &span.parent_span_id); + println!("\tKind : {:?}", &span.span_kind); - /// Set the encoder that this exporter will use - /// - /// # Examples - /// - /// ``` - /// use opentelemetry_stdout::SpanExporterBuilder; - /// - /// let exporter = SpanExporterBuilder::default() - /// // Can be any function that can write formatted data - /// // serde ecosystem crates for example provide such functions - /// .with_encoder(|writer, data| Ok(serde_json::to_writer_pretty(writer, &data).unwrap())) - /// .build(); - /// ``` - pub fn with_encoder( - mut self, - writer: impl Fn(&mut dyn Write, SpanData) -> TraceResult<()> + Send + Sync + 'static, - ) -> Self { - self.encoder = Some(Box::new(writer)); - self - } + let datetime: DateTime = span.start_time.into(); + println!("\tStart time: {}", datetime.format("%Y-%m-%d %H:%M:%S%.6f")); + let datetime: DateTime = span.end_time.into(); + println!("\tEnd time: {}", datetime.format("%Y-%m-%d %H:%M:%S%.6f")); + println!("\tStatus: {:?}", &span.status); - /// Create a span exporter with the current configuration - pub fn build(self) -> SpanExporter { - SpanExporter { - writer: Some(self.writer.unwrap_or_else(|| Box::new(stdout()))), - resource: Resource::empty(), - encoder: self.encoder.unwrap_or_else(|| { - Box::new(|writer, spans| { - serde_json::to_writer(writer, &spans) - .map_err(|err| TraceError::Other(Box::new(err))) - }) - }), + let mut print_header = true; + for kv in span.attributes.iter() { + if print_header { + println!("\tAttributes:"); + print_header = false; + } + println!("\t\t -> {}: {:?}", kv.key, kv.value); } + + span.events.iter().enumerate().for_each(|(index, event)| { + if index == 0 { + println!("\tEvents:"); + } + println!("\tEvent #{}", index); + println!("\tName : {}", event.name); + let datetime: DateTime = event.timestamp.into(); + println!("\tTimestamp : {}", datetime.format("%Y-%m-%d %H:%M:%S%.6f")); + + event.attributes.iter().enumerate().for_each(|(index, kv)| { + if index == 0 { + println!("\tAttributes:"); + } + println!("\t\t -> {}: {:?}", kv.key, kv.value); + }); + }); + + span.links.iter().enumerate().for_each(|(index, link)| { + if index == 0 { + println!("\tLinks:"); + } + println!("\tLink #{}", index); + println!("\tTraceId: {}", link.span_context.trace_id()); + println!("\tSpanId : {}", link.span_context.span_id()); + + link.attributes.iter().enumerate().for_each(|(index, kv)| { + if index == 0 { + println!("\tAttributes:"); + } + println!("\t\t -> {}: {:?}", kv.key, kv.value); + }); + }); } } diff --git a/opentelemetry-stdout/src/trace/mod.rs b/opentelemetry-stdout/src/trace/mod.rs index 418c5c2904..cf224ca510 100644 --- a/opentelemetry-stdout/src/trace/mod.rs +++ b/opentelemetry-stdout/src/trace/mod.rs @@ -1,5 +1,3 @@ mod exporter; -mod transform; pub use exporter::*; -pub use transform::*; diff --git a/opentelemetry-stdout/src/trace/transform.rs b/opentelemetry-stdout/src/trace/transform.rs deleted file mode 100644 index 484a222f42..0000000000 --- a/opentelemetry-stdout/src/trace/transform.rs +++ /dev/null @@ -1,232 +0,0 @@ -use crate::common::{as_human_readable, as_unix_nano, AttributeSet, KeyValue, Resource, Scope}; -use serde::{Serialize, Serializer}; -use std::{borrow::Cow, collections::HashMap, time::SystemTime}; - -/// Transformed trace data that can be serialized -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct SpanData { - resource_spans: Vec, -} - -impl SpanData { - pub(crate) fn new( - sdk_spans: Vec, - sdk_resource: &opentelemetry_sdk::Resource, - ) -> Self { - let mut resource_spans = HashMap::::new(); - for sdk_span in sdk_spans { - let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into()); - let schema_url = sdk_span.instrumentation_lib.schema_url.clone(); - let scope = sdk_span.instrumentation_lib.clone().into(); - let resource: Resource = sdk_resource.into(); - - let rs = resource_spans - .entry(sdk_resource.into()) - .or_insert_with(move || ResourceSpans { - resource, - scope_spans: Vec::with_capacity(1), - schema_url: resource_schema_url, - }); - - match rs.scope_spans.iter_mut().find(|ss| ss.scope == scope) { - Some(ss) => ss.spans.push(sdk_span.into()), - None => rs.scope_spans.push(ScopeSpans { - scope, - spans: vec![sdk_span.into()], - schema_url, - }), - }; - } - - SpanData { - resource_spans: resource_spans.into_values().collect(), - } - } -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct ResourceSpans { - resource: Resource, - scope_spans: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - schema_url: Option>, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct ScopeSpans { - scope: Scope, - spans: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - schema_url: Option>, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct Span { - trace_id: String, - span_id: String, - #[serde(skip_serializing_if = "Option::is_none")] - trace_state: Option, - parent_span_id: String, - name: Cow<'static, str>, - kind: SpanKind, - #[serde(serialize_with = "as_unix_nano")] - start_time_unix_nano: SystemTime, - #[serde(serialize_with = "as_human_readable")] - start_time: SystemTime, - #[serde(serialize_with = "as_unix_nano")] - end_time_unix_nano: SystemTime, - #[serde(serialize_with = "as_human_readable")] - end_time: SystemTime, - attributes: Vec, - dropped_attributes_count: u32, - #[serde(skip_serializing_if = "Vec::is_empty")] - events: Vec, - dropped_events_count: u32, - flags: u32, - #[serde(skip_serializing_if = "Vec::is_empty")] - links: Vec, - dropped_links_count: u32, - status: Status, -} - -impl From for Span { - fn from(value: opentelemetry_sdk::export::trace::SpanData) -> Self { - Span { - trace_id: value.span_context.trace_id().to_string(), - span_id: value.span_context.span_id().to_string(), - trace_state: Some(value.span_context.trace_state().header()).filter(|s| !s.is_empty()), - parent_span_id: Some(value.parent_span_id.to_string()) - .filter(|s| s != "0") - .unwrap_or_default(), - name: value.name, - kind: value.span_kind.into(), - start_time_unix_nano: value.start_time, - start_time: value.start_time, - end_time_unix_nano: value.end_time, - end_time: value.end_time, - dropped_attributes_count: value.dropped_attributes_count, - attributes: value.attributes.into_iter().map(Into::into).collect(), - dropped_events_count: value.events.dropped_count, - flags: value.span_context.trace_flags().to_u8() as u32, - events: value.events.into_iter().map(Into::into).collect(), - dropped_links_count: value.links.dropped_count, - links: value.links.iter().map(Into::into).collect(), - status: value.status.into(), - } - } -} - -#[derive(Debug, Clone, Copy)] -enum SpanKind { - #[allow(dead_code)] - Unspecified = 0, - Internal = 1, - Server = 2, - Client = 3, - Producer = 4, - Consumer = 5, -} - -impl Serialize for SpanKind { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_u8(*self as u32 as u8) - } -} - -impl From for SpanKind { - fn from(value: opentelemetry::trace::SpanKind) -> Self { - match value { - opentelemetry::trace::SpanKind::Client => SpanKind::Client, - opentelemetry::trace::SpanKind::Server => SpanKind::Server, - opentelemetry::trace::SpanKind::Producer => SpanKind::Producer, - opentelemetry::trace::SpanKind::Consumer => SpanKind::Consumer, - opentelemetry::trace::SpanKind::Internal => SpanKind::Internal, - } - } -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct Event { - name: Cow<'static, str>, - attributes: Vec, - dropped_attributes_count: u32, - #[serde(serialize_with = "as_unix_nano")] - time_unix_nano: SystemTime, - #[serde(serialize_with = "as_human_readable")] - time: SystemTime, -} - -impl From for Event { - fn from(value: opentelemetry::trace::Event) -> Self { - Event { - name: value.name, - attributes: value.attributes.into_iter().map(Into::into).collect(), - dropped_attributes_count: value.dropped_attributes_count, - time_unix_nano: value.timestamp, - time: value.timestamp, - } - } -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct Link { - trace_id: String, - span_id: String, - #[serde(skip_serializing_if = "Option::is_none")] - trace_state: Option, - attributes: Vec, - dropped_attributes_count: u32, -} - -impl From<&opentelemetry::trace::Link> for Link { - fn from(value: &opentelemetry::trace::Link) -> Self { - Link { - trace_id: value.span_context.trace_id().to_string(), - span_id: value.span_context.span_id().to_string(), - trace_state: Some(value.span_context.trace_state().header()).filter(|s| !s.is_empty()), - attributes: value.attributes.iter().map(Into::into).collect(), - dropped_attributes_count: value.dropped_attributes_count, - } - } -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct Status { - #[serde(skip_serializing_if = "Option::is_none")] - message: Option>, - #[serde(skip_serializing_if = "is_zero")] - code: u32, -} - -fn is_zero(v: &u32) -> bool { - *v == 0 -} - -impl From for Status { - fn from(value: opentelemetry::trace::Status) -> Self { - match value { - opentelemetry::trace::Status::Unset => Status { - message: None, - code: 0, - }, - opentelemetry::trace::Status::Error { description } => Status { - message: Some(description), - code: 2, - }, - opentelemetry::trace::Status::Ok => Status { - message: None, - code: 1, - }, - } - } -}