Skip to content

Commit

Permalink
migrating execution stats
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitriyMusatkin committed Jun 3, 2024
1 parent 1a0e08b commit 68aac44
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 40 deletions.
1 change: 1 addition & 0 deletions scripts/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ boto3
awscrt # for linting s3-benchrunner-python
autopep8 # for code formatting
mypy # for type checking
psutil # process stats
aws-cdk-lib==2.116.1 # CDK
constructs>=10.0.0,<11.0.0 # CDK
5 changes: 3 additions & 2 deletions scripts/run-benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from pathlib import Path
import shlex

from utils import S3_CLIENTS, run, workload_paths_from_args
from utils import S3_CLIENTS, workload_paths_from_args
from utils.metrics import report_metrics
from utils.execution import run_with_stats

parser = argparse.ArgumentParser(
description='Benchmark workloads with a specific runner')
Expand Down Expand Up @@ -64,7 +65,7 @@
args.region, str(args.throughput)]

start_time = datetime.now(timezone.utc)
result = run(cmd, check=False, capture_output=True)
result, stats = run_with_stats(cmd, check=False, capture_output=True)
end_time = datetime.now(timezone.utc)

# reporting metrics before checking returncode
Expand Down
39 changes: 1 addition & 38 deletions scripts/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import os
from dataclasses import dataclass
from pathlib import Path
import subprocess
from typing import Optional

from utils.execution import run

REPO_DIR = Path(__file__).parent.parent.parent
RUNNERS_DIR = REPO_DIR/'runners'
Expand Down Expand Up @@ -63,43 +63,6 @@ def workload_paths_from_args(workloads: Optional[list[str]]) -> list[Path]:
return workload_paths


def run(cmd_args: list[str], check=True, capture_output=False) -> subprocess.CompletedProcess:
"""Run a subprocess"""
print(f'{Path.cwd()}> {subprocess.list2cmdline(cmd_args)}', flush=True)

if capture_output:
# Subprocess doesn't have built-in support for capturing output
# AND printing while it comes in, so we have to do it ourselves.
# We're combining stderr with stdout, for simplicity.
with subprocess.Popen(
cmd_args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1, # line-buffered
) as p:
lines = []
assert p.stdout is not None # satisfy type checker
for line in p.stdout:
lines.append(line)
print(line, end='', flush=True)

p.wait() # ensure process is 100% finished

completed = subprocess.CompletedProcess(
args=cmd_args,
returncode=p.returncode,
stdout="".join(lines),
)
else:
# simpler case: just run the command
completed = subprocess.run(cmd_args, text=True)

if check and completed.returncode != 0:
exit(f"FAILED running: {subprocess.list2cmdline(cmd_args)}")
return completed


def fetch_git_repo(url: str, dir: Path, main_branch: str = 'main', preferred_branch: Optional[str] = None):
"""
Ensure repo is cloned, up to date, and on the right branch.
Expand Down
122 changes: 122 additions & 0 deletions scripts/utils/execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import subprocess
import psutil

from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Tuple, Any, List


@dataclass
class ExecutionCheckpoint:
time: datetime
cpu_count: int
cpu_times: Any
disk_io: Any
network_io: Any


def _execution_stats() -> ExecutionCheckpoint:

return ExecutionCheckpoint(
time=datetime.now(timezone.utc),
cpu_count=psutil.cpu_count(logical=False),
cpu_times=psutil.cpu_times(percpu=True),
disk_io=psutil.disk_io_counters(perdisk=False, nowrap=True),
network_io=psutil.net_io_counters(pernic=True, nowrap=True),
)


def print_execution_stats(start: ExecutionCheckpoint, end: ExecutionCheckpoint):
print("**** Execution stats ****")

cpu_sys = []
cpu_usr = []
cpu_idle = []
cpu_total = []
cpu_avg = []
for idx, item in enumerate(start.cpu_times):
cpu_usr.append(end.cpu_times[idx].user - item.user)
cpu_sys.append(end.cpu_times[idx].system - item.system)
cpu_idle.append(end.cpu_times[idx].idle - item.idle)
cpu_total.append(cpu_usr[idx] + cpu_sys[idx] + cpu_idle[idx])
cpu_avg.append((cpu_sys[idx] + cpu_usr[idx]) / cpu_total[idx])

print(f'Average CPU usage: {sum(cpu_avg) / len(cpu_avg)}')
print(f' CPU User usage per proc: {cpu_usr}')
print(f' CPU Sys usage per proc: {cpu_sys}')
print(f' CPU Idle usage per proc: {cpu_idle}')

print(f'Disk Reads in bytes: '
f'{end.disk_io.read_bytes - start.disk_io.read_bytes}')
print(f'Disk Writes in bytes: '
f'{end.disk_io.write_bytes - start.disk_io.write_bytes}')
if hasattr(start.disk_io, 'busy_time'):
print(f'Disk busy time in ms: '
f'{end.disk_io.busy_time - start.disk_io.busy_time}')

for nic in start.network_io:
start_nic = start.network_io[nic]
end_nic = end.network_io[nic]
print(f'nic {nic} sent in bytes: '
f'{end_nic.bytes_sent - start_nic.bytes_sent}')
print(f'nic {nic} recv in bytes: '
f'{end_nic.bytes_recv - start_nic.bytes_recv}')
print(f'nic {nic} errors in: '
f'{end_nic.errin - start_nic.errin}')
print(f'nic {nic} errors out: '
f'{end_nic.errout - start_nic.errout}')
print(f'nic {nic} dropped packets coming in: '
f'{end_nic.dropin - start_nic.dropin}')
print(f'nic {nic} dropped packets coming out: '
f'{end_nic.dropout - start_nic.dropout}')


def run(cmd_args: list[str], check=True, capture_output=False) \
-> subprocess.CompletedProcess:
"""Run a subprocess"""
print(f'{Path.cwd()}> {subprocess.list2cmdline(cmd_args)}', flush=True)

if capture_output:
# Subprocess doesn't have built-in support for capturing output
# AND printing while it comes in, so we have to do it ourselves.
# We're combining stderr with stdout, for simplicity.
with subprocess.Popen(
cmd_args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1, # line-buffered
) as p:
lines = []
assert p.stdout is not None # satisfy type checker
for line in p.stdout:
lines.append(line)
print(line, end='', flush=True)

p.wait() # ensure process is 100% finished

completed = subprocess.CompletedProcess(
args=cmd_args,
returncode=p.returncode,
stdout="".join(lines),
)
else:
# simpler case: just run the command
completed = subprocess.run(cmd_args, text=True)

if check and completed.returncode != 0:
exit(f"FAILED running: {subprocess.list2cmdline(cmd_args)}")

return completed


def run_with_stats(cmd_args: list[str], check=True, capture_output=False) \
-> Tuple[subprocess.CompletedProcess, List[ExecutionCheckpoint]]:
execution_checkpoints = [_execution_stats()]
completed = run(cmd_args=cmd_args,
check=check,
capture_output=capture_output)
execution_checkpoints.append(_execution_stats())
return completed, execution_checkpoints

0 comments on commit 68aac44

Please sign in to comment.