Rust telemetry to JSON files, instead of OTLP/gRPC (#79)
Lots going on here, apologies...

- Write telemetry to a file, instead of exporting via OTLP/gRPC with the [opentelemetry-otlp](https://crates.io/crates/opentelemetry-otlp/) crate.
  - Reasons for change:
    - Exporting via OTLP/gRPC was noticeably impacting performance. I tried to improve this by delaying any actual work until the benchmark run was complete (increasing buffer sizes, manually flushing buffers), but it still slowed max throughput from ~70 to ~50 Gb/s.
    - It's just much simpler to spit out a file.
  - Add our own telemetry exporter code that writes [OpenTelemetry Protocol](https://opentelemetry.io/docs/specs/otel/protocol/) (OTLP) in JSON format.
      - We started with code from an old version of the `opentelemetry-stdout` crate. Specifically, we took the commit just before the [Simplify LogExporter::Export interface](open-telemetry/opentelemetry-rust@3193320) change. That change removed JSON export so the code would be simpler for others to copy/paste, but we wanted JSON export, so we copied from just before that point.
      - Modify the exporter so it doesn't do ANY work until `flush_to_file(filename)` is called (until then, it just buffers up all spans).
        - Hopefully, this eliminates the performance impact of gathering telemetry (see the sketch after this list).
        - This gives us a unique file per run. It's much nicer to analyze a run on its own, vs trying to isolate run 5 of 10 within an enormous file.
        - File is named like `trace_20241009T185957Z_download-30GiB-1x-ram_run01.json`
- Add python scripts to visualize the telemetry data
    - Start with `allspans.py`, which graphs ALL tracing spans, each in its own row, similar to tools like [Jaeger](https://github.com/jaegertracing/jaeger-ui)
        - Draw with [`plotly.express.timeline()`](https://plotly.com/python/gantt/).
            - `plotly` was recommended by ChatGPT when I asked for a python graphing library where you could hide information until you mouse over it. I also specified that files should be easy to share; `plotly` generates a single HTML file.
        - The figure could use more work, but it's a start
    - More visualizations coming...
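
The buffer-until-flush design mentioned above, as a minimal self-contained sketch. The `Span` struct, `BufferingExporter`, and its method signatures here are simplified stand-ins invented for illustration — the real exporter implements opentelemetry-rust's exporter interface and uses serde to write the full OTLP `TracesData` schema:

```rust
use std::fs::File;
use std::io::{BufWriter, Write};
use std::sync::{Arc, Mutex};

// Simplified stand-in for a finished tracing span. The real exporter
// works with opentelemetry_sdk span data (IDs, attributes, etc.).
struct Span {
    name: String,
    start_time_unix_nano: u64,
    end_time_unix_nano: u64,
}

// Buffer-until-flush exporter: exporting just appends to an in-memory
// Vec, so no serialization or I/O happens while a run is being timed.
#[derive(Clone, Default)]
struct BufferingExporter {
    spans: Arc<Mutex<Vec<Span>>>,
}

impl BufferingExporter {
    /// Called by the tracing pipeline as spans finish; cheap on purpose.
    fn export(&self, batch: Vec<Span>) {
        self.spans.lock().unwrap().extend(batch);
    }

    /// Called once per run, after timing stops: drain the buffer and
    /// write one JSON file for that run.
    fn flush_to_file(&self, filename: &str) -> std::io::Result<()> {
        let spans = std::mem::take(&mut *self.spans.lock().unwrap());
        let mut out = BufWriter::new(File::create(filename)?);
        // Hand-rolled JSON keeps this sketch dependency-free; the real
        // code uses serde to emit the whole OTLP TracesData schema.
        write!(out, "{{\"resourceSpans\":[{{\"scopeSpans\":[{{\"spans\":[")?;
        for (i, span) in spans.iter().enumerate() {
            let comma = if i + 1 < spans.len() { "," } else { "" };
            write!(
                out,
                "{{\"name\":\"{}\",\"startTimeUnixNano\":{},\"endTimeUnixNano\":{}}}{}",
                span.name, span.start_time_unix_nano, span.end_time_unix_nano, comma
            )?;
        }
        write!(out, "]}}]}}]}}")?;
        out.flush()
    }
}

fn main() -> std::io::Result<()> {
    let exporter = BufferingExporter::default();
    exporter.export(vec![Span {
        name: "run".to_string(),
        start_time_unix_nano: 0,
        end_time_unix_nano: 1_000_000,
    }]);
    exporter.flush_to_file("trace_example_run01.json")
}
```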
graebm authored Oct 15, 2024
1 parent 6780bee commit 71be44d
Showing 14 changed files with 1,151 additions and 399 deletions.
518 changes: 192 additions & 326 deletions runners/s3-benchrunner-rust/Cargo.lock

Large diffs are not rendered by default.

19 changes: 10 additions & 9 deletions runners/s3-benchrunner-rust/Cargo.toml
@@ -9,24 +9,25 @@ edition = "2021"
aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "3dd20c8aa0872352100cf456beee02bfc53c73d1" }
# aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" }

# tracing-opentelemetry 0.26.0 is a bit broken (see https://github.com/tokio-rs/tracing-opentelemetry/issues/159)
# so use 0.24.0 and the exact opentelemetry-* versions it depends on.
tracing-opentelemetry = "0.24.0"
opentelemetry = { version = "0.23", features = ["trace", "metrics"] }
opentelemetry_sdk = { version = "0.23", default-features = false, features = ["trace", "rt-tokio"] }
opentelemetry-otlp = { version = "0.16", features = ["metrics"] }
opentelemetry-semantic-conventions = "0.15.0"
tracing-opentelemetry = "0.27"
opentelemetry = { version = "0.26", features = ["trace"] }
opentelemetry_sdk = { version = "0.26", default-features = false, features = ["trace", "rt-tokio"] }
opentelemetry-stdout = { version = "0.26", features = ["trace"] }
opentelemetry-semantic-conventions = "0.26"

anyhow = "1.0.86"
async-trait = "0.1.81"
aws-config = "1.5.4"
aws-sdk-s3 = "1.41.0"
bytes = "1"
chrono = "0.4.38"
clap = { version = "4.5.9", features = ["derive"] }
fastrand = "=2.1.0"
futures-util = "0.3"
ordered-float = "4.3.0"
serde = { version = "1.0.204", features = ["derive"] }
serde_json = "1.0.120"
thiserror = "1.0.62"
tokio = { version = "1.38.1", features = ["io-util"] }
tokio = { version = "1.40.0", features = ["io-util"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
17 changes: 14 additions & 3 deletions runners/s3-benchrunner-rust/README.md
@@ -47,8 +47,19 @@ See further instructions [here](../../README.md#run-a-benchmark).

### Viewing Telemetry

Use the `--telemetry` flag to export OpenTelemetry data to http://localhost:4317 as OTLP/gRPC payloads.
Use the `--telemetry` flag to export OpenTelemetry Protocol data as `trace_*.json` files in the working directory.

The simplest way I know to collect and view this data is with [Jaeger All in One](https://www.jaegertracing.io/docs/latest/getting-started/) or [otel-desktop-viewer](https://github.com/CtrlSpice/otel-desktop-viewer?tab=readme-ov-file#getting-started). Get one of these running, run the benchmark with the `--telemetry` flag, then view the data in your browser.
Run `graph.py`:
```sh
usage: graph.py [-h] TRACE_JSON

Graph a benchmark run

positional arguments:
  TRACE_JSON  trace_*.json file to graph.

options:
  -h, --help  show this help message and exit
```

TODO: document how to collect and view data from a non-local run.
View the new `trace_*.html` file, in the same directory as the `trace_*.json` file.
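
For example, given a trace file from a previous `--telemetry` run (file name hypothetical, matching the `trace_*` pattern above):

```sh
./graph.py trace_20241009T185957Z_download-30GiB-1x-ram_run01.json
# writes trace_20241009T185957Z_download-30GiB-1x-ram_run01.html
```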
40 changes: 40 additions & 0 deletions runners/s3-benchrunner-rust/graph.py
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import argparse
import json
from pathlib import Path

from graph import PerfTimer
import graph.allspans

PARSER = argparse.ArgumentParser(description="Graph a benchmark run")

# File contains JSON representation of OTLP TracesData.
# Contents look like:
# {"resourceSpans":[
# {"resource": {"attributes":[{"key":"service.name","value":{"stringValue":"s3-benchrunner-rust"}}, ...]},
# "scopeSpans":[
# {"scope":{"name":"s3-benchrunner-rust"},
# "spans":[
# {"traceId":"0e506aee98c24b869337620977f30cbb","spanId":"6fb4c16d1d1652d6", ...},
# {"traceId":"0e506aee98c24b869337620977f30cbb","spanId":"6440f82fb6fc6299", ...},
# ...
#
# Official protobuf format specified here:
# https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
#
# Note that when proto data is mapped to JSON, snake_case names become camelCase
# see: https://protobuf.dev/programming-guides/proto3/#json
PARSER.add_argument('TRACE_JSON', help="trace_*.json file to graph.")

args = PARSER.parse_args()

with PerfTimer(f'Open {args.TRACE_JSON}'):
    with open(args.TRACE_JSON) as f:
        traces_data = json.load(f)

with PerfTimer('Graph all spans'):
    fig = graph.allspans.draw(traces_data)

html_path = Path(args.TRACE_JSON).with_suffix('.html')
with PerfTimer(f'Write {html_path}'):
    fig.write_html(html_path)
16 changes: 16 additions & 0 deletions runners/s3-benchrunner-rust/graph/__init__.py
@@ -0,0 +1,16 @@
import time


class PerfTimer:
    """Context manager that prints how long a `with` statement took"""

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        self.start = time.perf_counter()

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            end = time.perf_counter()
            print(f"{self.name}: {end - self.start:.3f} sec")
160 changes: 160 additions & 0 deletions runners/s3-benchrunner-rust/graph/allspans.py
@@ -0,0 +1,160 @@
from collections import defaultdict
import pandas as pd # type: ignore
import plotly.express as px # type: ignore


def draw(data):
    # gather all spans into a single list
    spans = []
    for resource_span in data['resourceSpans']:
        for scope_span in resource_span['scopeSpans']:
            spans.extend(scope_span['spans'])

    # simplify attributes of each span to be a simple dict
    for span in spans:
        span['attributes'] = _simplify_attributes(span['attributes'])

    # sort spans according to parent-child hierarchy
    spans = _sort_spans_by_hierarchy(spans)

    # prepare columns for plotly
    columns = defaultdict(list)
    name_count = defaultdict(int)
    for (idx, span) in enumerate(spans):

        name = span['name']
        # we want each span in its own row, so assign a unique name and use that as Y value
        # TODO: improve unique name, using "seq" or "part-num"
        name_count[name] += 1
        unique_name = f"{name}#{name_count[name]}"

        columns['Name'].append(name)
        columns['Unique Name'].append(unique_name)
        columns['Duration (ns)'].append(
            span['endTimeUnixNano'] - span['startTimeUnixNano'])
        columns['Start Time'].append(pd.to_datetime(span['startTimeUnixNano']))
        columns['End Time'].append(pd.to_datetime(span['endTimeUnixNano']))
        columns['Index'].append(idx)
        columns['Span ID'].append(span['spanId'])
        columns['Parent ID'].append(span['parentSpanId'])
        columns['Attributes'].append(
            [f"<br> {k}: {v}" for (k, v) in span['attributes'].items()])

    # if a span name occurs only once, remove the "#1" from its unique name
    for (i, name) in enumerate(columns['Name']):
        if name_count[name] == 1:
            columns['Unique Name'][i] = name

    df = pd.DataFrame(columns)

    # By default, show all columns in hover text.
    # Omit a column by setting its value to False. You can also set special formatting rules here.
    hover_data = {col: True for col in columns.keys()}
    hover_data['Name'] = False  # already shown
    hover_data['Unique Name'] = False  # already shown
    hover_data['End Time'] = False  # who cares

    fig = px.timeline(
        data_frame=df,
        x_start='Start Time',
        x_end='End Time',
        y='Unique Name',
        hover_data=hover_data,
        # spans with same original name get same color
        # TODO: combine name with code.namespace, in case same name used in multiple places
        color='Name',
        # force ordering, otherwise plotly will group by 'color'
        category_orders={'Unique Name': df['Unique Name']},
    )

    # if there are lots of rows, ensure they're not drawn too small
    num_rows = len(spans)
    if num_rows > 20:
        preferred_total_height = 800
        min_row_height = 3
        row_height = preferred_total_height / num_rows
        row_height = int(max(min_row_height, row_height))
        height = num_rows * row_height
        # don't show yaxis labels if they're so squished that some are omitted
        show_yaxis_labels = row_height >= 15
    else:
        # otherwise auto-height
        height = None
        show_yaxis_labels = True

    fig.update_layout(
        title="All Benchmark Spans",
        xaxis_title="Time",
        yaxis_title="Span Name",
        height=height,
        yaxis=dict(
            showticklabels=show_yaxis_labels,
        ),
        hovermode='y unified',  # show hover if mouse anywhere in row
    )

    return fig


def _sort_spans_by_hierarchy(spans):
    # map from ID to span
    id_to_span = {}
    # map from parent ID to child span IDs
    parent_to_child_ids = defaultdict(list)
    for span in spans:
        id = span['spanId']
        id_to_span[id] = span

        parent_id = span['parentSpanId']
        parent_to_child_ids[parent_id].append(id)

    # sort spans in depth-first order, by crawling the parent/child tree starting at root
    sorted_spans = []
    # ids_to_process is LIFO (a stack).
    # With each loop, we pop the last item in ids_to_process
    # and then append its children, so that we process them next.
    ids_to_process = ['0000000000000000']
    while ids_to_process:
        id = ids_to_process.pop(-1)
        if id in parent_to_child_ids:
            child_ids = parent_to_child_ids[id]
            # sorted by start time, but reversed because we pop from the BACK of ids_to_process
            child_ids = sorted(
                child_ids, key=lambda x: id_to_span[x]['startTimeUnixNano'], reverse=True)
            ids_to_process.extend(child_ids)

        if id in id_to_span:
            sorted_spans.append(id_to_span[id])

    # warn if any spans are missing
    if (num_leftover := len(spans) - len(sorted_spans)):
        print(f"WARNING: {num_leftover} spans not shown (missing parents)")

    return sorted_spans


# Transform attributes from like:
# [
#     {"key": "code.namespace", "value": {"stringValue": "s3_benchrunner_rust::transfer_manager"}},
#     {"key": "code.lineno", "value": {"intValue": 136}}
# ]
# To like:
# {
#     "code.namespace": "s3_benchrunner_rust::transfer_manager",
#     "code.lineno": 136,
# }
def _simplify_attributes(attributes_list):
    simple_dict = {}
    for attr in attributes_list:
        key = attr['key']
        # extract actual value, ignoring value's key which looks like "intValue"
        value = next(iter(attr['value'].values()))

        # trim down long filepaths by omitting everything before "src/"
        if key == 'code.filepath':
            if (src_idx := value.find("src/")) > 0:
                value = value[src_idx:]

        simple_dict[key] = value

    return simple_dict
61 changes: 44 additions & 17 deletions runners/s3-benchrunner-rust/src/main.rs
@@ -1,7 +1,7 @@
use clap::{Parser, ValueEnum};
use std::process::exit;
use std::time::Instant;
use tracing::{self, info_span, instrument, Instrument};
use tracing::{info_span, Instrument};

use s3_benchrunner_rust::{
    bytes_to_gigabits, prepare_run, telemetry, BenchmarkConfig, Result, RunBenchmark,
@@ -37,16 +37,6 @@ enum S3ClientId {
async fn main() {
    let args = Args::parse();

    let _telemetry_guard = if args.telemetry {
        // If emitting telemetry, set that up as tracing_subscriber.
        Some(telemetry::init_tracing_subscriber().unwrap())
    } else {
        // Otherwise, set the default subscriber,
        // which prints to stdout if env-var set like RUST_LOG=trace
        tracing_subscriber::fmt::init();
        None
    };

    let result = execute(&args).await;
    if let Err(e) = result {
        match e.downcast_ref::<SkipBenchmarkError>() {
@@ -61,31 +51,52 @@ async fn main() {
    }
}

#[instrument(name = "main")]
async fn execute(args: &Args) -> Result<()> {
    let mut telemetry = if args.telemetry {
        // If emitting telemetry, set that up as tracing_subscriber.
        Some(telemetry::init_tracing_subscriber().unwrap())
    } else {
        // Otherwise, set the default subscriber,
        // which prints to stdout if env-var set like RUST_LOG=trace
        tracing_subscriber::fmt::init();
        None
    };

    // create appropriate benchmark runner
    let runner = new_runner(args).await?;

    let workload = &runner.config().workload;
    let workload_name = workload_name(&args.workload);
    let bytes_per_run: u64 = workload.tasks.iter().map(|x| x.size).sum();
    let gigabits_per_run = bytes_to_gigabits(bytes_per_run);

    // repeat benchmark until we exceed max_repeat_count or max_repeat_secs
    let app_start = Instant::now();
    for run_i in 0..workload.max_repeat_count {
    for run_num in 1..=workload.max_repeat_count {
        prepare_run(workload)?;

        let run_start = Instant::now();
        let run_start_datetime = chrono::Utc::now();
        let run_start = Instant::now(); // high resolution

        runner
            .run()
            .instrument(info_span!("run", i = run_i))
            .instrument(info_span!("run", num = run_num, workload = workload_name))
            .await?;

        let run_secs = run_start.elapsed().as_secs_f64();
        println!(

        // flush any telemetry
        if let Some(telemetry) = &mut telemetry {
            telemetry.flush_to_file(&trace_file_name(
                workload_name,
                &run_start_datetime,
                run_num,
            ));
        }

        eprintln!(
            "Run:{} Secs:{:.6} Gb/s:{:.6}",
            run_i + 1,
            run_num,
            run_secs,
            gigabits_per_run / run_secs
        );
@@ -114,3 +125,19 @@ async fn new_runner(args: &Args) -> Result<Box<dyn RunBenchmark>> {
        }
    }
}

// Given "path/to/my-workload.run.json" return "my-workload"
fn workload_name(path: &str) -> &str {
    let filename = path.rsplit('/').next().unwrap_or(path);
    let without_extension = filename.split('.').next().unwrap_or(filename);
    without_extension
}

fn trace_file_name(
    workload: &str,
    run_start: &chrono::DateTime<chrono::Utc>,
    run_num: u32,
) -> String {
    let run_start = run_start.format("%Y%m%dT%H%M%SZ").to_string();
    format!("trace_{run_start}_{workload}_run{run_num:02}.json")
}
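
As a quick sanity check (a hypothetical snippet, not part of this commit), these helpers reproduce the example file name from the commit message:

```rust
use chrono::TimeZone; // brings Utc.with_ymd_and_hms() into scope

fn main() {
    let workload = workload_name("path/to/download-30GiB-1x-ram.run.json");
    let start = chrono::Utc.with_ymd_and_hms(2024, 10, 9, 18, 59, 57).unwrap();
    // prints: trace_20241009T185957Z_download-30GiB-1x-ram_run01.json
    println!("{}", trace_file_name(workload, &start, 1));
}
```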