diff --git a/runners/s3-benchrunner-rust/Cargo.lock b/runners/s3-benchrunner-rust/Cargo.lock index 579ceea..ffb48c7 100644 --- a/runners/s3-benchrunner-rust/Cargo.lock +++ b/runners/s3-benchrunner-rust/Cargo.lock @@ -247,7 +247,7 @@ dependencies = [ [[package]] name = "aws-s3-transfer-manager" version = "0.1.0" -source = "git+https://github.com/awslabs/aws-s3-transfer-manager-rs.git?rev=3dd20c8aa0872352100cf456beee02bfc53c73d1#3dd20c8aa0872352100cf456beee02bfc53c73d1" +source = "git+https://github.com/awslabs/aws-s3-transfer-manager-rs.git?rev=33031d9945cd9961bb5e1b5207ac8870b0a9dbbd#33031d9945cd9961bb5e1b5207ac8870b0a9dbbd" dependencies = [ "async-channel", "async-trait", diff --git a/runners/s3-benchrunner-rust/Cargo.toml b/runners/s3-benchrunner-rust/Cargo.toml index 885c11c..c4eb200 100644 --- a/runners/s3-benchrunner-rust/Cargo.toml +++ b/runners/s3-benchrunner-rust/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] # Swap which line is commented-out to use GitHub or local aws-s3-transfer-manager -aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "3dd20c8aa0872352100cf456beee02bfc53c73d1" } +aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "33031d9945cd9961bb5e1b5207ac8870b0a9dbbd" } # aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" } tracing-opentelemetry = "0.27" diff --git a/runners/s3-benchrunner-rust/graph.py b/runners/s3-benchrunner-rust/graph.py index dc814f0..2f3c0cc 100755 --- a/runners/s3-benchrunner-rust/graph.py +++ b/runners/s3-benchrunner-rust/graph.py @@ -28,13 +28,22 @@ args = PARSER.parse_args() -with PerfTimer(f'Open {args.TRACE_JSON}'): - with open(args.TRACE_JSON) as f: +trace_json = Path(args.TRACE_JSON) + +# if directory passed in, pick the newest file +if trace_json.is_dir(): + all_traces = list(trace_json.glob('trace_*.json')) + if len(all_traces) == 0: + exit(f"No trace_*.json found under: {trace_json.absolute()}") + trace_json = sorted(all_traces, key=lambda x: x.stat().st_mtime)[-1] + +with PerfTimer(f'Open {trace_json}'): + with open(trace_json) as f: traces_data = json.load(f) with PerfTimer('Graph all spans'): fig = graph.allspans.draw(traces_data) -html_path = Path(args.TRACE_JSON).with_suffix('.html') +html_path = Path(trace_json).with_suffix('.allspans.html') with PerfTimer(f'Write {html_path}'): fig.write_html(html_path) diff --git a/runners/s3-benchrunner-rust/graph/allspans.py b/runners/s3-benchrunner-rust/graph/allspans.py index e93fc0d..8eb1d83 100644 --- a/runners/s3-benchrunner-rust/graph/allspans.py +++ b/runners/s3-benchrunner-rust/graph/allspans.py @@ -21,24 +21,29 @@ def draw(data): columns = defaultdict(list) name_count = defaultdict(int) for (idx, span) in enumerate(spans): - name = span['name'] # we want each span in its own row, so assign a unique name and use that as Y value # TODO: improve unique name, using "seq" or "part-num" name_count[name] += 1 unique_name = f"{name}#{name_count[name]}" + start_time_ns = span['startTimeUnixNano'] + end_time_ns = span['endTimeUnixNano'] + duration_ns = end_time_ns - start_time_ns + # ensure span is wide enough to see + visual_end_time_ns = start_time_ns + max(duration_ns, 50_000_000) + columns['Name'].append(name) columns['Unique Name'].append(unique_name) - columns['Duration (ns)'].append( - span['endTimeUnixNano'] - span['startTimeUnixNano']) - columns['Start Time'].append(pd.to_datetime(span['startTimeUnixNano'])) - columns['End Time'].append(pd.to_datetime(span['endTimeUnixNano'])) + columns['Start Time'].append(pd.to_datetime(start_time_ns)) + columns['End Time'].append(pd.to_datetime(end_time_ns)) + columns['Visual End Time'].append(pd.to_datetime(visual_end_time_ns)) + columns['Duration (secs)'].append(duration_ns / 1_000_000_000.0) columns['Index'].append(idx) columns['Span ID'].append(span['spanId']) columns['Parent ID'].append(span['parentSpanId']) columns['Attributes'].append( - [f"
{k}: {v}" for (k, v) in span['attributes'].items()]) + "".join([f"
{k}={v}" for (k, v) in span['attributes'].items()])) # if a span name occurs only once, remove the "#1" from its unique name for (i, name) in enumerate(columns['Name']): @@ -50,14 +55,13 @@ def draw(data): # By default, show all columns in hover text. # Omit a column by setting false. You can also set special formatting rules here. hover_data = {col: True for col in columns.keys()} - hover_data['Name'] = False # already shown hover_data['Unique Name'] = False # already shown - hover_data['End Time'] = False # who cares + hover_data['Visual End Time'] = False # actual "End Time" already shown fig = px.timeline( data_frame=df, x_start='Start Time', - x_end='End Time', + x_end='Visual End Time', y='Unique Name', hover_data=hover_data, # spans with same original name get same color @@ -155,6 +159,11 @@ def _simplify_attributes(attributes_list): if (src_idx := value.find("src/")) > 0: value = value[src_idx:] + # trim down excessively long strings + MAX_STRLEN = 150 + if isinstance(value, str) and len(value) > MAX_STRLEN: + value = value[:MAX_STRLEN] + "...TRUNCATED" + simple_dict[key] = value return simple_dict diff --git a/runners/s3-benchrunner-rust/src/main.rs b/runners/s3-benchrunner-rust/src/main.rs index b271da9..f382a44 100644 --- a/runners/s3-benchrunner-rust/src/main.rs +++ b/runners/s3-benchrunner-rust/src/main.rs @@ -80,7 +80,11 @@ async fn execute(args: &Args) -> Result<()> { runner .run() - .instrument(info_span!("run", num = run_num, workload = workload_name)) + .instrument(info_span!( + "run-benchmark", + num = run_num, + workload = workload_name + )) .await?; let run_secs = run_start.elapsed().as_secs_f64(); diff --git a/runners/s3-benchrunner-rust/src/transfer_manager.rs b/runners/s3-benchrunner-rust/src/transfer_manager.rs index 1d97e22..95052e9 100644 --- a/runners/s3-benchrunner-rust/src/transfer_manager.rs +++ b/runners/s3-benchrunner-rust/src/transfer_manager.rs @@ -67,7 +67,7 @@ impl TransferManagerRunner { } } - async fn run_task(self, task_i: usize, parent_span: tracing::Span) -> Result<()> { + async fn run_task(self, task_i: usize) -> Result<()> { let task_config = &self.config().workload.tasks[task_i]; if self.config().workload.checksum.is_some() { @@ -77,12 +77,12 @@ impl TransferManagerRunner { match task_config.action { TaskAction::Download => { self.download(task_config) - .instrument(info_span!(parent: parent_span, "download", key=task_config.key)) + .instrument(info_span!("download", key = task_config.key)) .await } TaskAction::Upload => { self.upload(task_config) - .instrument(info_span!(parent: parent_span, "upload", key=task_config.key)) + .instrument(info_span!("upload", key = task_config.key)) .await } } @@ -98,14 +98,13 @@ impl TransferManagerRunner { .bucket(&self.config().bucket) .key(key) .send() - .instrument(info_span!("initial-send")) .await .with_context(|| format!("failed starting download: {key}"))?; // if files_on_disk: open file for writing let mut dest_file = if self.config().workload.files_on_disk { let file = File::create(key) - .instrument(info_span!("file-open")) + .instrument(info_span!("open-file")) .await .with_context(|| format!("failed creating file: {key}"))?; Some(file) @@ -114,10 +113,11 @@ impl TransferManagerRunner { }; let mut total_size = 0u64; + let mut seq: u64 = 0; while let Some(chunk_result) = download_handle .body_mut() .next() - .instrument(info_span!("body-next")) + .instrument(info_span!("next-chunk", seq, offset = total_size)) .await { let mut chunk = @@ -125,11 +125,12 @@ impl TransferManagerRunner { let chunk_size = chunk.remaining(); total_size += chunk_size as u64; + seq += 1; if let Some(dest_file) = &mut dest_file { dest_file .write_all_buf(&mut chunk) - .instrument(info_span!("file-write", bytes = chunk_size)) + .instrument(info_span!("write-file", bytes = chunk_size)) .await?; } } @@ -159,13 +160,11 @@ impl TransferManagerRunner { .key(key) .body(stream) .send() - .instrument(info_span!("initial-send")) .await .with_context(|| format!("failed starting upload: {key}"))?; upload_handle .join() - .instrument(info_span!("join")) .await .with_context(|| format!("failed uploading: {key}"))?; @@ -181,9 +180,8 @@ impl RunBenchmark for TransferManagerRunner { // so we're using a JoinSet. let mut task_set: JoinSet> = JoinSet::new(); for i in 0..self.config().workload.tasks.len() { - let parent_span_of_task = tracing::Span::current(); - let task = self.clone().run_task(i, parent_span_of_task); - task_set.spawn(task); + let task = self.clone().run_task(i); + task_set.spawn(task.instrument(tracing::Span::current())); } while let Some(join_result) = task_set.join_next().await {