Skip to content

Commit

Permalink
Improve rust tracing spans
Browse files Browse the repository at this point in the history
  • Loading branch information
graebm committed Oct 18, 2024
1 parent 71be44d commit f34e3ac
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 27 deletions.
2 changes: 1 addition & 1 deletion runners/s3-benchrunner-rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion runners/s3-benchrunner-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
[dependencies]

# Swap which line is commented-out to use GitHub or local aws-s3-transfer-manager
aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "3dd20c8aa0872352100cf456beee02bfc53c73d1" }
aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "33031d9945cd9961bb5e1b5207ac8870b0a9dbbd" }
# aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" }

tracing-opentelemetry = "0.27"
Expand Down
15 changes: 12 additions & 3 deletions runners/s3-benchrunner-rust/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,22 @@

args = PARSER.parse_args()

with PerfTimer(f'Open {args.TRACE_JSON}'):
with open(args.TRACE_JSON) as f:
trace_json = Path(args.TRACE_JSON)

# if directory passed in, pick the newest file
if trace_json.is_dir():
all_traces = list(trace_json.glob('trace_*.json'))
if len(all_traces) == 0:
exit(f"No trace_*.json found under: {trace_json.absolute()}")
trace_json = sorted(all_traces, key=lambda x: x.stat().st_mtime)[-1]

with PerfTimer(f'Open {trace_json}'):
with open(trace_json) as f:
traces_data = json.load(f)

with PerfTimer('Graph all spans'):
fig = graph.allspans.draw(traces_data)

html_path = Path(args.TRACE_JSON).with_suffix('.html')
html_path = Path(trace_json).with_suffix('.allspans.html')
with PerfTimer(f'Write {html_path}'):
fig.write_html(html_path)
27 changes: 18 additions & 9 deletions runners/s3-benchrunner-rust/graph/allspans.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,29 @@ def draw(data):
columns = defaultdict(list)
name_count = defaultdict(int)
for (idx, span) in enumerate(spans):

name = span['name']
# we want each span in its own row, so assign a unique name and use that as Y value
# TODO: improve unique name, using "seq" or "part-num"
name_count[name] += 1
unique_name = f"{name}#{name_count[name]}"

start_time_ns = span['startTimeUnixNano']
end_time_ns = span['endTimeUnixNano']
duration_ns = end_time_ns - start_time_ns
# ensure span is wide enough to see
visual_end_time_ns = start_time_ns + max(duration_ns, 50_000_000)

columns['Name'].append(name)
columns['Unique Name'].append(unique_name)
columns['Duration (ns)'].append(
span['endTimeUnixNano'] - span['startTimeUnixNano'])
columns['Start Time'].append(pd.to_datetime(span['startTimeUnixNano']))
columns['End Time'].append(pd.to_datetime(span['endTimeUnixNano']))
columns['Start Time'].append(pd.to_datetime(start_time_ns))
columns['End Time'].append(pd.to_datetime(end_time_ns))
columns['Visual End Time'].append(pd.to_datetime(visual_end_time_ns))
columns['Duration (secs)'].append(duration_ns / 1_000_000_000.0)
columns['Index'].append(idx)
columns['Span ID'].append(span['spanId'])
columns['Parent ID'].append(span['parentSpanId'])
columns['Attributes'].append(
[f"<br> {k}: {v}" for (k, v) in span['attributes'].items()])
"".join([f"<br> {k}={v}" for (k, v) in span['attributes'].items()]))

# if a span name occurs only once, remove the "#1" from its unique name
for (i, name) in enumerate(columns['Name']):
Expand All @@ -50,14 +55,13 @@ def draw(data):
# By default, show all columns in hover text.
# Omit a column by setting false. You can also set special formatting rules here.
hover_data = {col: True for col in columns.keys()}
hover_data['Name'] = False # already shown
hover_data['Unique Name'] = False # already shown
hover_data['End Time'] = False # who cares
hover_data['Visual End Time'] = False # actual "End Time" already shown

