Skip to content

Commit

Permalink
Add tests to validate spatial aggregation (open-telemetry#1416)
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas authored Nov 30, 2023
1 parent 3594247 commit 0862af1
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 1 deletion.
166 changes: 166 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,170 @@ mod tests {
"View rename of unit should be ignored and original unit retained."
);
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "Spatial aggregation is not yet implemented."]
async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
// cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=metrics,testing

// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let criteria = Instrument::new().name("my_observable_counter");
// View drops all attributes.
let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]);

let view =
new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
let meter_provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_view(view)
.build();

// Act
let meter = meter_provider.meter("test");
let observable_counter = meter.u64_observable_counter("my_observable_counter").init();

// Normally, these callbacks would generate 3 time-series, but since the view
// drops all attributes, we expect only 1 time-series.
meter
.register_callback(&[observable_counter.as_any()], move |observer| {
observer.observe_u64(
&observable_counter,
100,
[
KeyValue::new("statusCode", "200"),
KeyValue::new("verb", "get"),
]
.as_ref(),
);

observer.observe_u64(
&observable_counter,
100,
[
KeyValue::new("statusCode", "200"),
KeyValue::new("verb", "post"),
]
.as_ref(),
);

observer.observe_u64(
&observable_counter,
100,
[
KeyValue::new("statusCode", "500"),
KeyValue::new("verb", "get"),
]
.as_ref(),
);
})
.expect("Expected to register callback");

meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
assert!(!resource_metrics.is_empty());
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
assert_eq!(metric.name, "my_observable_counter",);

let sum = metric
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for ObservableCounter instruments by default");

// Expecting 1 time-series only, as the view drops all attributes resulting
// in a single time-series.
// This is failing today, due to lack of support for spatial aggregation.
assert_eq!(sum.data_points.len(), 1);

// find and validate the single datapoint
let data_point = &sum.data_points[0];
assert_eq!(data_point.value, 300);
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "Spatial aggregation is not yet implemented."]
async fn spatial_aggregation_when_view_drops_attributes_counter() {
// cargo test spatial_aggregation_when_view_drops_attributes_counter --features=metrics,testing

// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let criteria = Instrument::new().name("my_counter");
// View drops all attributes.
let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]);

let view =
new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
let meter_provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_view(view)
.build();

// Act
let meter = meter_provider.meter("test");
let counter = meter.u64_counter("my_counter").init();

// Normally, this would generate 3 time-series, but since the view
// drops all attributes, we expect only 1 time-series.
counter.add(
10,
[
KeyValue::new("statusCode", "200"),
KeyValue::new("verb", "Get"),
]
.as_ref(),
);

counter.add(
10,
[
KeyValue::new("statusCode", "500"),
KeyValue::new("verb", "Get"),
]
.as_ref(),
);

counter.add(
10,
[
KeyValue::new("statusCode", "200"),
KeyValue::new("verb", "Post"),
]
.as_ref(),
);

meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
assert!(!resource_metrics.is_empty());
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
assert_eq!(metric.name, "my_counter",);

let sum = metric
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for Counter instruments by default");

// Expecting 1 time-series only, as the view drops all attributes resulting
// in a single time-series.
// This is failing today, due to lack of support for spatial aggregation.
assert_eq!(sum.data_points.len(), 1);
// find and validate the single datapoint
let data_point = &sum.data_points[0];
assert_eq!(data_point.value, 30);
}
}
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/metrics/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub fn new_view(criteria: Instrument, mask: Stream) -> Result<Box<dyn View>> {
Ok(_) => agg = Some(ma.clone()),
Err(err) => {
global::handle_error(MetricsError::Other(format!(
"{}, Proceeding as if View did not exist. criteria: {:?}, mask: {:?}",
"{}, proceeding as if view did not exist. criteria: {:?}, mask: {:?}",
err, err_msg_criteria, mask
)));
return Ok(Box::new(empty_view));
Expand Down

0 comments on commit 0862af1

Please sign in to comment.