Back out "kineto" (#1005)
Summary:
Pull Request resolved: #1005

Original commit changeset: 5f6ffe647dbd

Original Phabricator Diff: D64456849

D64456849 broke OSS CI: T205514965. In particular, `list[OpAgg]` annotations (instead of `List[OpAgg]`) are not supported in Python 3.8.

Reviewed By: sraikund16

Differential Revision: D64839799

fbshipit-source-id: dc4f0565e2ce1ab38fc1ff2ae49056a2f2a42d0d
davidberard98 authored and facebook-github-bot committed Oct 23, 2024
1 parent 0cb4467 commit a4eb0fa
Showing 35 changed files with 292 additions and 301 deletions.
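
To illustrate the incompatibility named in the summary, here is a minimal sketch (made-up names, not code from this diff): built-in generics such as `list[str]` only became valid runtime annotations in Python 3.9 (PEP 585), while the `typing` spellings work on 3.8 as well.

from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Metrics:
    # 3.8-compatible: typing-module generics
    latency: Dict[str, float] = field(default_factory=dict)

    # The PEP 585 spelling used by the reverted diff fails on Python 3.8
    # at class-definition time, because the annotation expression is
    # evaluated when the class body runs:
    #   latency: dict[str, float]  ->  TypeError: 'type' object is not subscriptable


def search_kernels() -> List[str]:  # "-> list[str]" likewise needs 3.9+
    return []

PEP 563's `from __future__ import annotations` would have deferred annotation evaluation and let the new spellings parse on 3.8, but the broken diff evidently did not rely on it; the backout restores the `typing` imports and, with them, the rest of the pre-diff style (f-strings back to `str.format`, `OSError` back to `socket.error`/`IOError`), even though those constructs are themselves valid on 3.8.
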
9 changes: 4 additions & 5 deletions benchmarks/perfetto/backends/common.py
@@ -8,10 +8,9 @@
 import os

 import time
-from collections.abc import Callable
 from dataclasses import dataclass, field, fields

-from typing import Dict
+from typing import Callable, Dict

 import numpy
@@ -26,11 +25,11 @@ def _get_input_path(input_name):
 @dataclass
 class TraceAnalysisMetrics:
     # Latency to perform trace analysis tasks
-    latency: dict[str, float] = field(default_factory=dict)
+    latency: Dict[str, float] = field(default_factory=dict)
     # Peak CPU memory to perform trace analysis tasks
-    peak_mem: dict[str, float] = field(default_factory=dict)
+    peak_mem: Dict[str, float] = field(default_factory=dict)
     # extra metrics
-    extra_metrics: dict[str, float] = field(default_factory=dict)
+    extra_metrics: Dict[str, float] = field(default_factory=dict)


 DEFAULT_METRICS = ["latency"]
2 changes: 1 addition & 1 deletion benchmarks/perfetto/backends/perfetto.py
@@ -23,7 +23,7 @@ def __init__(self, args: argparse.Namespace):
     def load(self, input_file_path: str):
         self.tp = TraceProcessor(input_file_path)

-    def search_gemm_kernels(self) -> list[str]:
+    def search_gemm_kernels(self) -> List[str]:
         query = "SELECT DISTINCT(name) FROM slice WHERE name like '%sm90_xmma_gemm_%' ORDER BY ts"
         query_result = [str(x) for x in self.tp.query(query)]
         return query_result
8 changes: 4 additions & 4 deletions benchmarks/perfetto/table.py
@@ -16,11 +16,11 @@

 @dataclass
 class TraceAnalysisBenchmarkResult:
-    inputs: list[str]
-    tasks: list[str]
-    metrics: list[str]
+    inputs: List[str]
+    tasks: List[str]
+    metrics: List[str]
     # key: (input, backend), value: benchmark results by tasks
-    data: dict[tuple[str, str], TraceAnalysisMetrics] = field(default_factory=dict)
+    data: Dict[Tuple[str, str], TraceAnalysisMetrics] = field(default_factory=dict)

     def _table(self):
         """
2 changes: 1 addition & 1 deletion tb_plugin/examples/resnet50_autograd_api.py
@@ -27,7 +27,7 @@

 with profile(use_cuda=True, use_kineto=True, record_shapes=True) as p:
     for step, data in enumerate(trainloader, 0):
-        print(f"step:{step}")
+        print("step:{}".format(step))
         inputs, labels = data[0].to(device=device), data[1].to(device=device)

         outputs = model(inputs)
2 changes: 1 addition & 1 deletion tb_plugin/examples/resnet50_ddp_profiler.py
@@ -58,7 +58,7 @@ def example(rank, use_gpu=True):
         record_shapes=True
     ) as p:
         for step, data in enumerate(trainloader, 0):
-            print(f"step:{step}")
+            print("step:{}".format(step))
             if use_gpu:
                 inputs, labels = data[0].to(rank), data[1].to(rank)
             else:
2 changes: 1 addition & 1 deletion tb_plugin/examples/resnet50_profiler_api.py
@@ -38,7 +38,7 @@
     with_stack=True
 ) as p:
     for step, data in enumerate(trainloader, 0):
-        print(f"step:{step}")
+        print("step:{}".format(step))
         inputs, labels = data[0].to(device=device), data[1].to(device=device)

         outputs = model(inputs)
4 changes: 2 additions & 2 deletions tb_plugin/fe/scripts/add_header.py
@@ -11,7 +11,7 @@


 def add_header(file):
-    with open(file) as f:
+    with open(file, 'r') as f:
         contents = f.readlines()

     # do nothing if there is already header
@@ -26,7 +26,7 @@ def add_header(file):
 if __name__ == '__main__':
     dir = sys.argv[1]
     if not os.path.isdir(dir):
-        raise ValueError(f'{dir} is not a directory')
+        raise ValueError('{} is not a directory'.format(dir))

     for file in glob.glob(dir + '/*.ts'):
         add_header(file)
8 changes: 4 additions & 4 deletions tb_plugin/test/test_compare_with_autograd.py
@@ -15,7 +15,7 @@


 def create_log_dir():
-    log_dir_name = f'./log{str(int(time.time()*1000))}'
+    log_dir_name = './log{}'.format(str(int(time.time()*1000)))
     try:
         os.makedirs(log_dir_name)
     except Exception:
@@ -198,7 +198,7 @@ def get_train_func(use_gpu=True):

     def train(train_step, prof=None):
         for step, data in enumerate(trainloader, 0):
-            print(f'step:{step}')
+            print('step:{}'.format(step))
             inputs, labels = data[0].to(device=device), data[1].to(device=device)

             outputs = model(inputs)
@@ -218,7 +218,7 @@ def get_output_fn(dir_name, profilers_dict):
     def output_fn(p):
         # In current torch.profiler.profile, at beginning of each span, a new p.profiler will be created.
         # So the same p.profiler will not be shared among different spans
-        worker_name = f'worker{p.step_num}'
+        worker_name = 'worker{}'.format(p.step_num)
         profilers_dict[worker_name] = p.profiler
         tb_trace_handler = torch.profiler.tensorboard_trace_handler(dir_name, worker_name)
         tb_trace_handler(p)
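
The comment in output_fn above refers to torch.profiler's span scheduling: with a repeating schedule, each completed (wait, warmup, active) cycle ends a span, a fresh profiler is created for the next one, and the on_trace_ready handler fires once per span. A minimal sketch of the usual wiring (illustrative only, not code from this diff; the './log' directory and worker name are made up):

import torch
from torch.profiler import profile, schedule, tensorboard_trace_handler

with profile(
    schedule=schedule(wait=1, warmup=1, active=2, repeat=2),  # two spans
    on_trace_ready=tensorboard_trace_handler('./log', worker_name='worker0'),
) as p:
    for _ in range(8):
        torch.randn(32, 32) @ torch.randn(32, 32)  # stand-in for a training step
        p.step()  # advance the schedule; the handler fires at the end of each span
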
@@ -248,7 +248,7 @@ def test_autograd_api(self):
         with torch.autograd.profiler.profile(use_cuda=True, use_kineto=True, record_shapes=True) as p:
             get_train_func()(5)
         log_dir = create_log_dir()
-        p.export_chrome_trace(os.path.join(log_dir, f'worker0.{int(time.time() * 1000)}.pt.trace.json'))
+        p.export_chrome_trace(os.path.join(log_dir, 'worker0.{}.pt.trace.json'.format(int(time.time() * 1000))))
         self.compare_results(log_dir, {'worker0': p})

     def base_profiler_api(self, use_gpu, record_shapes, profile_memory, with_stack):
12 changes: 6 additions & 6 deletions tb_plugin/test/test_tensorboard_end2end.py
@@ -77,15 +77,15 @@ def _test_tensorboard_with_arguments(self, test_folder, expected_runs, env=None,
             self._test_tensorboard(host, port, expected_runs, path_prefix)
         finally:
             pid = tb.pid
-            print(f'tensorboard process {pid} is terminating.')
+            print('tensorboard process {} is terminating.'.format(pid))
             tb.terminate()

     def _test_tensorboard(self, host, port, expected_runs, path_prefix):
         if not path_prefix:
-            link_prefix = f'http://{host}:{port}/data/plugin/pytorch_profiler/'
+            link_prefix = 'http://{}:{}/data/plugin/pytorch_profiler/'.format(host, port)
         else:
             path_prefix = path_prefix.strip('/')
-            link_prefix = f'http://{host}:{port}/{path_prefix}/data/plugin/pytorch_profiler/'
+            link_prefix = 'http://{}:{}/{}/data/plugin/pytorch_profiler/'.format(host, port, path_prefix)
         run_link = link_prefix + 'runs'

         expected_links_format = [
@@ -102,7 +102,7 @@ def _test_tensorboard(self, host, port, expected_runs, path_prefix):
                 socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
                 print('tensorboard start successfully')
                 break
-            except OSError:
+            except socket.error:
                 time.sleep(2)
                 retry_times -= 1
                 if retry_times < 0:
@@ -120,7 +120,7 @@ def _test_tensorboard(self, host, port, expected_runs, path_prefix):
             data = json.loads(data)
             runs = data.get('runs')
             if runs:
-                runs = '[{}]'.format(', '.join([f'"{i}"' for i in runs]))
+                runs = '[{}]'.format(', '.join(['"{}"'.format(i) for i in runs]))
                 runs = runs.encode('utf-8')
             if runs == expected_runs:
                 break
@@ -151,7 +151,7 @@ def _test_tensorboard(self, host, port, expected_runs, path_prefix):
                     f.write(response.read().decode('utf-8'))
                     f.write('\n')
         else:
-            with open('result_check_file.txt') as f:
+            with open('result_check_file.txt', 'r') as f:
                 lines = f.readlines()
             i = 0
             print('starting testing...')
8 changes: 4 additions & 4 deletions tb_plugin/torch_tb_profiler/io/azureblob.py
@@ -66,7 +66,7 @@ def write(self, filename, file_content, binary_mode=False):
         client.upload_blob(path, file_content)

     def download_file(self, file_to_download, file_to_save):
-        logger.info('azure blob: starting downloading file {} as {}'.format(file_to_download, file_to_save))
+        logger.info('azure blob: starting downloading file %s as %s' % (file_to_download, file_to_save))
         account, container, path = self.container_and_path(file_to_download)
         client = self.create_container_client(account, container)
         blob_client = client.get_blob_client(path)
@@ -87,7 +87,7 @@ def glob(self, filename):
         quest_i = filename.find('?')
         if quest_i >= 0:
             raise NotImplementedError(
-                f'{filename} not supported by compat glob'
+                '{} not supported by compat glob'.format(filename)
             )
         if star_i != len(filename) - 1:
             return []
@@ -141,7 +141,7 @@ def walk(self, top, topdown=True, onerror=None):
         results = {}
         for blob in blobs:
             dirname, basename = self.split(blob.name)
-            dirname = f'https://{account}/{container}/{dirname}'
+            dirname = 'https://{}/{}/{}'.format(account, container, dirname)
             results.setdefault(dirname, []).append(basename)
         for key, value in results.items():
             yield key, None, value
@@ -183,5 +183,5 @@ def create_container_client(self, account, container):
         if self.connection_string:
             client = ContainerClient.from_connection_string(self.connection_string, container)
         else:
-            client = ContainerClient.from_container_url(f'https://{account}/{container}')
+            client = ContainerClient.from_container_url('https://{}/{}'.format(account, container))
         return client
4 changes: 2 additions & 2 deletions tb_plugin/torch_tb_profiler/io/cache.py
@@ -38,7 +38,7 @@ def __setstate__(self, state):
"""
from absl import logging
logging.use_absl_handler()
logger.debug('Cache.__setstate__ {} '.format(state))
logger.debug('Cache.__setstate__ %s ' % (state,))
data, file._REGISTERED_FILESYSTEMS = state
self.__dict__.update(data)

@@ -71,7 +71,7 @@ def get_file(self, filename):

     def add_file(self, source_file, local_file):
         with self._lock:
-            logger.debug('add local cache {} for file {}'.format(local_file, source_file))
+            logger.debug('add local cache %s for file %s' % (local_file, source_file))
             self._cache_dict[source_file] = local_file

     def __enter__(self):
13 changes: 7 additions & 6 deletions tb_plugin/torch_tb_profiler/io/file.py
@@ -220,7 +220,7 @@ def read(self, filename, binary_mode=False, size=None, continue_from=None):
             endpoint = offset + size

         if offset != 0 or endpoint != "":
-            args["Range"] = f"bytes={offset}-{endpoint}"
+            args["Range"] = "bytes={}-{}".format(offset, endpoint)

         logger.info("s3: starting reading file %s" % filename)
         try:
@@ -238,7 +238,7 @@ def read(self, filename, binary_mode=False, size=None, continue_from=None):
                     # Asked for no bytes, so just return empty
                     stream = b""
                 else:
-                    args["Range"] = f"bytes={offset}-{endpoint}"
+                    args["Range"] = "bytes={}-{}".format(offset, endpoint)
                     stream = s3.Object(bucket, path).get(**args)["Body"].read()
             else:
                 raise
@@ -273,7 +273,7 @@ def download_file(self, file_to_download, file_to_save):
         s3 = boto3.resource("s3", endpoint_url=self._s3_endpoint)
         bucket, path = self.bucket_and_path(file_to_download)
         s3.Bucket(bucket).download_file(path, file_to_save)
-        logger.info("s3: file {} is downloaded as {}".format(file_to_download, file_to_save))
+        logger.info("s3: file %s is downloaded as %s" % (file_to_download, file_to_save))
         return

     def glob(self, filename):
@@ -282,7 +282,7 @@ def glob(self, filename):
         star_i = filename.find("*")
         quest_i = filename.find("?")
         if quest_i >= 0:
-            raise NotImplementedError(f"{filename} not supported")
+            raise NotImplementedError("{} not supported".format(filename))
         if star_i != len(filename) - 1:
             return []

@@ -366,7 +366,7 @@ def stat(self, filename):
 class File:
     def __init__(self, filename, mode):
         if mode not in ("r", "rb", "br", "w", "wb", "bw"):
-            raise ValueError(f"mode {mode} not supported by File")
+            raise ValueError("mode {} not supported by File".format(mode))
         self.filename = filename
         self.fs = get_filesystem(self.filename)
         self.fs_supports_append = self.fs.support_append()
@@ -615,7 +615,8 @@ def walk(top, topdown=True, onerror=None):

     for subdir in subdirs:
         joined_subdir = fs.join(top, subdir)
-        yield from walk(joined_subdir, topdown, onerror=onerror)
+        for subitem in walk(joined_subdir, topdown, onerror=onerror):
+            yield subitem

     if not topdown:
         yield here
2 changes: 1 addition & 1 deletion tb_plugin/torch_tb_profiler/io/gs.py
@@ -82,7 +82,7 @@ def walk(self, top, topdown=True, onerror=None):
         results = {}
         for blob in blobs:
             dirname, basename = self.split(blob.name)
-            dirname = f'gs://{bucket_name}/{dirname}'
+            dirname = 'gs://{}/{}'.format(bucket_name, dirname)
             results.setdefault(dirname, []).append(basename)
         for key, value in results.items():
             yield key, None, value
2 changes: 1 addition & 1 deletion tb_plugin/torch_tb_profiler/io/utils.py
@@ -60,7 +60,7 @@ def as_bytes(bytes_or_text, encoding="utf-8"):
         return bytes_or_text
     else:
         raise TypeError(
-            "Expected binary or unicode string, got {!r}".format(bytes_or_text)
+            "Expected binary or unicode string, got %r" % (bytes_or_text,)
         )


14 changes: 7 additions & 7 deletions tb_plugin/torch_tb_profiler/plugin.py
@@ -47,7 +47,7 @@ def __init__(self, context: base_plugin.TBContext):
         Args:
             context: A base_plugin.TBContext instance.
         """
-        super().__init__(context)
+        super(TorchProfilerPlugin, self).__init__(context)
         if not context.logdir and context.flags.logdir_spec:
             dirs = context.flags.logdir_spec.split(',')
             if len(dirs) > 1:
@@ -332,7 +332,7 @@ def module_route(self, request: werkzeug.Request):
         name = request.args.get('run')
         worker = request.args.get('worker')
         span = request.args.get('span')
-        raise exceptions.NotFound('could not find the run for {}/{}/{}'.format(name, worker, span))
+        raise exceptions.NotFound('could not find the run for %s/%s/%s' % (name, worker, span))

     @wrappers.Request.application
     def op_tree_route(self, request: werkzeug.Request):
@@ -374,7 +374,7 @@ def static_file_route(self, request: werkzeug.Request):
         try:
             with open(filepath, 'rb') as infile:
                 contents = infile.read()
-        except OSError:
+        except IOError:
             raise exceptions.NotFound('404 Not Found')
         return werkzeug.Response(
             contents, content_type=mimetype, headers=TorchProfilerPlugin.headers
@@ -512,7 +512,7 @@ def _load_run(self, run_dir):
             try:
                 self._load_threads.remove(t)
             except ValueError:
-                logger.warning(f'could not find the thread {run_dir}')
+                logger.warning('could not find the thread {}'.format(run_dir))

     def _get_run(self, name) -> Run:
         with self._runs_lock:
@@ -538,7 +538,7 @@ def _get_profile_for_request(self, request: werkzeug.Request) -> RunProfile:
         self._validate(run=name, worker=worker)
         profile = self._get_profile(name, worker, span)
         if not isinstance(profile, RunProfile):
-            raise exceptions.BadRequest('Get an unexpected profile type {} for {}/{}'.format(type(profile), name, worker))
+            raise exceptions.BadRequest('Get an unexpected profile type %s for %s/%s' % (type(profile), name, worker))

         return profile
@@ -548,15 +548,15 @@ def _get_distributed_profile_for_request(self, request: werkzeug.Request) -> DistributedRunProfile:
         self._validate(run=name)
         profile = self._get_profile(name, 'All', span)
         if not isinstance(profile, DistributedRunProfile):
-            raise exceptions.BadRequest('Get an unexpected distributed profile type {} for {}'.format(type(profile), name))
+            raise exceptions.BadRequest('Get an unexpected distributed profile type %s for %s' % (type(profile), name))

         return profile

     def _get_profile(self, name, worker, span):
         run = self._get_run(name)
         profile = run.get_profile(worker, span)
         if profile is None:
-            raise exceptions.NotFound('could not find the profile for {}/{}/{} '.format(name, worker, span))
+            raise exceptions.NotFound('could not find the profile for %s/%s/%s ' % (name, worker, span))
         return profile

     def _validate(self, **kwargs):