fig = px.timeline(
data_frame=df,
x_start='Start Time',
x_end='End Time',
x_end='Visual End Time',
y='Unique Name',
hover_data=hover_data,
# spans with same original name get same color
Expand Down Expand Up @@ -155,6 +159,11 @@ def _simplify_attributes(attributes_list):
if (src_idx := value.find("src/")) > 0:
value = value[src_idx:]

# trim down excessively long strings
MAX_STRLEN = 150
if isinstance(value, str) and len(value) > MAX_STRLEN:
value = value[:MAX_STRLEN] + "...TRUNCATED"

simple_dict[key] = value

return simple_dict
6 changes: 5 additions & 1 deletion runners/s3-benchrunner-rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ async fn execute(args: &Args) -> Result<()> {

runner
.run()
.instrument(info_span!("run", num = run_num, workload = workload_name))
.instrument(info_span!(
"run-benchmark",
num = run_num,
workload = workload_name
))
.await?;

let run_secs = run_start.elapsed().as_secs_f64();
Expand Down
22 changes: 10 additions & 12 deletions runners/s3-benchrunner-rust/src/transfer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl TransferManagerRunner {
}
}

async fn run_task(self, task_i: usize, parent_span: tracing::Span) -> Result<()> {
async fn run_task(self, task_i: usize) -> Result<()> {
let task_config = &self.config().workload.tasks[task_i];

if self.config().workload.checksum.is_some() {
Expand All @@ -77,12 +77,12 @@ impl TransferManagerRunner {
match task_config.action {
TaskAction::Download => {
self.download(task_config)
.instrument(info_span!(parent: parent_span, "download", key=task_config.key))
.instrument(info_span!("download", key = task_config.key))
.await
}
TaskAction::Upload => {
self.upload(task_config)
.instrument(info_span!(parent: parent_span, "upload", key=task_config.key))
.instrument(info_span!("upload", key = task_config.key))
.await
}
}
Expand All @@ -98,14 +98,13 @@ impl TransferManagerRunner {
.bucket(&self.config().bucket)
.key(key)
.send()
.instrument(info_span!("initial-send"))
.await
.with_context(|| format!("failed starting download: {key}"))?;

// if files_on_disk: open file for writing
let mut dest_file = if self.config().workload.files_on_disk {
let file = File::create(key)
.instrument(info_span!("file-open"))
.instrument(info_span!("open-file"))
.await
.with_context(|| format!("failed creating file: {key}"))?;
Some(file)
Expand All @@ -114,22 +113,24 @@ impl TransferManagerRunner {
};

let mut total_size = 0u64;
let mut seq: u64 = 0;
while let Some(chunk_result) = download_handle
.body_mut()
.next()
.instrument(info_span!("body-next"))
.instrument(info_span!("next-chunk", seq, offset = total_size))
.await
{
let mut chunk =
chunk_result.with_context(|| format!("failed downloading next chunk of: {key}"))?;

let chunk_size = chunk.remaining();
total_size += chunk_size as u64;
seq += 1;

if let Some(dest_file) = &mut dest_file {
dest_file
.write_all_buf(&mut chunk)
.instrument(info_span!("file-write", bytes = chunk_size))
.instrument(info_span!("write-file", bytes = chunk_size))
.await?;
}
}
Expand Down Expand Up @@ -159,13 +160,11 @@ impl TransferManagerRunner {
.key(key)
.body(stream)
.send()
.instrument(info_span!("initial-send"))
.await
.with_context(|| format!("failed starting upload: {key}"))?;

upload_handle
.join()
.instrument(info_span!("join"))
.await
.with_context(|| format!("failed uploading: {key}"))?;

Expand All @@ -181,9 +180,8 @@ impl RunBenchmark for TransferManagerRunner {
// so we're using a JoinSet.
let mut task_set: JoinSet<Result<()>> = JoinSet::new();
for i in 0..self.config().workload.tasks.len() {
let parent_span_of_task = tracing::Span::current();
let task = self.clone().run_task(i, parent_span_of_task);
task_set.spawn(task);
let task = self.clone().run_task(i);
task_set.spawn(task.instrument(tracing::Span::current()));
}

while let Some(join_result) = task_set.join_next().await {
Expand Down

0 comments on commit f34e3ac

Please sign in to comment.