diff --git a/benchmarks/perfetto/backends/common.py b/benchmarks/perfetto/backends/common.py
index 0a6d590ab..1dac400f5 100644
--- a/benchmarks/perfetto/backends/common.py
+++ b/benchmarks/perfetto/backends/common.py
@@ -8,10 +8,9 @@
 import os
 import time
 
-from collections.abc import Callable
 from dataclasses import dataclass, field, fields
-from typing import Dict
+from typing import Callable, Dict
 
 import numpy
 
@@ -26,11 +25,11 @@ def _get_input_path(input_name):
 @dataclass
 class TraceAnalysisMetrics:
     # Latency to perform trace analysis tasks
-    latency: dict[str, float] = field(default_factory=dict)
+    latency: Dict[str, float] = field(default_factory=dict)
     # Peak CPU memory to perform trace analysis tasks
-    peak_mem: dict[str, float] = field(default_factory=dict)
+    peak_mem: Dict[str, float] = field(default_factory=dict)
     # extra metrics
-    extra_metrics: dict[str, float] = field(default_factory=dict)
+    extra_metrics: Dict[str, float] = field(default_factory=dict)
 
 
 DEFAULT_METRICS = ["latency"]
diff --git a/benchmarks/perfetto/backends/perfetto.py b/benchmarks/perfetto/backends/perfetto.py
index 0c682e9c3..f4429c47e 100644
--- a/benchmarks/perfetto/backends/perfetto.py
+++ b/benchmarks/perfetto/backends/perfetto.py
@@ -23,7 +23,7 @@ def __init__(self, args: argparse.Namespace):
     def load(self, input_file_path: str):
         self.tp = TraceProcessor(input_file_path)
 
-    def search_gemm_kernels(self) -> list[str]:
+    def search_gemm_kernels(self) -> List[str]:
         query = "SELECT DISTINCT(name) FROM slice WHERE name like '%sm90_xmma_gemm_%' ORDER BY ts"
         query_result = [str(x) for x in self.tp.query(query)]
         return query_result
diff --git a/benchmarks/perfetto/table.py b/benchmarks/perfetto/table.py
index a7243d50a..80d6017c1 100644
--- a/benchmarks/perfetto/table.py
+++ b/benchmarks/perfetto/table.py
@@ -16,11 +16,11 @@
 
 @dataclass
 class TraceAnalysisBenchmarkResult:
-    inputs: list[str]
-    tasks: list[str]
-    metrics: list[str]
+    inputs: List[str]
+    tasks: List[str]
+    metrics: List[str]
     # key: (input, backend), value: benchmark results by tasks
-    data: dict[tuple[str, str], TraceAnalysisMetrics] = field(default_factory=dict)
+    data: Dict[Tuple[str, str], TraceAnalysisMetrics] = field(default_factory=dict)
 
     def _table(self):
         """
diff --git a/tb_plugin/examples/resnet50_autograd_api.py b/tb_plugin/examples/resnet50_autograd_api.py
index 51c9b9acc..3c0bbbc9b 100644
--- a/tb_plugin/examples/resnet50_autograd_api.py
+++ b/tb_plugin/examples/resnet50_autograd_api.py
@@ -27,7 +27,7 @@
 
 with profile(use_cuda=True, use_kineto=True, record_shapes=True) as p:
     for step, data in enumerate(trainloader, 0):
-        print(f"step:{step}")
+        print("step:{}".format(step))
 
         inputs, labels = data[0].to(device=device), data[1].to(device=device)
         outputs = model(inputs)
diff --git a/tb_plugin/examples/resnet50_ddp_profiler.py b/tb_plugin/examples/resnet50_ddp_profiler.py
index 95ba4d841..8ce00fbe1 100644
--- a/tb_plugin/examples/resnet50_ddp_profiler.py
+++ b/tb_plugin/examples/resnet50_ddp_profiler.py
@@ -58,7 +58,7 @@ def example(rank, use_gpu=True):
         record_shapes=True
     ) as p:
         for step, data in enumerate(trainloader, 0):
-            print(f"step:{step}")
+            print("step:{}".format(step))
             if use_gpu:
                 inputs, labels = data[0].to(rank), data[1].to(rank)
             else:
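Note on the annotation hunks above: subscripting the builtin containers (dict[str, float], list[str]) is PEP 585 syntax that only evaluates on Python 3.9+, while the typing aliases work on any Python 3 interpreter. A minimal illustration of the equivalence (standalone sketch, not part of the patch):

    from dataclasses import dataclass, field
    from typing import Dict, List

    @dataclass
    class Metrics:
        # Same runtime meaning as dict[str, float] / list[str] on Python 3.9+.
        latency: Dict[str, float] = field(default_factory=dict)
        tasks: List[str] = field(default_factory=list)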
diff --git a/tb_plugin/examples/resnet50_profiler_api.py b/tb_plugin/examples/resnet50_profiler_api.py
index fab1bbc73..a9cb459ef 100644
--- a/tb_plugin/examples/resnet50_profiler_api.py
+++ b/tb_plugin/examples/resnet50_profiler_api.py
@@ -38,7 +38,7 @@
     with_stack=True
 ) as p:
     for step, data in enumerate(trainloader, 0):
-        print(f"step:{step}")
+        print("step:{}".format(step))
 
         inputs, labels = data[0].to(device=device), data[1].to(device=device)
         outputs = model(inputs)
diff --git a/tb_plugin/fe/scripts/add_header.py b/tb_plugin/fe/scripts/add_header.py
index 2f032f97b..a36b60637 100755
--- a/tb_plugin/fe/scripts/add_header.py
+++ b/tb_plugin/fe/scripts/add_header.py
@@ -11,7 +11,7 @@
 
 
 def add_header(file):
-    with open(file) as f:
+    with open(file, 'r') as f:
        contents = f.readlines()
 
     # do nothing if there is already header
@@ -26,7 +26,7 @@ def add_header(file):
 if __name__ == '__main__':
     dir = sys.argv[1]
     if not os.path.isdir(dir):
-        raise ValueError(f'{dir} is not a directory')
+        raise ValueError('{} is not a directory'.format(dir))
 
     for file in glob.glob(dir + '/*.ts'):
         add_header(file)
diff --git a/tb_plugin/test/test_compare_with_autograd.py b/tb_plugin/test/test_compare_with_autograd.py
index ee7a35148..dc266ccc8 100644
--- a/tb_plugin/test/test_compare_with_autograd.py
+++ b/tb_plugin/test/test_compare_with_autograd.py
@@ -15,7 +15,7 @@
 
 
 def create_log_dir():
-    log_dir_name = f'./log{str(int(time.time()*1000))}'
+    log_dir_name = './log{}'.format(str(int(time.time()*1000)))
     try:
         os.makedirs(log_dir_name)
     except Exception:
@@ -198,7 +198,7 @@ def get_train_func(use_gpu=True):
 
     def train(train_step, prof=None):
         for step, data in enumerate(trainloader, 0):
-            print(f'step:{step}')
+            print('step:{}'.format(step))
 
             inputs, labels = data[0].to(device=device), data[1].to(device=device)
             outputs = model(inputs)
@@ -218,7 +218,7 @@ def get_output_fn(dir_name, profilers_dict):
     def output_fn(p):
         # In current torch.profiler.profile, at beginning of each span, a new p.profiler will be created.
         # So the same p.profiler will not be shared among different spans
-        worker_name = f'worker{p.step_num}'
+        worker_name = 'worker{}'.format(p.step_num)
         profilers_dict[worker_name] = p.profiler
         tb_trace_handler = torch.profiler.tensorboard_trace_handler(dir_name, worker_name)
         tb_trace_handler(p)
@@ -248,7 +248,7 @@ def test_autograd_api(self):
         with torch.autograd.profiler.profile(use_cuda=True, use_kineto=True, record_shapes=True) as p:
             get_train_func()(5)
         log_dir = create_log_dir()
-        p.export_chrome_trace(os.path.join(log_dir, f'worker0.{int(time.time() * 1000)}.pt.trace.json'))
+        p.export_chrome_trace(os.path.join(log_dir, 'worker0.{}.pt.trace.json'.format(int(time.time() * 1000))))
         self.compare_results(log_dir, {'worker0': p})
 
     def base_profiler_api(self, use_gpu, record_shapes, profile_memory, with_stack):
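Note: the print() hunks above replace PEP 498 f-strings (Python 3.6+) with str.format, which every Python 3 release accepts. The two spellings are equivalent (illustrative sketch only):

    step = 5
    print(f"step:{step}")           # requires Python 3.6+
    print("step:{}".format(step))   # portable spelling used by this patch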
diff --git a/tb_plugin/test/test_tensorboard_end2end.py b/tb_plugin/test/test_tensorboard_end2end.py
index 9fa3de772..fae95b490 100644
--- a/tb_plugin/test/test_tensorboard_end2end.py
+++ b/tb_plugin/test/test_tensorboard_end2end.py
@@ -77,15 +77,15 @@ def _test_tensorboard_with_arguments(self, test_folder, expected_runs, env=None,
             self._test_tensorboard(host, port, expected_runs, path_prefix)
         finally:
             pid = tb.pid
-            print(f'tensorboard process {pid} is terminating.')
+            print('tensorboard process {} is terminating.'.format(pid))
             tb.terminate()
 
     def _test_tensorboard(self, host, port, expected_runs, path_prefix):
         if not path_prefix:
-            link_prefix = f'http://{host}:{port}/data/plugin/pytorch_profiler/'
+            link_prefix = 'http://{}:{}/data/plugin/pytorch_profiler/'.format(host, port)
         else:
             path_prefix = path_prefix.strip('/')
-            link_prefix = f'http://{host}:{port}/{path_prefix}/data/plugin/pytorch_profiler/'
+            link_prefix = 'http://{}:{}/{}/data/plugin/pytorch_profiler/'.format(host, port, path_prefix)
         run_link = link_prefix + 'runs'
 
         expected_links_format = [
@@ -102,7 +102,7 @@ def _test_tensorboard(self, host, port, expected_runs, path_prefix):
                 socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
                 print('tensorboard start successfully')
                 break
-            except OSError:
+            except socket.error:
                 time.sleep(2)
                 retry_times -= 1
                 if retry_times < 0:
@@ -120,7 +120,7 @@ def _test_tensorboard(self, host, port, expected_runs, path_prefix):
                 data = json.loads(data)
                 runs = data.get('runs')
                 if runs:
-                    runs = '[{}]'.format(', '.join([f'"{i}"' for i in runs]))
+                    runs = '[{}]'.format(', '.join(['"{}"'.format(i) for i in runs]))
                     runs = runs.encode('utf-8')
                 if runs == expected_runs:
                     break
@@ -151,7 +151,7 @@ def _test_tensorboard(self, host, port, expected_runs, path_prefix):
                     f.write(response.read().decode('utf-8'))
                     f.write('\n')
         else:
-            with open('result_check_file.txt') as f:
+            with open('result_check_file.txt', 'r') as f:
                 lines = f.readlines()
             i = 0
             print('starting testing...')
diff --git a/tb_plugin/torch_tb_profiler/io/azureblob.py b/tb_plugin/torch_tb_profiler/io/azureblob.py
index 6ce4093d7..b0ac49a65 100644
--- a/tb_plugin/torch_tb_profiler/io/azureblob.py
+++ b/tb_plugin/torch_tb_profiler/io/azureblob.py
@@ -66,7 +66,7 @@ def write(self, filename, file_content, binary_mode=False):
         client.upload_blob(path, file_content)
 
     def download_file(self, file_to_download, file_to_save):
-        logger.info('azure blob: starting downloading file {} as {}'.format(file_to_download, file_to_save))
+        logger.info('azure blob: starting downloading file %s as %s' % (file_to_download, file_to_save))
         account, container, path = self.container_and_path(file_to_download)
         client = self.create_container_client(account, container)
         blob_client = client.get_blob_client(path)
@@ -87,7 +87,7 @@ def glob(self, filename):
         quest_i = filename.find('?')
         if quest_i >= 0:
             raise NotImplementedError(
-                f'{filename} not supported by compat glob'
+                '{} not supported by compat glob'.format(filename)
             )
         if star_i != len(filename) - 1:
             return []
@@ -141,7 +141,7 @@ def walk(self, top, topdown=True, onerror=None):
         results = {}
         for blob in blobs:
             dirname, basename = self.split(blob.name)
-            dirname = f'https://{account}/{container}/{dirname}'
+            dirname = 'https://{}/{}/{}'.format(account, container, dirname)
             results.setdefault(dirname, []).append(basename)
         for key, value in results.items():
             yield key, None, value
@@ -183,5 +183,5 @@ def create_container_client(self, account, container):
         if self.connection_string:
             client = ContainerClient.from_connection_string(self.connection_string, container)
         else:
-            client = ContainerClient.from_container_url(f'https://{account}/{container}')
+            client = ContainerClient.from_container_url('https://{}/{}'.format(account, container))
         return client
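Note: the logger hunks above interpolate eagerly with the % operator. The logging module can also defer formatting to emit time by passing the values as arguments, which skips the string work entirely when the level is disabled; a sketch of that alternative (not what this patch does):

    import logging

    logger = logging.getLogger(__name__)
    # The '%s' placeholders are filled only if DEBUG is enabled for this logger.
    logger.debug('add local cache %s for file %s', 'local.tmp', 'remote/trace.json')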
diff --git a/tb_plugin/torch_tb_profiler/io/cache.py b/tb_plugin/torch_tb_profiler/io/cache.py
index 8c46a7044..ea9afab66 100644
--- a/tb_plugin/torch_tb_profiler/io/cache.py
+++ b/tb_plugin/torch_tb_profiler/io/cache.py
@@ -38,7 +38,7 @@ def __setstate__(self, state):
         """
         from absl import logging
         logging.use_absl_handler()
-        logger.debug('Cache.__setstate__ {} '.format(state))
+        logger.debug('Cache.__setstate__ %s ' % (state,))
         data, file._REGISTERED_FILESYSTEMS = state
         self.__dict__.update(data)
 
@@ -71,7 +71,7 @@ def get_file(self, filename):
 
     def add_file(self, source_file, local_file):
         with self._lock:
-            logger.debug('add local cache {} for file {}'.format(local_file, source_file))
+            logger.debug('add local cache %s for file %s' % (local_file, source_file))
             self._cache_dict[source_file] = local_file
 
     def __enter__(self):
diff --git a/tb_plugin/torch_tb_profiler/io/file.py b/tb_plugin/torch_tb_profiler/io/file.py
index 17ec7ef81..76a5f069b 100644
--- a/tb_plugin/torch_tb_profiler/io/file.py
+++ b/tb_plugin/torch_tb_profiler/io/file.py
@@ -220,7 +220,7 @@ def read(self, filename, binary_mode=False, size=None, continue_from=None):
             endpoint = offset + size
 
         if offset != 0 or endpoint != "":
-            args["Range"] = f"bytes={offset}-{endpoint}"
+            args["Range"] = "bytes={}-{}".format(offset, endpoint)
 
         logger.info("s3: starting reading file %s" % filename)
         try:
@@ -238,7 +238,7 @@ def read(self, filename, binary_mode=False, size=None, continue_from=None):
                     # Asked for no bytes, so just return empty
                     stream = b""
                 else:
-                    args["Range"] = f"bytes={offset}-{endpoint}"
+                    args["Range"] = "bytes={}-{}".format(offset, endpoint)
                     stream = s3.Object(bucket, path).get(**args)["Body"].read()
             else:
                 raise
@@ -273,7 +273,7 @@ def download_file(self, file_to_download, file_to_save):
         s3 = boto3.resource("s3", endpoint_url=self._s3_endpoint)
         bucket, path = self.bucket_and_path(file_to_download)
         s3.Bucket(bucket).download_file(path, file_to_save)
-        logger.info("s3: file {} is downloaded as {}".format(file_to_download, file_to_save))
+        logger.info("s3: file %s is downloaded as %s" % (file_to_download, file_to_save))
         return
 
     def glob(self, filename):
@@ -282,7 +282,7 @@ def glob(self, filename):
         star_i = filename.find("*")
         quest_i = filename.find("?")
         if quest_i >= 0:
-            raise NotImplementedError(f"{filename} not supported")
+            raise NotImplementedError("{} not supported".format(filename))
 
         if star_i != len(filename) - 1:
             return []
@@ -366,7 +366,7 @@ def stat(self, filename):
 class File:
     def __init__(self, filename, mode):
         if mode not in ("r", "rb", "br", "w", "wb", "bw"):
-            raise ValueError(f"mode {mode} not supported by File")
+            raise ValueError("mode {} not supported by File".format(mode))
         self.filename = filename
         self.fs = get_filesystem(self.filename)
         self.fs_supports_append = self.fs.support_append()
@@ -615,7 +615,8 @@ def walk(top, topdown=True, onerror=None):
 
     for subdir in subdirs:
         joined_subdir = fs.join(top, subdir)
-        yield from walk(joined_subdir, topdown, onerror=onerror)
+        for subitem in walk(joined_subdir, topdown, onerror=onerror):
+            yield subitem
 
     if not topdown:
         yield here
diff --git a/tb_plugin/torch_tb_profiler/io/gs.py b/tb_plugin/torch_tb_profiler/io/gs.py
index 796e46f1e..c6076ee9f 100644
--- a/tb_plugin/torch_tb_profiler/io/gs.py
+++ b/tb_plugin/torch_tb_profiler/io/gs.py
@@ -82,7 +82,7 @@ def walk(self, top, topdown=True, onerror=None):
         results = {}
         for blob in blobs:
             dirname, basename = self.split(blob.name)
-            dirname = f'gs://{bucket_name}/{dirname}'
+            dirname = 'gs://{}/{}'.format(bucket_name, dirname)
             results.setdefault(dirname, []).append(basename)
         for key, value in results.items():
             yield key, None, value
diff --git a/tb_plugin/torch_tb_profiler/io/utils.py b/tb_plugin/torch_tb_profiler/io/utils.py
index ba41ee638..79e9afc39 100644
--- a/tb_plugin/torch_tb_profiler/io/utils.py
+++ b/tb_plugin/torch_tb_profiler/io/utils.py
@@ -60,7 +60,7 @@ def as_bytes(bytes_or_text, encoding="utf-8"):
         return bytes_or_text
     else:
         raise TypeError(
-            "Expected binary or unicode string, got {!r}".format(bytes_or_text)
+            "Expected binary or unicode string, got %r" % (bytes_or_text,)
         )
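Note: the walk() hunk above expands yield from (PEP 380, Python 3.3+) into an explicit loop, the only spelling Python 2 accepts for delegating to a recursive generator. The same pattern on a toy tree (hypothetical data, not the plugin's filesystem API):

    def walk(node):
        yield node['name']
        for child in node.get('children', []):
            for item in walk(child):  # equivalent to: yield from walk(child)
                yield item

    tree = {'name': 'top', 'children': [{'name': 'sub'}]}
    assert list(walk(tree)) == ['top', 'sub']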
diff --git a/tb_plugin/torch_tb_profiler/plugin.py b/tb_plugin/torch_tb_profiler/plugin.py
index 2842d5fec..d81d7db97 100644
--- a/tb_plugin/torch_tb_profiler/plugin.py
+++ b/tb_plugin/torch_tb_profiler/plugin.py
@@ -47,7 +47,7 @@ def __init__(self, context: base_plugin.TBContext):
         Args:
           context: A base_plugin.TBContext instance.
         """
-        super().__init__(context)
+        super(TorchProfilerPlugin, self).__init__(context)
         if not context.logdir and context.flags.logdir_spec:
             dirs = context.flags.logdir_spec.split(',')
             if len(dirs) > 1:
@@ -332,7 +332,7 @@ def module_route(self, request: werkzeug.Request):
         name = request.args.get('run')
         worker = request.args.get('worker')
         span = request.args.get('span')
-        raise exceptions.NotFound('could not find the run for {}/{}/{}'.format(name, worker, span))
+        raise exceptions.NotFound('could not find the run for %s/%s/%s' % (name, worker, span))
 
     @wrappers.Request.application
     def op_tree_route(self, request: werkzeug.Request):
@@ -374,7 +374,7 @@ def static_file_route(self, request: werkzeug.Request):
         try:
             with open(filepath, 'rb') as infile:
                 contents = infile.read()
-        except OSError:
+        except IOError:
             raise exceptions.NotFound('404 Not Found')
         return werkzeug.Response(
             contents, content_type=mimetype, headers=TorchProfilerPlugin.headers
@@ -512,7 +512,7 @@ def _load_run(self, run_dir):
             try:
                 self._load_threads.remove(t)
             except ValueError:
-                logger.warning(f'could not find the thread {run_dir}')
+                logger.warning('could not find the thread {}'.format(run_dir))
 
     def _get_run(self, name) -> Run:
         with self._runs_lock:
@@ -538,7 +538,7 @@ def _get_profile_for_request(self, request: werkzeug.Request) -> RunProfile:
         self._validate(run=name, worker=worker)
         profile = self._get_profile(name, worker, span)
         if not isinstance(profile, RunProfile):
-            raise exceptions.BadRequest('Get an unexpected profile type {} for {}/{}'.format(type(profile), name, worker))
+            raise exceptions.BadRequest('Get an unexpected profile type %s for %s/%s' % (type(profile), name, worker))
 
         return profile
 
@@ -548,7 +548,7 @@ def _get_distributed_profile_for_request(self, request: werkzeug.Request) -> Dis
         self._validate(run=name)
         profile = self._get_profile(name, 'All', span)
         if not isinstance(profile, DistributedRunProfile):
-            raise exceptions.BadRequest('Get an unexpected distributed profile type {} for {}'.format(type(profile), name))
+            raise exceptions.BadRequest('Get an unexpected distributed profile type %s for %s' % (type(profile), name))
 
         return profile
 
@@ -556,7 +556,7 @@ def _get_profile(self, name, worker, span):
         run = self._get_run(name)
         profile = run.get_profile(worker, span)
         if profile is None:
-            raise exceptions.NotFound('could not find the profile for {}/{}/{} '.format(name, worker, span))
+            raise exceptions.NotFound('could not find the profile for %s/%s/%s ' % (name, worker, span))
         return profile
 
     def _validate(self, **kwargs):
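Note: in Python 3, super().__init__(context) and super(TorchProfilerPlugin, self).__init__(context) resolve to the same method through the MRO; the explicit two-argument form also runs on Python 2. A sketch with hypothetical classes:

    class Base(object):
        def __init__(self, context):
            self.context = context

    class Plugin(Base):
        def __init__(self, context):
            # Same lookup as super().__init__(context) on Python 3.
            super(Plugin, self).__init__(context)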
diff --git a/tb_plugin/torch_tb_profiler/profiler/communication.py b/tb_plugin/torch_tb_profiler/profiler/communication.py
index bfde506e2..ac7f708e5 100644
--- a/tb_plugin/torch_tb_profiler/profiler/communication.py
+++ b/tb_plugin/torch_tb_profiler/profiler/communication.py
@@ -11,10 +11,10 @@
 
 
 def generate_communication_nodes(
-        communication_data: dict[int, CommunicationNode],
-        steps: list[tuple[int, int]],
-        steps_names: list[str]):
-    comm_node_list: list[CommunicationNode] = []
+        communication_data: Dict[int, CommunicationNode],
+        steps: List[Tuple[int, int]],
+        steps_names: List[str]):
+    comm_node_list: List[CommunicationNode] = []
 
     # Sort the communication node according the start time, this is for correlating communication node between workers
     for comm_node in communication_data.values():
@@ -42,12 +42,12 @@ def generate_communication_nodes(
     return comm_node_list
 
 
-def analyze_communication_nodes(comm_node_list: list[CommunicationNode])\
-        -> tuple[dict[str, tuple[int, int]], dict[str, list[int]]]:
-    step_comm_stats: dict[str, tuple[int, int]] = {}
-    total_comm_stats: dict[str, tuple[int, int, list, list]] = {}
+def analyze_communication_nodes(comm_node_list: List[CommunicationNode])\
+        -> Tuple[Dict[str, Tuple[int, int]], Dict[str, List[int]]]:
+    step_comm_stats: Dict[str, Tuple[int, int]] = {}
+    total_comm_stats: Dict[str, Tuple[int, int, List, List]] = {}
 
-    step_to_comm_ranges: dict[str, tuple[list, list]] = {}
+    step_to_comm_ranges: Dict[str, Tuple[List, List]] = {}
     for comm_node in comm_node_list:
         if comm_node.step_name not in step_to_comm_ranges:
             step_to_comm_ranges[comm_node.step_name] = [[], []]
@@ -73,7 +73,7 @@ def analyze_communication_nodes(comm_node_list: list[CommunicationNode])\
             elif comm_node.input_type[i] == 'unsigned char':
                 bytes_one_value = 1
             else:
-                logger.warning(f'Found an unknown tensor type: {comm_node.input_type[i]}')
+                logger.warning('Found an unknown tensor type: {}'.format(comm_node.input_type[i]))
                 bytes_one_value = 0
             total_size = 1
             for size in comm_node.input_shape[i]:
diff --git a/tb_plugin/torch_tb_profiler/profiler/data.py b/tb_plugin/torch_tb_profiler/profiler/data.py
index 0de081ba7..68ad69d6c 100644
--- a/tb_plugin/torch_tb_profiler/profiler/data.py
+++ b/tb_plugin/torch_tb_profiler/profiler/data.py
@@ -27,7 +27,7 @@
 
 
 class RunProfileData:
-    def __init__(self, worker: str, span: str, trace_json: dict):
+    def __init__(self, worker: str, span: str, trace_json: Dict):
         self.worker = worker
         self.span = span
 
@@ -38,7 +38,7 @@ def __init__(self, worker: str, span: str, trace_json: dict):
         self.device_props = trace_json.get('deviceProperties', None)
 
         self.profiler_start_ts = float('inf')
-        self.events: list[BaseEvent] = []
+        self.events: List[BaseEvent] = []
 
         trace_body = trace_json['traceEvents']
         fwd_bwd_events = []
@@ -57,8 +57,8 @@ def __init__(self, worker: str, span: str, trace_json: dict):
         self.trace_file_path: str = None
 
         # Event Parser results
-        self.tid2tree: dict[int, OperatorNode] = None
-        self.pl_tid2tree: dict[int, OperatorNode] = None
+        self.tid2tree: Dict[int, OperatorNode] = None
+        self.pl_tid2tree: Dict[int, OperatorNode] = None
         self.used_devices = []
         self.use_dp: bool = False
         self.use_ddp: bool = False
@@ -91,7 +91,7 @@ def __init__(self, worker: str, span: str, trace_json: dict):
         # Communicator
         self.comm_node_list = None
         self.comm_overlap_costs = None
-        self.memory_snapshot: MemorySnapshot | None = None
+        self.memory_snapshot: Optional[MemorySnapshot] = None
 
         # recommendation based on analysis result.
         self.recommendations = []
@@ -105,7 +105,7 @@ def parse(worker, span, path, cache_dir):
         return profile
 
     @staticmethod
-    def from_json(worker, span, trace_json: dict):
+    def from_json(worker, span, trace_json: Dict):
         profile = RunProfileData(worker, span, trace_json)
         with utils.timing('Data processing'):
             profile.process()
@@ -286,7 +286,7 @@ def _analyze_distributed_metrics(self):
             minor = device_prop.get('computeMinor')
             if major is None or minor is None:
                 continue
-            compute_capability = f'{major}.{minor}'
+            compute_capability = '{}.{}'.format(major, minor)
             if float(compute_capability) >= 3.5:
                 text = (
                     'Nccl backend is currently the fastest and highly recommended backend'
@@ -308,7 +308,7 @@ def _analyze_distributed_metrics(self):
                 f"convergence and accuracy. For such case, you may want to evaluate {href('LAMB optimizer', lamb_url)}."
             )
 
-    def _memory_events(self) -> list[MemoryEvent]:
+    def _memory_events(self) -> List[MemoryEvent]:
         memory_events = [e for e in self.events if e.type == EventTypes.MEMORY]
         memory_events.sort(key=lambda e: e.ts)
         return memory_events
@@ -318,9 +318,9 @@ def get_gpus_str(gpus):
     gpu_list_str = str(gpus[0])
     for i in range(1, len(gpus)):
         if i == len(gpus) - 1:
-            gpu_list_str += f'and {gpus[i]}'
+            gpu_list_str += 'and {}'.format(gpus[i])
         else:
-            gpu_list_str += f', {gpus[i]}'
+            gpu_list_str += ', {}'.format(gpus[i])
     has_str = 'has' if len(gpu_list_str) == 1 else 'have'
     return gpu_list_str, has_str
diff --git a/tb_plugin/torch_tb_profiler/profiler/diffrun/contract.py b/tb_plugin/torch_tb_profiler/profiler/diffrun/contract.py
index 0d54e351e..ce0cba35d 100644
--- a/tb_plugin/torch_tb_profiler/profiler/diffrun/contract.py
+++ b/tb_plugin/torch_tb_profiler/profiler/diffrun/contract.py
@@ -19,7 +19,7 @@ def __init__(self,
                  duration,
                  device_duration,
                  total_duration,
-                 aggs: list[OpAgg]):
+                 aggs: List[OpAgg]):
         self.name = name
         self.duration = duration
         self.device_duration = device_duration
@@ -34,10 +34,10 @@ class DiffStats:
     def __init__(self, left: OpStats, right: OpStats):
         self.left = left
         self.right = right
-        self.children: list[DiffStats] = []
+        self.children: List[DiffStats] = []
 
-    def flatten_diff_tree(self) -> dict[str, 'DiffStats']:
-        result: dict[str, DiffStats] = {}
+    def flatten_diff_tree(self) -> Dict[str, 'DiffStats']:
+        result: Dict[str, DiffStats] = {}
 
         def traverse(node: DiffStats, path: str):
             result[path] = node
diff --git a/tb_plugin/torch_tb_profiler/profiler/diffrun/operator.py b/tb_plugin/torch_tb_profiler/profiler/diffrun/operator.py
index 1962a28c3..ca0c1f7d4 100644
--- a/tb_plugin/torch_tb_profiler/profiler/diffrun/operator.py
+++ b/tb_plugin/torch_tb_profiler/profiler/diffrun/operator.py
@@ -40,7 +40,7 @@ def aggregate_ops(self):
                 agg.self_host_duration,
                 agg.self_device_duration)
 
-    def get_operators_and_kernels(self) -> tuple[list[OperatorNode], list[DeviceNode]]:
+    def get_operators_and_kernels(self) -> Tuple[List[OperatorNode], List[DeviceNode]]:
         return [], []
 
 
@@ -65,7 +65,7 @@ def device_duration(self) -> int:
 
 
 class Operators(Operator):
-    def __init__(self, nodes: OperatorNode | list[OperatorNode]):
+    def __init__(self, nodes: Union[OperatorNode, List[OperatorNode]]):
         if not nodes:
             raise ValueError('the operator node is None or empty')
         if isinstance(nodes, OperatorNode):
@@ -73,7 +73,7 @@ def __init__(self, nodes: OperatorNode | list[OperatorNode]):
         elif isinstance(nodes, list):
             super().__init__('CompositeNodes')
 
-        self.op_nodes: OperatorNode | list[OperatorNode] = nodes
+        self.op_nodes: Union[OperatorNode, List[OperatorNode]] = nodes
 
     @property
     def duration(self):
@@ -102,14 +102,14 @@ def __str__(self) -> str:
         else:
             return f'{self.name}: {self.op_nodes.__class__.__name__}: {self.total_duration}'
 
-    def get_operators_and_kernels(self) -> tuple[list[OperatorNode], list[DeviceNode]]:
+    def get_operators_and_kernels(self) -> Tuple[List[OperatorNode], List[DeviceNode]]:
         if isinstance(self.op_nodes, list):
             nodes = self.op_nodes
         else:
             nodes = [self.op_nodes]
 
-        ops: list[OperatorNode] = []
-        kernels: list[DeviceNode] = []
+        ops: List[OperatorNode] = []
+        kernels: List[DeviceNode] = []
         for n in nodes:
             o, k = n.get_operator_and_kernels()
             ops.extend(o)
@@ -117,7 +117,7 @@ def get_operators_and_kernels(self) -> tuple[list[OperatorNode], list[DeviceNode
         return ops, kernels
 
 
-def create_operator(op_nodes: OperatorNode | list[OperatorNode]) -> Operator:
+def create_operator(op_nodes: Union[OperatorNode, List[OperatorNode]]) -> Operator:
     if op_nodes:
         return Operators(op_nodes)
     else:
diff --git a/tb_plugin/torch_tb_profiler/profiler/diffrun/tree.py b/tb_plugin/torch_tb_profiler/profiler/diffrun/tree.py
index e28265961..412c677ce 100644
--- a/tb_plugin/torch_tb_profiler/profiler/diffrun/tree.py
+++ b/tb_plugin/torch_tb_profiler/profiler/diffrun/tree.py
@@ -2,8 +2,7 @@
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # -------------------------------------------------------------------------
 import sys
-from typing import List, Union
-from collections.abc import Generator
+from typing import Generator, List, Union
 
 from ..node import (BackwardNode, DataLoaderNode, ModuleNode, OperatorNode,
                     OptimizerNode, ProfilerStepNode)
@@ -18,7 +17,7 @@ class DiffNode:
     def __init__(self, left: Operator, right: Operator):
         self.left: Operator = left
         self.right: Operator = right
-        self.children: list[DiffNode] = []
+        self.children: List[DiffNode] = []
 
     def build_tree(self):
         '''build the children from the left_node and right_node'''
@@ -44,8 +43,8 @@ def build_tree(self):
 
     @staticmethod
     def create_node(
-            left: OperatorNode | list[OperatorNode],
-            right: OperatorNode | list[OperatorNode]) -> 'DiffNode':
+            left: Union[OperatorNode, List[OperatorNode]],
+            right: Union[OperatorNode, List[OperatorNode]]) -> 'DiffNode':
         if isinstance(left, list) and len(left) == 1:
             left = left[0]
         if isinstance(right, list) and len(right) == 1:
@@ -57,8 +56,8 @@ def create_node(
 
     @staticmethod
     def compare_operator_nodes(
-            left_nodes: list[OperatorNode],
-            right_nodes: list[OperatorNode]) -> Generator['DiffNode', None, None]:
+            left_nodes: List[OperatorNode],
+            right_nodes: List[OperatorNode]) -> Generator['DiffNode', None, None]:
         '''Given two OperatorNode lists, find the DataLoader/Module/Backward/Optimizer node and create the child list DiffNode
         '''
         right_keys = [(type(r), r.name) for r in right_nodes]
@@ -150,7 +149,7 @@ def diff_summary(node: DiffNode) -> DiffStats:
     return stats
 
 
-def print_node(node: DiffNode | DiffStats, level: int, index: int, file=sys.stdout):
+def print_node(node: Union[DiffNode, DiffStats], level: int, index: int, file=sys.stdout):
     file.write(f'{INDENT * level}level {level}, index {index}:\n')
     file.write(f'{INDENT * (level + 1)}left : {node.left}\n')
     file.write(f'{INDENT * (level + 1)}right: {node.right}\n')
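Note: several files above fold collections.abc imports (Iterable, Generator, Callable) back into typing. Before Python 3.9 the collections.abc classes cannot be subscripted in annotations, so forms like Generator['DiffNode', None, None] need the typing aliases. Illustrative sketch:

    from typing import Generator

    def count_up(n: int) -> Generator[int, None, None]:
        # yield type int, send type None, return type None
        for i in range(n):
            yield i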
diff --git a/tb_plugin/torch_tb_profiler/profiler/event_parser.py b/tb_plugin/torch_tb_profiler/profiler/event_parser.py
index c57f31252..b0efce664 100644
--- a/tb_plugin/torch_tb_profiler/profiler/event_parser.py
+++ b/tb_plugin/torch_tb_profiler/profiler/event_parser.py
@@ -4,8 +4,7 @@
 import sys
 from collections import defaultdict
 from enum import IntEnum
-from typing import Dict, List, Optional, Tuple
-from collections.abc import Iterable
+from typing import Dict, Iterable, List, Optional, Tuple
 
 from .. import utils
 from .communication import generate_communication_nodes
@@ -39,9 +38,9 @@ def __init__(self, *args, **kwargs):
         """
         super().__init__(*args, **kwargs)
 
-        self.communication_data: dict[int, CommunicationNode] = {}
-        self.device_node_list: list[DeviceNode] = []
-        self.runtime_node_list: list[RuntimeNode] = []
+        self.communication_data: Dict[int, CommunicationNode] = {}
+        self.device_node_list: List[DeviceNode] = []
+        self.runtime_node_list: List[RuntimeNode] = []
         self.used_devices = set()
         self.use_dp = False
         self.use_ddp = False
@@ -56,15 +55,15 @@ def parse_nodes(self, events: Iterable[BaseEvent]):
         # Because in the case when RuntimeNode has duration 0 and starts at same time as a OperatorNode,
         # just use interval containing relationship can't tell it is child or brother of the OperatorNode.
         # value is a list of OperatorNode and ProfilerStepNode. Do not include RuntimeNode
-        tid2list: dict[int, list[OperatorNode]] = defaultdict(list)
+        tid2list: Dict[int, List[OperatorNode]] = defaultdict(list)
         # value is a list of PLProfileNode. Do not include RuntimeNode
-        pl_tid2list: dict[int, list[PLProfileNode]] = defaultdict(list)
+        pl_tid2list: Dict[int, List[PLProfileNode]] = defaultdict(list)
         # value is a list of RuntimeNode with external_id=0. They will be attached to root nodes.
-        tid2zero_rt_list: dict[int, list[RuntimeNode]] = defaultdict(list)
-        corrid_to_device: dict[int, list[DeviceNode]] = defaultdict(list)  # value is a list of DeviceNode
+        tid2zero_rt_list: Dict[int, List[RuntimeNode]] = defaultdict(list)
+        corrid_to_device: Dict[int, List[DeviceNode]] = defaultdict(list)  # value is a list of DeviceNode
 
-        corrid_to_runtime: dict[int, RuntimeNode] = {}  # value is a RuntimeNode
-        externalid_to_runtime: dict[int, list[RuntimeNode]] = defaultdict(list)  # value is a list of RuntimeNode
+        corrid_to_runtime: Dict[int, RuntimeNode] = {}  # value is a RuntimeNode
+        externalid_to_runtime: Dict[int, List[RuntimeNode]] = defaultdict(list)  # value is a list of RuntimeNode
 
         for event in events:
             if event.type == EventTypes.MEMORY:
@@ -100,8 +99,8 @@ def parse_nodes(self, events: Iterable[BaseEvent]):
             for n in nodes:
                 node_count_dict[n.type] += 1
 
-            logger.debug("Some events doesn't belongs to any operators: "
-                         f"{', '.join([':'.join((k, str(v))) for k, v in node_count_dict.items()])}")
+            logger.debug(("Some events doesn't belongs to any operators: "
+                          f"{', '.join([':'.join((k, str(v))) for k, v in node_count_dict.items()])}"))
 
         staled_device_nodes = []
         for device_nodes in corrid_to_device.values():
@@ -123,12 +122,12 @@ def _update_communication_node(self, event: KernelEvent):
 
     def _parse_node(self,
                     event: DurationEvent,
-                    corrid_to_device: dict[int, list[DeviceNode]],
-                    corrid_to_runtime: dict[int, RuntimeNode],
-                    externalid_to_runtime: dict[int, list[RuntimeNode]],
-                    tid2list: dict[int, list[OperatorNode]],
-                    pl_tid2list: dict[int, list[PLProfileNode]],
-                    tid2zero_rt_list: dict[int, list[RuntimeNode]]):
+                    corrid_to_device: Dict[int, List[DeviceNode]],
+                    corrid_to_runtime: Dict[int, RuntimeNode],
+                    externalid_to_runtime: Dict[int, List[RuntimeNode]],
+                    tid2list: Dict[int, List[OperatorNode]],
+                    pl_tid2list: Dict[int, List[PLProfileNode]],
+                    tid2zero_rt_list: Dict[int, List[RuntimeNode]]):
         corrid = event.correlation_id
         tid = event.tid
         if event.type in [EventTypes.KERNEL, EventTypes.MEMCPY, EventTypes.MEMSET]:
@@ -209,9 +208,9 @@ def __init__(self):
         # we could not use [[]] * len here since they all point to same memory
         # https://stackoverflow.com/questions/12791501/python-initializing-a-list-of-lists
         # https://stackoverflow.com/questions/240178/list-of-lists-changes-reflected-across-sublists-unexpectedly
-        self.role_ranges: list[list[tuple[int, int]]] = [[] for _ in range(ProfileRole.Total - 1)]
-        self.steps: list[tuple[int, int]] = []
-        self.steps_names: list[str] = []
+        self.role_ranges: List[List[Tuple[int, int]]] = [[] for _ in range(ProfileRole.Total - 1)]
+        self.steps: List[Tuple[int, int]] = []
+        self.steps_names: List[str] = []
         self.cpu_min_ts = sys.maxsize  # Min time of CPU side events.
         self.cpu_max_ts = -sys.maxsize - 1  # Max time of CPU side events.
         self.global_min_ts = sys.maxsize  # Min time of all events.
@@ -222,7 +221,7 @@ def __init__(self):
         self.global_start_ts = sys.maxsize
         self.global_end_ts = -sys.maxsize - 1
 
-    def parse_steps(self, events: Iterable[DurationEvent], comm_nodes: dict[int, CommunicationNode]):
+    def parse_steps(self, events: Iterable[DurationEvent], comm_nodes: Dict[int, CommunicationNode]):
         for event in events:
             if event.type == EventTypes.MEMORY:
                 continue
@@ -243,7 +242,7 @@ def parse_steps(self, events: Iterable[DurationEvent], comm_nodes: dict[int, Com
         for i in range(len(self.role_ranges)):
             self.role_ranges[i] = merge_ranges(self.role_ranges[i])
 
-    def update_device_steps(self, runtime_node_list: list[RuntimeNode]):
+    def update_device_steps(self, runtime_node_list: List[RuntimeNode]):
         self._update_steps_duration(*self._find_device_steps(runtime_node_list))
 
     @property
@@ -262,7 +261,7 @@ def has_communication(self):
     def has_memcpy_or_memset(self):
         return bool(self.role_ranges[ProfileRole.Memcpy] or self.role_ranges[ProfileRole.Memset])
 
-    def _parse_step(self, event: DurationEvent, comm_nodes: dict[int, CommunicationNode]):
+    def _parse_step(self, event: DurationEvent, comm_nodes: Dict[int, CommunicationNode]):
         ts = event.ts
         dur = event.duration
         evt_type = event.type
@@ -298,16 +297,16 @@ def _parse_step(self, event: DurationEvent, comm_nodes: dict[int, CommunicationN
         self.global_min_ts = min(self.global_min_ts, ts)
         self.global_max_ts = max(self.global_max_ts, ts + dur)
 
-    def _find_device_steps(self, runtime_node_list: list[RuntimeNode]):
+    def _find_device_steps(self, runtime_node_list: List[RuntimeNode]):
         """return steps associated with device nodes.
         """
         runtime_node_list = sorted(runtime_node_list, key=lambda x: x.start_time)
 
         # Use similar code with two-way merge to get all runtimes inside each host-side step span,
         # then record each step's min kernel start time and max kernel end time:
-        steps_device: list[tuple[int, int]] = [(sys.maxsize, -sys.maxsize - 1)] * len(self.steps)
+        steps_device: List[Tuple[int, int]] = [(sys.maxsize, -sys.maxsize - 1)] * len(self.steps)
         # where the steps associated with devcie node, if yes, the related array item is larger than 0.
-        steps_matched_device_nodes: list[int] = [0] * len(self.steps)
+        steps_matched_device_nodes: List[int] = [0] * len(self.steps)
 
         i_step = 0
         i_runtime = 0
@@ -352,7 +351,7 @@ def _find_device_steps(self, runtime_node_list: list[RuntimeNode]):
                 i_step += 1
 
         # If there are matched device, find the first step end time before steps_device[0][0]
-        prev_step_end_time: int | None = None
+        prev_step_end_time: Optional[int] = None
        if len(matched_device_nodes) > 0:
            prev_step_end_time = self.steps[0][0]
            if steps_device[0][0] != sys.maxsize:  # When step 0 has device event.
@@ -365,9 +364,9 @@ def _find_device_steps(self, runtime_node_list: list[RuntimeNode]):
         return prev_step_end_time, steps_device, steps_matched_device_nodes
 
     def _update_steps_duration(self,
-                               prev_step_end_time: int | None,
-                               steps_device: list[tuple[int, int]],
-                               steps_matched_device_nodes: list[int]):
+                               prev_step_end_time: Optional[int],
+                               steps_device: List[Tuple[int, int]],
+                               steps_matched_device_nodes: List[int]):
         """Update self.steps considering device side events launched by each host side step.
         Update self.steps_names if some tail steps are removed."""
 
@@ -414,9 +413,9 @@ def _update_steps_duration(self,
 class EventParser(NodeParserMixin, StepParser):
     def __init__(self):
         super().__init__()
-        self.comm_node_list: dict[CommunicationNode] = None
+        self.comm_node_list: Dict[CommunicationNode] = None
 
-    def parse(self, events: Iterable[BaseEvent], fwd_bwd_map: dict[int, int]) -> dict[int, list[OperatorNode]]:
+    def parse(self, events: Iterable[BaseEvent], fwd_bwd_map: Dict[int, int]) -> Dict[int, List[OperatorNode]]:
         with utils.timing('EventParser: parse nodes'):
             tid2list, tid2zero_rt_list, staled_device_nodes, pl_tid2list = self.parse_nodes(events)
diff --git a/tb_plugin/torch_tb_profiler/profiler/gpu_metrics_parser.py b/tb_plugin/torch_tb_profiler/profiler/gpu_metrics_parser.py
index fe373a293..8a66cba73 100644
--- a/tb_plugin/torch_tb_profiler/profiler/gpu_metrics_parser.py
+++ b/tb_plugin/torch_tb_profiler/profiler/gpu_metrics_parser.py
@@ -1,8 +1,7 @@
 # -------------------------------------------------------------------------
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # --------------------------------------------------------------------------
-from typing import List
-from collections.abc import Iterable
+from typing import Iterable, List
 
 from .. import consts, utils
 from .range_utils import (get_ranges_sum, intersection_ranges_lists,
@@ -190,14 +189,14 @@ def parse_event(self, event: KernelEvent):
                 self.blocks_per_sm_count[gpu_id] += 1
             else:
                 # Workaround for negative value input.
-                logger.warning(f'blocks per SM {event.blocks_per_sm} with ts {ts} is not positive!')
+                logger.warning('blocks per SM {} with ts {} is not positive!'.format(event.blocks_per_sm, ts))
         if event.occupancy is not None:
             if event.occupancy >= 0.0:
                 self.occupancy_per_device[gpu_id].append((ts, ts + dur, event.occupancy))
                 self.occupancy_count[gpu_id] += 1
             else:
                 # Workaround for negative value input.
-                logger.warning(f'est. achieved occupancy % {event.occupancy} with ts {ts} is negative!')
+                logger.warning('est. achieved occupancy % {} with ts {} is negative!'.format(event.occupancy, ts))
 
     def get_gpu_metrics_columns(self):
         columns = []
@@ -228,11 +227,11 @@ def build_trace_counter_sm_efficiency(gpu_id, start_time, counter_value):
                      "\"args\":{{\"Est. SM Efficiency\":{}}}}}").format(gpu_id, gpu_id, start_time, counter_value)
         return util_json
 
-    def add_trace_counter_gpu_util(gpu_id, start_time, counter_value, counter_json_list: list):
+    def add_trace_counter_gpu_util(gpu_id, start_time, counter_value, counter_json_list: List):
         json_str = build_trace_counter_gpu_util(gpu_id, start_time, counter_value)
         counter_json_list.append(json_str)
 
-    def add_trace_counter_sm_efficiency(gpu_id, start_time, end_time, value, counter_json_list: list):
+    def add_trace_counter_sm_efficiency(gpu_id, start_time, end_time, value, counter_json_list: List):
         efficiency_json_start = build_trace_counter_sm_efficiency(gpu_id, start_time, value)
         efficiency_json_finish = build_trace_counter_sm_efficiency(gpu_id, end_time, 0)
         counter_json_list.append(efficiency_json_start)
@@ -272,7 +271,7 @@ def get_gpu_metrics_data_tooltip(
     def process_gpu(gpu_id: int):
         nonlocal has_sm_efficiency, has_occupancy, has_tc
-        gpu_metrics_data.append({'title': f'GPU {gpu_id}:', 'value': ''})
+        gpu_metrics_data.append({'title': 'GPU {}:'.format(gpu_id), 'value': ''})
 
         gpu_info = gpu_infos.get(gpu_id, None)
         if gpu_info is not None:
             for key in gpu_info_columns:
@@ -304,7 +303,7 @@ def process_gpu(gpu_id: int):
         process_gpu(gpu_ids[idx])
 
     tooltip_summary = 'The GPU usage metrics:\n'
-    tooltip = f'{tooltip_summary}\n{consts.TOOLTIP_GPU_UTIL}'
+    tooltip = '{}\n{}'.format(tooltip_summary, consts.TOOLTIP_GPU_UTIL)
     if has_sm_efficiency:
         tooltip += '\n' + consts.TOOLTIP_SM_EFFICIENCY
     if has_occupancy:
diff --git a/tb_plugin/torch_tb_profiler/profiler/kernel_parser.py b/tb_plugin/torch_tb_profiler/profiler/kernel_parser.py
index a6ed0ef6c..838fc38ce 100644
--- a/tb_plugin/torch_tb_profiler/profiler/kernel_parser.py
+++ b/tb_plugin/torch_tb_profiler/profiler/kernel_parser.py
@@ -12,7 +12,7 @@
 
 class KernelParser:
     def __init__(self):
-        self.kernel_stat: pd.DataFrame | None = None
+        self.kernel_stat: Optional[pd.DataFrame] = None
         self.tc_used_ratio = 0.0
 
     def parse_events(self, events):
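Note: pd.DataFrame | None relies on PEP 604 union syntax, which only parses on Python 3.10+; Optional[X] (shorthand for Union[X, None]) is the portable spelling this patch standardizes on. Illustrative sketch:

    from typing import Optional, Union

    maybe_count: Optional[int] = None   # identical to Union[int, None]
    # On Python 3.10+ the same type could be written: int | None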
diff --git a/tb_plugin/torch_tb_profiler/profiler/loader.py b/tb_plugin/torch_tb_profiler/profiler/loader.py
index 44730a57b..930a4f2c2 100644
--- a/tb_plugin/torch_tb_profiler/profiler/loader.py
+++ b/tb_plugin/torch_tb_profiler/profiler/loader.py
@@ -1,3 +1,4 @@
+
 # -------------------------------------------------------------------------
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # --------------------------------------------------------------------------
@@ -59,7 +60,7 @@ def load(self):
         run = Run(self.run_name, self.run_dir)
         num_items = len(workers)
         while num_items > 0:
-            item: tuple[RunProfile, DistributedRunProfileData] = self.queue.get()
+            item: Tuple[RunProfile, DistributedRunProfileData] = self.queue.get()
             num_items -= 1
             r, d = item
             if r or d:
@@ -116,9 +117,9 @@ def _process_spans(self, distributed_run: Run):
             span_profiles.append(p)
         return span_profiles
 
-    def _process_distributed_profiles(self, profiles: list[DistributedRunProfileData], span):
+    def _process_distributed_profiles(self, profiles: List[DistributedRunProfileData], span):
         has_communication = True
-        comm_node_lists: list[list[CommunicationNode]] = []
+        comm_node_lists: List[List[CommunicationNode]] = []
         for data in profiles:
             logger.debug('Processing profile data')
             # Set has_communication to False and disable distributed view if any one worker has no communication
diff --git a/tb_plugin/torch_tb_profiler/profiler/memory_parser.py b/tb_plugin/torch_tb_profiler/profiler/memory_parser.py
index f97dd107e..014542c56 100644
--- a/tb_plugin/torch_tb_profiler/profiler/memory_parser.py
+++ b/tb_plugin/torch_tb_profiler/profiler/memory_parser.py
@@ -3,8 +3,7 @@
 # --------------------------------------------------------------------------
 from collections import defaultdict
 from enum import IntEnum
-from typing import Dict, List, Optional, Tuple
-from collections.abc import Iterable
+from typing import Dict, Iterable, List, Optional, Tuple
 
 from .. import utils
 from .node import OperatorNode, is_operator_node
@@ -37,15 +36,15 @@ def __init__(self, scope: str, pid: int, tid: int, ts: int,
         self.bytes = bytes
         self.total_allocated = total_allocated
         self.total_reserved = total_reserved
-        self.op_name: str | None = None
-        self.parent_op_name: str | None = None
+        self.op_name: Optional[str] = None
+        self.parent_op_name: Optional[str] = None
 
     @property
     def device_name(self):
         if self.device_type == DeviceType.CPU:
             return 'CPU'
         elif self.device_type == DeviceType.CUDA:
-            return f'GPU{self.device_id}'
+            return 'GPU{}'.format(self.device_id)
         else:
             return None
 
@@ -68,8 +67,8 @@ def __repr__(self) -> str:
 
 class MemorySnapshot:
     def __init__(self, memory_records: Iterable[MemoryRecord],
-                 op_memory_table: dict[OperatorNode, list[MemoryRecord]],
-                 processed_nodes: dict[OperatorNode, int]) -> None:
+                 op_memory_table: Dict[OperatorNode, List[MemoryRecord]],
+                 processed_nodes: Dict[OperatorNode, int]) -> None:
         self.memory_records = memory_records
         self.op_memory_table = op_memory_table
         # the visited node times from parent to child
@@ -77,7 +76,7 @@ def __init__(self, memory_records: Iterable[MemoryRecord],
         self.processed_node = processed_nodes
         self.unreached_node = defaultdict(list)
 
-    def get_peak_memory(self) -> dict[tuple[DeviceType, int], int]:
+    def get_peak_memory(self) -> Dict[Tuple[DeviceType, int], int]:
         peaks = defaultdict(int)
         for r in self.memory_records:
             if r.total_allocated == r.total_allocated:  # !isnan
@@ -85,8 +84,8 @@ def get_memory_statistics(self,
-                              tid2tree: dict[int, OperatorNode],
-                              start_ts=None, end_ts=None) -> dict[str, dict[str, list[int]]]:
+                              tid2tree: Dict[int, OperatorNode],
+                              start_ts=None, end_ts=None) -> Dict[str, Dict[str, List[int]]]:
         metric_length = len(MemoryMetrics)
         self_metric_length = metric_length // 2
 
@@ -94,10 +93,10 @@ def dict_factory():
             return defaultdict(lambda: [0] * metric_length)
 
         # traverse outputs
-        op_list: list[OperatorNode] = []
+        op_list: List[OperatorNode] = []
         # two level keys dictionary
         # first keyed by node, then keyed by device (CPU/GPU0/GPU1/etc.)
-        memory_metrics_keyed_by_node: dict[OperatorNode, dict[str, list[int]]] = defaultdict(dict_factory)
+        memory_metrics_keyed_by_node: Dict[OperatorNode, Dict[str, List[int]]] = defaultdict(dict_factory)
 
         def traverse_node_memory(node: OperatorNode):
             if start_ts is not None and node.end_time < start_ts:
@@ -139,7 +138,7 @@ def traverse_node_memory(node: OperatorNode):
 
         # keyed first by device name like CPU/GPU0 etc, then keyed by operator name.
         # the value is array [items indexed by MemoryMetrics]
-        memory_metrics_keyed_by_nodename: dict[str, dict[str, list[int]]] = defaultdict(dict_factory)
+        memory_metrics_keyed_by_nodename: Dict[str, Dict[str, List[int]]] = defaultdict(dict_factory)
         # node: the instance, device_keyed_metrics: dictionary keyed by device name like CPU/GPU0
         for node, device_keyed_metrics in memory_metrics_keyed_by_node.items():
             if not is_operator_node(node):
@@ -151,12 +150,12 @@ def traverse_node_memory(node: OperatorNode):
                     memory_metrics_keyed_by_nodename[device][node.name][i] += metric
 
         # get the op_calls dictionary from module parser result.
-        op_calls: dict[str, int] = defaultdict(int)
+        op_calls: Dict[str, int] = defaultdict(int)
         agg_result = aggregate_ops(op_list, [lambda op: op.name])
         for op_name, op_agg in agg_result[0].items():
             op_calls[op_name] += op_agg.calls
 
-        result: dict[str, dict[str, list[int]]] = defaultdict(defaultdict)
+        result: Dict[str, Dict[str, List[int]]] = defaultdict(defaultdict)
         for device, node_metrics in memory_metrics_keyed_by_nodename.items():
             for node, values in node_metrics.items():
                 if any(values):
@@ -166,7 +165,7 @@ def traverse_node_memory(node: OperatorNode):
 
     def get_memory_metrics(self, op: OperatorNode, start_ts, end_ts):
         metrics_count = len([e.name for e in MemoryMetrics if e.name.startswith('Self')])
-        memory_metrics: dict[str, list[int]] = defaultdict(lambda: [0] * metrics_count)
+        memory_metrics: Dict[str, List[int]] = defaultdict(lambda: [0] * metrics_count)
         for record in self.op_memory_table[op]:
             if start_ts is not None and record.ts < start_ts:
                 continue
@@ -187,16 +186,16 @@ def get_memory_metrics(self, op: OperatorNode, start_ts, end_ts):
 class MemoryParser:
     def __init__(self, memory_events: Iterable[MemoryEvent]):
         # statistics purpose
-        self.staled_records: list[MemoryRecord] = []
-        self.processed_records: list[MemoryRecord] = []
-        self.memory_records: list[MemoryRecord] = [MemoryRecord.from_event(e) for e in memory_events]
+        self.staled_records: List[MemoryRecord] = []
+        self.processed_records: List[MemoryRecord] = []
+        self.memory_records: List[MemoryRecord] = [MemoryRecord.from_event(e) for e in memory_events]
 
-    def find_memory_nodes(self, tid2tree: dict[int, OperatorNode]) -> MemorySnapshot:
-        records_by_tid: dict[int, list[MemoryRecord]] = defaultdict(list)
+    def find_memory_nodes(self, tid2tree: Dict[int, OperatorNode]) -> MemorySnapshot:
+        records_by_tid: Dict[int, List[MemoryRecord]] = defaultdict(list)
         for r in self.memory_records:
             records_by_tid[r.tid].append(r)
 
-        op_memory_table: dict[OperatorNode, list[MemoryRecord]] = defaultdict(list)
+        op_memory_table: Dict[OperatorNode, List[MemoryRecord]] = defaultdict(list)
         processed_node = defaultdict(int)
 
         tree_height = 0
@@ -205,7 +204,7 @@ def find_memory_nodes(self, tid2tree: dict[int, OperatorNode]) -> MemorySnapshot
                 continue
 
             # each item is (parent_node, child_index) that it is visiting.
-            node_stack: list[tuple[OperatorNode, int]] = []
+            node_stack: List[Tuple[OperatorNode, int]] = []
 
             record_index = 0
             current_node: OperatorNode = tid2tree.get(tid)
@@ -297,7 +296,7 @@ def find_memory_nodes(self, tid2tree: dict[int, OperatorNode]) -> MemorySnapshot
         logger.debug('{} memory records are skipped in total {} memory records and only {} get processed'.format(
             len(self.staled_records), len(self.memory_records), len(self.processed_records)))
         if tree_height > 0:
-            logger.debug(f'max tree height is {tree_height}')
+            logger.debug('max tree height is {}'.format(tree_height))
 
         all_records = self.get_preprocessed_records()
         return MemorySnapshot(all_records, op_memory_table, processed_node)
diff --git a/tb_plugin/torch_tb_profiler/profiler/module_op.py b/tb_plugin/torch_tb_profiler/profiler/module_op.py
index 27186770f..68e74d578 100644
--- a/tb_plugin/torch_tb_profiler/profiler/module_op.py
+++ b/tb_plugin/torch_tb_profiler/profiler/module_op.py
@@ -2,8 +2,7 @@
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # -------------------------------------------------------------------------
 from collections import namedtuple
-from typing import Dict, List, Optional, Set, Tuple, Union
-from collections.abc import Generator, Iterable
+from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple, Union
 
 from .node import (DataLoaderNode, ModuleNode, OperatorNode, OptimizerNode,
                    PLModuleNode, ProfilerStepNode, is_operator_node)
@@ -14,7 +13,7 @@ class Module:
     def __init__(self, name: str, module_id: int, shape: str = ''):
         self.name = name
         self.module_id = module_id
-        self.children: list[Module] = []
+        self.children: List[Module] = []
 
     def __hash__(self):
         return hash((self.name, self.module_id, tuple(self.children)))
@@ -61,7 +60,7 @@ def avg_device_duration(self):
     'children'])
 
 
-def aggegate_module_view(tid2tree: dict[int, OperatorNode], events: list[BaseEvent]) -> list[Stats] | None:
+def aggegate_module_view(tid2tree: Dict[int, OperatorNode], events: List[BaseEvent]) -> Optional[List[Stats]]:
     roots = _build_module_hierarchy(events)
     modules = _get_node_list(tid2tree, ModuleNode)
     if modules and roots:
@@ -70,7 +69,7 @@ def aggegate_module_view(tid2tree: dict[int, OperatorNode], events: list[BaseEve
     return None
 
 
-def aggegate_pl_module_view(tid2tree: dict[int, OperatorNode], events: list[BaseEvent]) -> list[Stats] | None:
+def aggegate_pl_module_view(tid2tree: Dict[int, OperatorNode], events: List[BaseEvent]) -> Optional[List[Stats]]:
     roots = _build_module_hierarchy_from_name(events)
     modules = _get_node_list(tid2tree, PLModuleNode)
     if modules and roots:
@@ -79,10 +78,10 @@ def aggegate_pl_module_view(tid2tree: dict[int, OperatorNode], events: list[Base
     return None
 
 
-def _build_module_hierarchy_from_name(events: list[PLModuleEvent]) -> list[Module]:
+def _build_module_hierarchy_from_name(events: List[PLModuleEvent]) -> List[Module]:
     pl_module_events = [e for e in events if e.type == EventTypes.PL_MODULE]
-    name2module: dict[str, Module] = {}
-    no_root: set[str] = set()
+    name2module: Dict[str, Module] = {}
+    no_root: Set[str] = set()
 
     for event in pl_module_events:
         if event.name not in name2module:
@@ -99,14 +98,14 @@ def _build_module_hierarchy_from_name(events: list[PLModuleEvent]) -> list[Modul
     return [module for name, module in name2module.items() if name not in no_root]
 
 
-def _build_module_hierarchy(events: list[PythonFunctionEvent]) -> list[Module]:
+def _build_module_hierarchy(events: List[PythonFunctionEvent]) -> List[Module]:
     """Get the module hierarchy from the chome trace events
     """
     python_events = [e for e in events if e.type in (EventTypes.PYTHON_FUNCTION, EventTypes.MODULE)]
     id_to_event = {e.python_id: e for e in python_events}
 
     # Extract Python function topology.
-    children: dict[int, list[int]] = {}
+    children: Dict[int, List[int]] = {}
     for e in python_events:
         e_id = e.python_id
         children.setdefault(e_id, [])
@@ -139,7 +138,7 @@ def _build_module_hierarchy(events: list[PythonFunctionEvent]) -> list[Module]:
             e = id_to_event.get(e.python_parent_id, None)
 
     module_roots = [k for k, v in module_parent_map.items() if v is None]
-    module_child_map: dict[int, list[int]] = {}
+    module_child_map: Dict[int, List[int]] = {}
    for child_id, parent_id in module_parent_map.items():
        module_child_map.setdefault(child_id, [])
        module_child_map.setdefault(parent_id, [])
@@ -155,7 +154,7 @@ def append_hierarchy(e_id) -> Module:
             module.children.append(child)
         return module
 
-    unique_modules: set[Module] = set()
+    unique_modules: Set[Module] = set()
     for e_id in module_roots:
         root = append_hierarchy(e_id)
         unique_modules.add(root)
@@ -163,9 +162,9 @@ def append_hierarchy(e_id) -> Module:
     return list(unique_modules)
 
 
-def _aggregate_modules(modules: Iterable[ModuleNode | PLModuleNode]) -> dict[tuple[str, int], ModuleStats]:
+def _aggregate_modules(modules: Iterable[Union[ModuleNode, PLModuleNode]]) -> Dict[Tuple[str, int], ModuleStats]:
     """Aggregate the modules based on the name and module_id"""
-    module_aggs: dict[tuple(str, int), ModuleStats] = {}
+    module_aggs: Dict[Tuple(str, int), ModuleStats] = {}
     for m in modules:
         key = (m.name, m.module_id)
         if key not in module_aggs:
@@ -184,7 +183,7 @@ def _aggregate_modules(modules: Iterable[ModuleNode | PLModuleNode]) -> dict[tup
     return module_aggs
 
 
-def _get_node_list(tid2tree: dict[int, OperatorNode], node_class) -> Generator[OperatorNode, None, None]:
+def _get_node_list(tid2tree: Dict[int, OperatorNode], node_class) -> Generator[OperatorNode, None, None]:
     """Get all node with node_class from the operator tree"""
     def traverse_node(node):
         # Check OptimizerNode here because in PytorchLightning PLModuleNode is under OptimizerNoder.
@@ -203,8 +202,8 @@ def traverse_node(node):
 
 
 def _process_module_statistics(
-        modules_nodes: Iterable[ModuleNode | PLModuleNode],
-        hierarchy: Iterable[Module]) -> list[Stats]:
+        modules_nodes: Iterable[Union[ModuleNode, PLModuleNode]],
+        hierarchy: Iterable[Module]) -> List[Stats]:
     """Get the module statistics from the ModuleNode(s) and the hierarchy
     """
     module_aggs = _aggregate_modules(modules_nodes)
@@ -231,13 +230,13 @@ def process_modules(h_modules: Iterable[Module]):
     return data
 
 
-def get_module_tree(tid2tree: dict[int, OperatorNode]):
+def get_module_tree(tid2tree: Dict[int, OperatorNode]):
     """Get the module tree in timeline"""
     from copy import copy
 
     modules = []
 
-    def traverse_node(node, parent: ModuleNode | None):
+    def traverse_node(node, parent: Optional[ModuleNode]):
         if type(node) not in (ProfilerStepNode, ModuleNode):
             return
 
@@ -263,7 +262,7 @@ def traverse_node(node, parent: ModuleNode | None):
     return modules
 
 
-def dump_modules(level: int, modules: Iterable[Module | ModuleNode]):
+def dump_modules(level: int, modules: Iterable[Union[Module, ModuleNode]]):
     """testing purpose"""
     for module in modules:
         print(f"{' ' * level}{module.name.replace('nn.Module: ', '')}_{module.module_id}")
diff --git a/tb_plugin/torch_tb_profiler/profiler/node.py b/tb_plugin/torch_tb_profiler/profiler/node.py
index 697a28cd4..70866869d 100644
--- a/tb_plugin/torch_tb_profiler/profiler/node.py
+++ b/tb_plugin/torch_tb_profiler/profiler/node.py
@@ -17,7 +17,7 @@ class BaseNode(ABC):
     def __init__(self, name: str, start_time: int, end_time: int, type: str, tid: int,
-                 external_id: int | None = None):
+                 external_id: Optional[int] = None):
         self.name = name
         self.start_time = start_time
         self.end_time = end_time
@@ -49,12 +49,12 @@ def duration(self) -> int:
 
 
 class CommunicationNode(BaseNode):
-    def __init__(self, input_shape: list[list[int]], input_type: list[str], **kwargs):
+    def __init__(self, input_shape: List[List[int]], input_type: List[str], **kwargs):
         super().__init__(**kwargs)
         self.input_shape = input_shape
         self.input_type = input_type
-        self.kernel_ranges: list[tuple[int, int]] = []
-        self.real_time_ranges: list[tuple[int, int]] = []
+        self.kernel_ranges: List[Tuple[int, int]] = []
+        self.real_time_ranges: List[Tuple[int, int]] = []
         self.total_time: int = 0
         self.real_time: int = 0
         self.step_name: str = None
@@ -75,12 +75,12 @@ class OperatorNode(HostNode):
     # Don't use [] as default parameters
     # https://stackoverflow.com/questions/1132941/least-astonishment-and-the-mutable-default-argument?page=1&tab=votes#tab-top
     # https://web.archive.org/web/20200221224620/http://effbot.org/zone/default-values.htm
-    def __init__(self, children=None, runtimes=None, input_shape: list[list[int]] | None = None,
-                 input_type: list[str] | None = None, callstack: str | None = None,
+    def __init__(self, children=None, runtimes=None, input_shape: Optional[List[List[int]]] = None,
+                 input_type: Optional[List[str]] = None, callstack: Optional[str] = None,
                  self_host_duration: int = 0, self_device_duration: int = 0, **kwargs):
         super().__init__(**kwargs)
-        self.children: list[OperatorNode] = [] if children is None else children  # OperatorNode and ProfilerStepNode.
-        self.runtimes: list[RuntimeNode] = [] if runtimes is None else runtimes  # RuntimeNode
+        self.children: List[OperatorNode] = [] if children is None else children  # OperatorNode and ProfilerStepNode.
+        self.runtimes: List[RuntimeNode] = [] if runtimes is None else runtimes  # RuntimeNode
         self.input_shape = input_shape
         self.input_type = input_type
         self.callstack = callstack
@@ -120,12 +120,12 @@ def fill_stats(self):
                     self.tc_self_duration += rt.tc_duration
                 self.tc_total_duration += rt.tc_duration
                 if self.type == EventTypes.OPERATOR and not self.tc_eligible and rt.tc_duration > 0:
-                    logger.warning(f"New Tensor Cores eligible operator found: '{self.name}'!")
+                    logger.warning("New Tensor Cores eligible operator found: '{}'!".format(self.name))
                     self.tc_eligible = True
 
     def get_operator_and_kernels(self):
-        ops: list[OperatorNode] = []
-        kernels: list[DeviceNode] = []
+        ops: List[OperatorNode] = []
+        kernels: List[DeviceNode] = []
         for child in self.children:
             child_ops, child_kernels = child.get_operator_and_kernels()
             ops.extend(child_ops)
@@ -230,7 +230,7 @@ def __init__(self, **kwargs):
 
 
 class RuntimeNode(HostNode):
-    def __init__(self, device_nodes: list['DeviceNode'] | None = None, **kwargs):
+    def __init__(self, device_nodes: Optional[List['DeviceNode']] = None, **kwargs):
         super().__init__(**kwargs)
         # One runtime could trigger more than one kernel, such as cudaLaunchCooperativeKernelMultiDevice.
         self.device_nodes = sorted(device_nodes, key=lambda x: (x.start_time, -x.end_time)) if device_nodes else None
@@ -253,17 +253,17 @@ def get_kernels(self):
                 yield d
 
     @classmethod
-    def create(cls, event, device_nodes: list['DeviceNode'] | None):
+    def create(cls, event, device_nodes: Optional[List['DeviceNode']]):
         kwargs = BaseNode.get_node_argument(event)
         return cls(device_nodes=device_nodes, **kwargs)
 
 
 class DeviceNode(BaseNode):
     def __init__(self,
-                 blocks_per_sm: float | None = None,
+                 blocks_per_sm: Optional[float] = None,
                  occupancy: int = None,
-                 grid: list[int] | None = None,
-                 block: list[int] | None = None,
+                 grid: Optional[List[int]] = None,
+                 block: Optional[List[int]] = None,
                  regs_per_thread: int = None,
                  shared_memory: int = None,
                  device_id: int = None, **kwargs):
diff --git a/tb_plugin/torch_tb_profiler/profiler/op_agg.py b/tb_plugin/torch_tb_profiler/profiler/op_agg.py
index ffb6becec..8a1af502f 100644
--- a/tb_plugin/torch_tb_profiler/profiler/op_agg.py
+++ b/tb_plugin/torch_tb_profiler/profiler/op_agg.py
@@ -3,8 +3,7 @@
 # --------------------------------------------------------------------------
 import sys
 from collections import defaultdict
-from typing import Dict, List
-from collections.abc import Callable
+from typing import Callable, Dict, List
 
diff --git a/tb_plugin/torch_tb_profiler/profiler/op_agg.py b/tb_plugin/torch_tb_profiler/profiler/op_agg.py
index ffb6becec..8a1af502f 100644
--- a/tb_plugin/torch_tb_profiler/profiler/op_agg.py
+++ b/tb_plugin/torch_tb_profiler/profiler/op_agg.py
@@ -3,8 +3,7 @@
 # --------------------------------------------------------------------------
 import sys
 from collections import defaultdict
-from typing import Dict, List
-from collections.abc import Callable
+from typing import Callable, Dict, List
 
 from .. import utils
 from .node import DeviceNode, OperatorNode
@@ -37,9 +36,9 @@ def tc_total_ratio(self) -> float:
         return self.tc_total_duration / self.device_duration if self.device_duration > 0 else 0
 
 
-def aggregate_ops(op_list: list[OperatorNode],
-                  keys_func: list[Callable[[OperatorNode], str]]) -> list[dict[str, OperatorAgg]]:
-    def aggregate(key_to_agg: dict[str, OperatorAgg], key: str, op: OperatorNode):
+def aggregate_ops(op_list: List[OperatorNode],
+                  keys_func: List[Callable[[OperatorNode], str]]) -> List[Dict[str, OperatorAgg]]:
+    def aggregate(key_to_agg: Dict[str, OperatorAgg], key: str, op: OperatorNode):
         if key not in key_to_agg:
             key_to_agg[key] = OperatorAgg(op)
         agg = key_to_agg[key]
@@ -53,7 +52,7 @@ def aggregate(key_to_agg: dict[str, OperatorAgg], key: str, op: OperatorNode):
         agg.tc_total_duration += op.tc_total_duration
         return agg
 
-    agg_dicts: list[dict[str, OperatorAgg]] = [{} for _ in range(len(keys_func))]
+    agg_dicts: List[Dict[str, OperatorAgg]] = [{} for _ in range(len(keys_func))]
     for op in op_list:
         for i, key_func in enumerate(keys_func):
             key = key_func(op)
@@ -93,8 +92,8 @@ def avg_occupancy(self) -> float:
         return self.occupancy / self.total_duration if self.total_duration > 0 else 0
 
 
-def aggregate_kernels(kernel_list: list[DeviceNode]) -> list[KernelAggByNameOp]:
-    name_op_to_agg: dict[str, KernelAggByNameOp] = {}
+def aggregate_kernels(kernel_list: List[DeviceNode]) -> List[KernelAggByNameOp]:
+    name_op_to_agg: Dict[str, KernelAggByNameOp] = {}
     for kernel in kernel_list:
         dur = kernel.end_time - kernel.start_time
         op_name = 'N/A' if kernel.op_name is None else kernel.op_name
@@ -118,17 +117,17 @@ def aggregate_kernels(kernel_list: list[DeviceNode]) -> list[KernelAggByNameOp]:
 
 class ModuleAggregator:
     def __init__(self):
-        self.op_list_groupby_name: list[OperatorAgg] = None  # For Operator-view.
-        self.op_list_groupby_name_input: list[OperatorAgg] = None  # For Operator-view.
-        self.kernel_list_groupby_name_op: list[KernelAggByNameOp] = None  # For Kernel-view.
-        self.stack_lists_group_by_name: dict[str, list[OperatorAgg]] = None
-        self.stack_lists_group_by_name_input: dict[str, list[OperatorAgg]] = None
-        self.ops: list[OperatorNode] = None
-
-    def aggregate(self, tid2tree: dict[int, OperatorNode]):
+        self.op_list_groupby_name: List[OperatorAgg] = None  # For Operator-view.
+        self.op_list_groupby_name_input: List[OperatorAgg] = None  # For Operator-view.
+        self.kernel_list_groupby_name_op: List[KernelAggByNameOp] = None  # For Kernel-view.
+        self.stack_lists_group_by_name: Dict[str, List[OperatorAgg]] = None
+        self.stack_lists_group_by_name_input: Dict[str, List[OperatorAgg]] = None
+        self.ops: List[OperatorNode] = None
+
+    def aggregate(self, tid2tree: Dict[int, OperatorNode]):
         # get the operators and kernels recursively by traverse the node tree root.
-        ops: list[OperatorNode] = []
-        kernels: list[DeviceNode] = []
+        ops: List[OperatorNode] = []
+        kernels: List[DeviceNode] = []
         for root in tid2tree.values():
             root_ops, root_kernels = root.get_operator_and_kernels()
             ops.extend(root_ops)
@@ -137,15 +136,15 @@ def aggregate(self, tid2tree: dict[int, OperatorNode]):
 
         # aggregate both kernels and operators
         self.kernel_list_groupby_name_op = aggregate_kernels(kernels)
 
-        keys: list[Callable[[OperatorNode], str]] = [
+        keys: List[Callable[[OperatorNode], str]] = [
             lambda x: x.name,
             lambda x: '###'.join((x.name, str(x.input_shape))),
             lambda x: '###'.join((x.name, str(x.callstack))),
             lambda x: '###'.join((x.name, str(x.input_shape), str(x.callstack)))
         ]
         agg_result = aggregate_ops(ops, keys)
-        stack_lists_group_by_name: dict[str, list[OperatorAgg]] = defaultdict(list)
-        stack_lists_group_by_name_input: dict[str, list[OperatorAgg]] = defaultdict(list)
+        stack_lists_group_by_name: Dict[str, List[OperatorAgg]] = defaultdict(list)
+        stack_lists_group_by_name_input: Dict[str, List[OperatorAgg]] = defaultdict(list)
         for agg in agg_result[2].values():
             assert (len(agg.callstacks) == 1)
             if list(agg.callstacks)[0]:
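Note: aggregate_ops makes a single pass over the operator list and fills one result dict per key function, which is how the four groupings (name, name+shape, name+callstack, name+shape+callstack) fall out of one traversal. A rough standalone sketch of that multi-key pattern, with made-up (name, shape, duration) tuples standing in for OperatorNode:

    from collections import defaultdict

    ops = [('aten::mm', '[[32, 64]]', 10),
           ('aten::mm', '[[32, 64]]', 12),
           ('aten::mm', '[[8, 8]]', 3)]

    keys = [lambda op: op[0],                        # by name
            lambda op: '###'.join((op[0], op[1]))]   # by name + input shape

    agg_dicts = [defaultdict(int) for _ in keys]     # one dict per key func
    for op in ops:
        for i, key_func in enumerate(keys):
            agg_dicts[i][key_func(op)] += op[2]      # accumulate duration

    print(dict(agg_dicts[0]))  # {'aten::mm': 25}
    print(dict(agg_dicts[1]))  # {'aten::mm###[[32, 64]]': 22, 'aten::mm###[[8, 8]]': 3}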
diff --git a/tb_plugin/torch_tb_profiler/profiler/op_tree.py b/tb_plugin/torch_tb_profiler/profiler/op_tree.py
index 69acb34f3..a27014b06 100644
--- a/tb_plugin/torch_tb_profiler/profiler/op_tree.py
+++ b/tb_plugin/torch_tb_profiler/profiler/op_tree.py
@@ -3,8 +3,7 @@
 # -------------------------------------------------------------------------
 import sys
 from collections import defaultdict
-from typing import Dict, List, Optional, Tuple
-from collections.abc import Iterable
+from typing import Dict, Iterable, List, Optional, Tuple
 
 from .. import utils
 from .node import (BackwardNode, DeviceNode, ModuleNode, OperatorNode,
@@ -20,13 +19,13 @@ class OpTreeBuilder:
 
     def __init__(self):
         self.main_tid: int = None
-        self.tid2tree: dict[int, OperatorNode] = None
+        self.tid2tree: Dict[int, OperatorNode] = None
 
     def build_tree(self,
-                   tid2list: dict[int, list[OperatorNode]],
-                   tid2zero_rt_list: dict[int, list[RuntimeNode]],
-                   staled_device_nodes: list[DeviceNode],
-                   fwd_bwd_map: dict[int, int]):
+                   tid2list: Dict[int, List[OperatorNode]],
+                   tid2zero_rt_list: Dict[int, List[RuntimeNode]],
+                   staled_device_nodes: List[DeviceNode],
+                   fwd_bwd_map: Dict[int, int]):
         """Construct the BackwardNode and replace the original backward nodes
         """
         self.tid2tree = self._build_tree(tid2list, tid2zero_rt_list, staled_device_nodes)
@@ -48,7 +47,7 @@ def build_tree(self,
         if len(agg_nodes) > 0:
             logger.warning('some nodes cannot find forward nodes')
 
-        backward_modules: list[BackwardNode] = []
+        backward_modules: List[BackwardNode] = []
         for module in modules:
             OpTreeBuilder._build_backward_module(module, None, fwd_bwd_root, backward_modules)
         OpTreeBuilder._insert_backward_modules(self.tid2tree[self.main_tid], backward_modules)
@@ -56,7 +55,7 @@ def build_tree(self,
 
         return self.tid2tree
 
-    def _build_tree(self, tid2list: dict[int, list[OperatorNode]], tid2zero_rt_list, staled_device_nodes):
+    def _build_tree(self, tid2list: Dict[int, List[OperatorNode]], tid2zero_rt_list, staled_device_nodes):
         tid2tree = {}
 
         for tid, op_list in tid2list.items():
@@ -101,7 +100,7 @@ def _build_tree_internal(self, host_node_list, zero_rt_list, tid, staled_device_
            zero_rt_list: list of RuntimeNode with external_id=0."""
 
        def build_tree_relationship(host_node_list: Iterable[OperatorNode], zero_rt_list, staled_device_nodes):
-            dummpy_rt: list[RuntimeNode] = []
+            dummpy_rt: List[RuntimeNode] = []
            if staled_device_nodes:
                # Note: Although kernels of this dummy runtime is put under main thread's tree,
                # we don't know which thread launches them.
@@ -114,7 +113,7 @@ def build_tree_relationship(host_node_list: Iterable[OperatorNode], zero_rt_list
                        tid=0, device_nodes=staled_device_nodes))
                dummpy_rt[0].fill_stats()
 
-            node_stack: list[OperatorNode] = []
+            node_stack: List[OperatorNode] = []
            root_node = OperatorNode(
                name='CallTreeRoot',
                start_time=-sys.maxsize - 1,
@@ -170,13 +169,13 @@ def remove_dup_nodes(node: OperatorNode):
                                   if child.end_time is not None), None)
         return root_node
 
-    def _get_modules(self) -> tuple[list[ModuleNode], list[OperatorNode]]:
+    def _get_modules(self) -> Tuple[List[ModuleNode], List[OperatorNode]]:
         """Get the ModuleNodes and backward root nodes
         If there are any ModuleNodes, the backward roots will be removed from the tree
         so that later a new BackwardNode will be replaced.
         """
-        modules: list[ModuleNode] = []
-        backward_nodes: dict[OperatorNode, list[OperatorNode]] = defaultdict(list)
+        modules: List[ModuleNode] = []
+        backward_nodes: Dict[OperatorNode, List[OperatorNode]] = defaultdict(list)
 
         def traverse_node(parent, node: OperatorNode):
             if isinstance(node, ModuleNode):
@@ -195,7 +194,7 @@ def traverse_node(parent, node: OperatorNode):
             traverse_node(root, child)
 
         if modules:
-            backward_nodes_flatten: list[OperatorNode] = []
+            backward_nodes_flatten: List[OperatorNode] = []
             # only remove the backward nodes when the module information exist
             for p, nodes in backward_nodes.items():
                 p.children = [child for child in p.children if child not in nodes]
@@ -208,8 +207,8 @@ def traverse_node(parent, node: OperatorNode):
     @staticmethod
     def _get_node_parents(nodes: Iterable[OperatorNode]):
         """Get the child->parent relationship for these nodes"""
-        ts_to_node: dict[int, OperatorNode] = {}
-        ts_to_parent: dict[int, OperatorNode] = {}
+        ts_to_node: Dict[int, OperatorNode] = {}
+        ts_to_parent: Dict[int, OperatorNode] = {}
 
         def traverse_node(node: OperatorNode):
             if node.start_time not in ts_to_node:
@@ -224,12 +223,12 @@ def traverse_node(node: OperatorNode):
         return ts_to_node, ts_to_parent
 
     @staticmethod
-    def _group_backward_nodes(nodes: Iterable[OperatorNode]) -> dict[OperatorNode, list[OperatorNode]]:
+    def _group_backward_nodes(nodes: Iterable[OperatorNode]) -> Dict[OperatorNode, List[OperatorNode]]:
         """All nodes are backward nodes startswith autograd::engine::evaluate_function.
         If one node's name is autograd::engine::evaluate_function: torch::autograd::AccumulateGrad,
         it should be grouped with previous normal backward node. Otherwise, a new backward node should be started
         """
-        grouped_bwd_nodes: list[list[OperatorNode]] = []
+        grouped_bwd_nodes: List[List[OperatorNode]] = []
         for node in nodes:
             if node.name == OpTreeBuilder.BACKWARD_ACCUMULATE_GRAD:
                 grouped_bwd_nodes[-1].append(node)
@@ -241,13 +240,13 @@ def _group_backward_nodes(nodes: Iterable[OperatorNode]) -> dict[OperatorNode, l
         return {nodes[0]: nodes for nodes in grouped_bwd_nodes}
 
     @staticmethod
-    def _get_backward_roots(fwd_bwd_map: dict[int, int],
-                            ts2parent: dict[int, OperatorNode],
-                            backward_nodes: dict[OperatorNode, list[OperatorNode]]) -> dict[int, list[OperatorNode]]:
+    def _get_backward_roots(fwd_bwd_map: Dict[int, int],
+                            ts2parent: Dict[int, OperatorNode],
+                            backward_nodes: Dict[OperatorNode, List[OperatorNode]]) -> Dict[int, List[OperatorNode]]:
         if not fwd_bwd_map:
             return None
 
-        fwd_to_bwdroot: dict[int, list[OperatorNode]] = {}
+        fwd_to_bwdroot: Dict[int, List[OperatorNode]] = {}
         for fwd, bwd in fwd_bwd_map.items():
             parent = ts2parent.get(bwd)
             while parent is not None and not parent.name.startswith(OpTreeBuilder.BACKWARD_ROOT_PREFIX):
@@ -261,9 +260,9 @@ def _get_backward_roots(fwd_bwd_map: dict[int, int],
         return fwd_to_bwdroot
 
     def _build_backward_module(node: ModuleNode,
-                               parent: BackwardNode | None,
-                               fwd_bwd_map: dict[int, list[OperatorNode]],
-                               result: list[BackwardNode]):
+                               parent: Optional[BackwardNode],
+                               fwd_bwd_map: Dict[int, List[OperatorNode]],
+                               result: List[BackwardNode]):
         """Construct the backward module from root (node argument) and
         insert it into result array if there is no any parent associated with it.
         """
@@ -294,7 +293,7 @@ def _build_backward_module(node: ModuleNode,
             parent.tid = parent.children[0].tid
 
     @staticmethod
-    def _insert_backward_modules(root: OperatorNode, backward_modules: list[BackwardNode]):
+    def _insert_backward_modules(root: OperatorNode, backward_modules: List[BackwardNode]):
         backward_modules.sort(key=lambda x: (x.start_time, -x.end_time))
 
         # each item is (parent_node, child_index) that it is visiting.
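Note: the _group_backward_nodes docstring above is the whole algorithm: an AccumulateGrad evaluate_function is folded into the preceding backward group, anything else opens a new group, and each group's first node becomes its dict key. A toy sketch on bare name strings (the real code works on OperatorNode objects):

    ACC = 'autograd::engine::evaluate_function: torch::autograd::AccumulateGrad'
    names = ['autograd::engine::evaluate_function: AddmmBackward0', ACC, ACC,
             'autograd::engine::evaluate_function: ReluBackward0', ACC]

    grouped = []
    for name in names:
        if name == ACC:
            grouped[-1].append(name)   # fold into the previous backward node
        else:
            grouped.append([name])     # any other name starts a new group

    assert [len(g) for g in grouped] == [3, 2]
    mapping = {g[0]: g for g in grouped}   # keyed by the group's first node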
diff --git a/tb_plugin/torch_tb_profiler/profiler/overall_parser.py b/tb_plugin/torch_tb_profiler/profiler/overall_parser.py
index 392c14158..590672974 100644
--- a/tb_plugin/torch_tb_profiler/profiler/overall_parser.py
+++ b/tb_plugin/torch_tb_profiler/profiler/overall_parser.py
@@ -13,7 +13,7 @@
 class OverallParser:
     class Costs:
-        def __init__(self, costs: list[float] = None):
+        def __init__(self, costs: List[float] = None):
             # the cost length is len(ProfileRole)
             if costs is None:
                 self.costs = [0.] * len(ProfileRole)
@@ -29,18 +29,18 @@ def create_from_statistics(cls, statistics: 'OverallParser.Statistics', total_du
             return cls(costs)
 
     class Statistics:
-        def __init__(self, cost_ranges: list[list[tuple[int, int]]]):
+        def __init__(self, cost_ranges: List[List[Tuple[int, int]]]):
             if not cost_ranges:
                 raise ValueError('the cost ranges is None')
 
             self.cost_ranges = cost_ranges
 
         @classmethod
-        def create_from_range(cls, steps: list[tuple[int, int]], role_ranges: list[list[tuple[int, int]]]):
+        def create_from_range(cls, steps: List[Tuple[int, int]], role_ranges: List[List[Tuple[int, int]]]):
             assert len(role_ranges) == ProfileRole.Total - 1
-            cost_ranges: list[list[tuple[int, int]]] = []
-            slots: list[tuple[int, int]] = []
+            cost_ranges: List[List[Tuple[int, int]]] = []
+            slots: List[Tuple[int, int]] = []
             for role in role_ranges:
                 if slots:
                     range = intersection_ranges_lists(slots, role)
@@ -54,8 +54,8 @@ def create_from_range(cls, steps: list[tuple[int, int]], role_ranges: list[list[
 
             return cls(cost_ranges)
 
-        def intersection_with_step(self, step: tuple[int, int]):
-            cost_ranges: list[list[tuple[int, int]]] = []
+        def intersection_with_step(self, step: Tuple[int, int]):
+            cost_ranges: List[List[Tuple[int, int]]] = []
             step = [step]
             for range in self.cost_ranges:
                 cost_ranges.append(intersection_ranges_lists(step, range))
@@ -70,11 +70,11 @@ def __init__(self):
             self.other: int = 0
 
     def __init__(self):
-        self.steps_costs: list[OverallParser.Costs] = []
+        self.steps_costs: List[OverallParser.Costs] = []
         self.avg_costs = OverallParser.Costs()
-        self.communication_overlap: list[OverallParser.StepCommunicationCosts] = []
+        self.communication_overlap: List[OverallParser.StepCommunicationCosts] = []
 
-    def aggregate(self, steps: list[tuple[int, int]], role_ranges: list[list[tuple[int, int]]]):
+    def aggregate(self, steps: List[Tuple[int, int]], role_ranges: List[List[Tuple[int, int]]]):
         logger.debug('Overall, statistics')
         global_stats = OverallParser.Statistics.create_from_range(steps, role_ranges)
         if role_ranges[ProfileRole.Kernel]:
diff --git a/tb_plugin/torch_tb_profiler/profiler/range_utils.py b/tb_plugin/torch_tb_profiler/profiler/range_utils.py
index 7cc066bd1..c927d5acb 100644
--- a/tb_plugin/torch_tb_profiler/profiler/range_utils.py
+++ b/tb_plugin/torch_tb_profiler/profiler/range_utils.py
@@ -18,7 +18,7 @@ class EndpointTypes(IntEnum):
     merged_ranges = []
     if len(src_ranges) > 0:
         # Build tuple of (time, type, value)
-        endpoints: list[EndPoint] = []
+        endpoints: List[EndPoint] = []
         for r in src_ranges:
             endpoints.append(EndPoint(r[0], EndpointTypes.START, r[2]))
             endpoints.append(EndPoint(r[1], EndpointTypes.END, r[2]))
@@ -42,7 +42,7 @@ class EndpointTypes(IntEnum):
 
 # range_list1 item is length 3. range_list2 item is length 2.
 # Reture value's item is length 3.
-def intersection_ranges_lists_with_value(range_list1, range_list2) -> list[tuple[int, int, int]]:
+def intersection_ranges_lists_with_value(range_list1, range_list2) -> List[Tuple[int, int, int]]:
     range_list_dst = []
     if len(range_list1) == 0 or len(range_list2) == 0:
         return range_list_dst
@@ -80,8 +80,8 @@ def intersection_ranges_lists_with_value(range_list1, range_list2) -> list[tuple
     return range_list_dst
 
 
-def subtract_ranges_lists(range_list1: list[tuple[int, int]],
-                          range_list2: list[tuple[int, int]]) -> list[tuple[int, int]]:
+def subtract_ranges_lists(range_list1: List[Tuple[int, int]],
+                          range_list2: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
     range_list_dst = []
     if len(range_list1) == 0:
         return range_list_dst
@@ -115,8 +115,8 @@ def subtract_ranges_lists(range_list1: list[tuple[int, int]],
     return range_list_dst
 
 
-def intersection_ranges_lists(range_list1: list[tuple[int, int]],
-                              range_list2: list[tuple[int, int]]) -> list[tuple[int, int]]:
+def intersection_ranges_lists(range_list1: List[Tuple[int, int]],
+                              range_list2: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
     range_list_dst = []
     if len(range_list1) == 0 or len(range_list2) == 0:
         return range_list_dst
@@ -154,7 +154,7 @@ def intersection_ranges_lists(range_list1: list[tuple[int, int]],
     return range_list_dst
 
 
-def get_ranges_sum(ranges: list[tuple[int, int]]) -> int:
+def get_ranges_sum(ranges: List[Tuple[int, int]]) -> int:
     sum: int = 0
     for range in ranges:
         sum += (range[1] - range[0])
@@ -169,7 +169,7 @@ def pop_list(range_list, index):
     return next_item, next_index
 
 
-def merge_ranges(src_ranges, is_sorted=False) -> list[tuple[int, int]]:
+def merge_ranges(src_ranges, is_sorted=False) -> List[Tuple[int, int]]:
     if not src_ranges:
         # return empty list if src_ranges is None or its length is zero.
         return []
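Note: range_utils treats everything as sorted lists of (start, end) pairs, and the functions above are essentially set operations on those lists. For intuition, a self-contained two-pointer intersection in the same spirit as intersection_ranges_lists (a sketch, not the module's exact code):

    def intersect(a, b):
        # a, b: sorted, non-overlapping (start, end) range lists
        out, i, j = [], 0, 0
        while i < len(a) and j < len(b):
            lo = max(a[i][0], b[j][0])
            hi = min(a[i][1], b[j][1])
            if lo < hi:
                out.append((lo, hi))
            if a[i][1] < b[j][1]:   # advance the range that ends first
                i += 1
            else:
                j += 1
        return out

    steps = [(0, 100)]                  # one profiler step
    kernels = [(10, 40), (60, 120)]     # busy ranges
    busy = intersect(steps, kernels)    # [(10, 40), (60, 100)]
    print(sum(e - s for s, e in busy))  # 70, what get_ranges_sum would report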
diff --git a/tb_plugin/torch_tb_profiler/profiler/run_generator.py b/tb_plugin/torch_tb_profiler/profiler/run_generator.py
index c1e585ce5..4193816ed 100644
--- a/tb_plugin/torch_tb_profiler/profiler/run_generator.py
+++ b/tb_plugin/torch_tb_profiler/profiler/run_generator.py
@@ -2,8 +2,7 @@
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # --------------------------------------------------------------------------
 from collections import OrderedDict
-from typing import Dict, List
-from collections.abc import Iterable
+from typing import Dict, Iterable, List
 
 from .. import consts, utils
 from ..run import DistributedRunProfile, RunProfile
@@ -182,8 +181,8 @@ def build_avg_cost_dict(part_name: str, part_cost: float):
         else:
             html = ''
             for recommendation in self.profile_data.recommendations:
-                html += f'<li>{recommendation}</li>'
-            data['recommendations'] = f'<ul>{html}</ul>'
+                html += '<li>{}</li>'.format(recommendation)
+            data['recommendations'] = '<ul>{}</ul>'.format(html)
 
         return data
@@ -342,7 +341,7 @@ def _generate_kernel_op_table(self):
         table['columns'].extend(gpu_metrics_columns)
         table['rows'] = []
 
-        kernel_list: list[KernelAggByNameOp] = sorted(
+        kernel_list: List[KernelAggByNameOp] = sorted(
             self.profile_data.kernel_list_groupby_name_op, key=lambda x: x.total_duration, reverse=True)
         for agg_by_name_op in kernel_list:
             kernel_op_row = [agg_by_name_op.name, agg_by_name_op.op_name,
@@ -413,7 +412,7 @@ def _get_gpu_info(device_props, gpu_id):
         if (device_props is None) or (gpu_id >= len(device_props)) or (gpu_id < 0):
             return None
 
-        device_prop: dict = device_props[gpu_id]
+        device_prop: Dict = device_props[gpu_id]
         gpu_info = {}
         name = device_prop.get('name')
         if name is not None:
@@ -421,13 +420,13 @@ def _get_gpu_info(device_props, gpu_id):
 
         mem = device_prop.get('totalGlobalMem')
         if mem is not None:
-            gpu_info['Memory'] = f'{round(float(mem) / 1024 / 1024 / 1024, 2)} GB'
+            gpu_info['Memory'] = '{} GB'.format(round(float(mem) / 1024 / 1024 / 1024, 2))
             gpu_info['Memory Raw'] = mem
 
         major = device_prop.get('computeMajor')
         minor = device_prop.get('computeMinor')
         if major is not None and minor is not None:
-            gpu_info['Compute Capability'] = f'{major}.{minor}'
+            gpu_info['Compute Capability'] = '{}.{}'.format(major, minor)
 
         return gpu_info
@@ -449,7 +448,7 @@ def generate_run_profile(self):
 
     def _generate_gpu_info(self):
         # first key is node name, the second key is process id, the third key is GPU0/,
         # the value is the gpu info json
-        result: dict[str, dict[str, dict[str, dict]]] = OrderedDict()
+        result: Dict[str, Dict[str, Dict[str, Dict]]] = OrderedDict()
         index = 0
         for data in sorted(self.all_profile_data, key=lambda x: x.worker):
             if not data.device_props:
@@ -460,7 +459,7 @@ def _generate_gpu_info(self):
                 node = match.group(1)
                 process_id = match.group(2)
             else:
-                logger.warning(f'cannot parse node name from worker name {data.worker}')
+                logger.warning('cannot parse node name from worker name {}'.format(data.worker))
                 node = data.worker
                 process_id = index
                 index += 1
@@ -491,7 +490,7 @@ def _generate_overlap_graph(self):
             'legends': ['Computation', 'Overlapping', 'Communication', 'Other'],
             'units': 'us'
         }
-        steps_to_overlap: dict[str, dict[str, list[int]]] = OrderedDict()
+        steps_to_overlap: Dict[str, Dict[str, List[int]]] = OrderedDict()
         steps_to_overlap['all'] = OrderedDict()
         for data in self.all_profile_data:
             steps_to_overlap['all'][data.worker] = [0, 0, 0, 0]
@@ -521,7 +520,7 @@ def _generate_wait_graph(self):
             'legends': ['Data Transfer Time', 'Synchronizing Time'],
             'units': 'us'
         }
-        steps_to_wait: dict[str, dict[str, list[int]]] = OrderedDict()
+        steps_to_wait: Dict[str, Dict[str, List[int]]] = OrderedDict()
         steps_to_wait['all'] = OrderedDict()
 
         for data in self.all_profile_data:
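Note: _generate_gpu_info's comment spells out the nesting: node name, then process id, then 'GPU<n>', then the info dict. A small hedged sketch of building that shape (the node/process/GPU values here are invented for illustration):

    from collections import OrderedDict

    result = OrderedDict()   # node -> process id -> 'GPU<n>' -> info
    node, process_id, gpu_id = 'node1', '2716', 0
    gpu_info = {'Name': 'Tesla V100', 'Compute Capability': '7.0'}

    result.setdefault(node, OrderedDict()) \
          .setdefault(process_id, OrderedDict())['GPU{}'.format(gpu_id)] = gpu_info

    print(result['node1']['2716']['GPU0']['Compute Capability'])   # 7.0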
diff --git a/tb_plugin/torch_tb_profiler/profiler/tensor_cores_parser.py b/tb_plugin/torch_tb_profiler/profiler/tensor_cores_parser.py
index a08c38fbe..e2372d9ad 100644
--- a/tb_plugin/torch_tb_profiler/profiler/tensor_cores_parser.py
+++ b/tb_plugin/torch_tb_profiler/profiler/tensor_cores_parser.py
@@ -1,28 +1,27 @@
 # -------------------------------------------------------------------------
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # -------------------------------------------------------------------------
-from typing import Dict, List
-from collections.abc import Iterable
+from typing import Dict, Iterable, List
 
 from .. import consts
 from .node import OperatorNode
 
 
 class TensorCoresParser:
-    def __init__(self, tc_ratio: list[float], tc_eligible_ops_kernel_ratio: float):
+    def __init__(self, tc_ratio: List[float], tc_eligible_ops_kernel_ratio: float):
         # For calculating Tensor Cores time ratio per GPU.
         self.tc_ratio = tc_ratio
         self.tc_eligible_ops_kernel_ratio = tc_eligible_ops_kernel_ratio
 
     @classmethod
-    def parse_events(cls, tid2tree: dict[str, OperatorNode], ops: Iterable[OperatorNode], gpu_ids: Iterable[int]):
+    def parse_events(cls, tid2tree: Dict[str, OperatorNode], ops: Iterable[OperatorNode], gpu_ids: Iterable[int]):
         tc_ratio = cls._calculate_tc_ratio(ops, gpu_ids)
         tc_eligible_ops_kernel_ratio = cls._get_tc_eligible_ops_kernel_ratio(tid2tree, ops)
         return cls(tc_ratio, tc_eligible_ops_kernel_ratio)
 
     @staticmethod
     def _calculate_tc_ratio(ops: Iterable[OperatorNode], gpu_ids: Iterable[int]):
-        tc_ratio: list[float] = [None] * consts.MAX_GPU_PER_NODE
+        tc_ratio: List[float] = [None] * consts.MAX_GPU_PER_NODE
         tc_time = [0] * consts.MAX_GPU_PER_NODE
         total_time = [0] * consts.MAX_GPU_PER_NODE
         has_kernel = False
@@ -47,7 +46,7 @@ def _calculate_tc_ratio(ops: Iterable[OperatorNode], gpu_ids: Iterable[int]):
 
     @staticmethod
     def _get_bottom_tc_eligible_operators(op_tree_node: OperatorNode):
-        ops: list[OperatorNode] = []
+        ops: List[OperatorNode] = []
         for child in op_tree_node.children:
             child_ops = TensorCoresParser._get_bottom_tc_eligible_operators(child)
             ops.extend(child_ops)
@@ -57,7 +56,7 @@ def _get_bottom_tc_eligible_operators(op_tree_node: OperatorNode):
         return ops
 
     @staticmethod
-    def _get_tc_eligible_ops_kernel_ratio(tid2tree: dict[int, OperatorNode], ops: Iterable[OperatorNode]):
+    def _get_tc_eligible_ops_kernel_ratio(tid2tree: Dict[int, OperatorNode], ops: Iterable[OperatorNode]):
         def sum_self_kernel_time(ops: Iterable[OperatorNode]):
             sum_time = 0
             for op in ops:
diff --git a/tb_plugin/torch_tb_profiler/profiler/trace.py b/tb_plugin/torch_tb_profiler/profiler/trace.py
index f7642334c..803e989c8 100644
--- a/tb_plugin/torch_tb_profiler/profiler/trace.py
+++ b/tb_plugin/torch_tb_profiler/profiler/trace.py
@@ -60,7 +60,7 @@ def __init__(self, type, data):
         self.ts: int = data.get('ts')
         self.pid: int = data.get('pid')
         self.tid: int = data.get('tid')
-        self.args: dict = data.get('args', {})
+        self.args: Dict = data.get('args', {})
 
 
 class DurationEvent(BaseEvent):
@@ -69,11 +69,11 @@ def __init__(self, type, data):
         self.category: str = data.get('cat', '')
         self.duration: int = data.get('dur')
 
-        extern_id: int | None = self.args.get('external id')
+        extern_id: Optional[int] = self.args.get('external id')
         if extern_id is None:
             extern_id = self.args.get('External id')
         self.external_id = extern_id
-        self.correlation_id: int | None = self.args.get('correlation')
+        self.correlation_id: Optional[int] = self.args.get('correlation')
 
 
 class KernelEvent(DurationEvent):
@@ -169,7 +169,7 @@ def __init__(self, data):
             self.name = self.name[self.name.find(': ')+2:]
 
 
-def create_event(event, is_pytorch_lightning) -> BaseEvent | None:
+def create_event(event, is_pytorch_lightning) -> Optional[BaseEvent]:
     try:
         type = event.get('ph')
         if type == 'X':
@@ -183,7 +183,7 @@ def create_event(event, is_pytorch_lightning) -> BaseEvent | None:
         raise
 
 
-def create_trace_event(event, is_pytorch_lightning) -> BaseEvent | None:
+def create_trace_event(event, is_pytorch_lightning) -> Optional[BaseEvent]:
     category = event.get('cat')
     event_type = EventTypeMap.get(category.lower())
     if event_type == EventTypes.USER_ANNOTATION:
@@ -218,7 +217,7 @@ def create_trace_event(event, is_pytorch_lightning) -> BaseEvent | None:
     return None
 
 
-def create_association_events(events) -> dict[int, int]:
+def create_association_events(events) -> Dict[int, int]:
     forward_map = {}
     backward_map = {}
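Note: trace.py shows the conversion rule the whole patch applies: PEP 604 unions and PEP 585 builtin generics go back to their typing-module spellings, since `X | Y` in annotations needs Python 3.10 and `list[int]` needs 3.9, while the typing forms work on older interpreters. A minimal sketch of the equivalences this relies on:

    from typing import Dict, List, Optional, Union

    # list[int]       <->  List[int]
    # dict[str, int]  <->  Dict[str, int]
    # int | None      <->  Optional[int]  ==  Union[int, None]

    def external_id_of(args: Dict[str, int]) -> Optional[int]:
        # returns None when the key is absent, the usual Optional pattern
        return args.get('external id')

    print(external_id_of({'external id': 42}))   # 42
    print(external_id_of({}))                    # None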
diff --git a/tb_plugin/torch_tb_profiler/run.py b/tb_plugin/torch_tb_profiler/run.py
index 3a033ee3c..23c69ee16 100644
--- a/tb_plugin/torch_tb_profiler/run.py
+++ b/tb_plugin/torch_tb_profiler/run.py
@@ -2,8 +2,7 @@
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # --------------------------------------------------------------------------
 from collections import defaultdict
-from typing import Any, Dict, List, Optional, Tuple, Union
-from collections.abc import Iterable
+from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
 
 from . import consts, utils
 from .profiler.diffrun import compare_op_tree, diff_summary
@@ -23,7 +22,7 @@ class Run:
     def __init__(self, name, run_dir):
         self.name = name
         self.run_dir = run_dir
-        self.profiles: dict[tuple[str, str], RunProfile] = {}
+        self.profiles: Dict[Tuple[str, str], RunProfile] = {}
 
     @property
     def workers(self):
@@ -33,7 +32,7 @@ def workers(self):
         return worker_list
 
     @property
-    def views(self) -> list[consts.View]:
+    def views(self) -> List[consts.View]:
         view_set = set()
         for profile in self.profiles.values():
             view_set.update(profile.views)
@@ -78,7 +77,7 @@ def get_profile(self, worker, span) -> Union['DistributedRunProfile', 'RunProfil
         return self.profiles.get((worker, span), None)
 
     def get_profiles(self, *, worker=None, span=None) \
-            -> list['RunProfile'] | list['DistributedRunProfile'] | None:
+            -> Optional[Union[List['RunProfile'], List['DistributedRunProfile']]]:
         # Note: we could not use if span to check it is None or not
         # since the span 0 will be skipped at this case.
         if worker is not None and span is not None:
@@ -98,7 +97,7 @@ class RunProfile:
     def __init__(self, worker, span):
         self.worker = worker
         self.span = span
-        self.views: list[consts.View] = []
+        self.views: List[consts.View] = []
         self.is_pytorch_lightning = False
         self.has_runtime = False
         self.has_kernel = False
@@ -108,10 +107,10 @@ def __init__(self, worker, span):
         self.overview = None
         self.operation_pie_by_name = None
         self.operation_table_by_name = None
-        self.operation_stack_by_name: dict = None
+        self.operation_stack_by_name: Dict = None
         self.operation_pie_by_name_input = None
         self.operation_table_by_name_input = None
-        self.operation_stack_by_name_input: dict = None
+        self.operation_stack_by_name_input: Dict = None
         self.kernel_op_table = None
         self.kernel_pie = None
         self.kernel_table = None
@@ -124,12 +123,12 @@ def __init__(self, worker, span):
         self.gpu_tooltip = None
 
         # for memory stats and curve
-        self.memory_snapshot: MemorySnapshot | None = None
-        self.tid2tree: dict[int, OperatorNode] = None
-        self.pl_tid2tree: dict[int, OperatorNode] = None
+        self.memory_snapshot: Optional[MemorySnapshot] = None
+        self.tid2tree: Dict[int, OperatorNode] = None
+        self.pl_tid2tree: Dict[int, OperatorNode] = None
 
-        self.module_stats: list(Stats) | None = None
-        self.pl_module_stats: list(Stats) | None = None
+        self.module_stats: Optional[List[Stats]] = None
+        self.pl_module_stats: Optional[List[Stats]] = None
 
     def append_gpu_metrics(self, raw_data: bytes):
         counter_json_str = ', {}'.format(', '.join(self.gpu_metrics))
@@ -216,7 +215,7 @@ def get_memory_curve(
             time_metric: str = 'ms',
             memory_metric: str = 'K',
             patch_for_step_plot=True):
-        def get_curves_and_peaks(records: list[MemoryRecord], cano: Canonicalizer):
+        def get_curves_and_peaks(records: List[MemoryRecord], cano: Canonicalizer):
             """Inputs:
                 records: Sorted list of MemoryRecord
@@ -259,7 +258,7 @@ def get_curves_and_peaks(records: list[MemoryRecord], cano: Canonicalizer):
             return curves, peaks
 
         # NOTE: this should have been occured in frontend
-        def patch_curves_for_step_plot(curves: dict[str, list]):
+        def patch_curves_for_step_plot(curves: Dict[str, List]):
             # For example, if a curve is [(0, 0), (1, 1), (2,2)], the line plot
             # is a stright line. Interpolating it as [(0, 0), (1, 0), (1, 1),
             # (2,1) (2,2)], then the line plot will work as step plot.
@@ -281,14 +280,14 @@ def patch_curves_for_step_plot(curves: dict[str, list]):
         peaks_formatted = {}
         totals = {}
         for dev, value in peaks.items():
-            peaks_formatted[dev] = f'Peak Memory Usage: {cano.convert_memory(value):.1f}{cano.memory_metric}'
+            peaks_formatted[dev] = 'Peak Memory Usage: {:.1f}{}'.format(cano.convert_memory(value), cano.memory_metric)
             if dev != 'CPU':
                 try:
                     totals[dev] = cano.convert_memory(self.gpu_infos[int(dev[3:])]['Memory Raw'])
                 except BaseException:
                     pass
 
-        devices: list[str] = sorted(list(curves.keys()))
+        devices: List[str] = sorted(list(curves.keys()))
         default_device = 'CPU'
         for dev in devices:
             if dev.startswith('GPU'):
@@ -433,7 +432,7 @@ def get_module_view(self):
             'data': []
         }
 
-        def process_modules_stats(parent: list[Any], modules_stats: list[Stats]):
+        def process_modules_stats(parent: List[Any], modules_stats: List[Stats]):
             for stats in modules_stats:
                 d = stats._asdict()
                 d['children'] = []
@@ -451,7 +450,7 @@ def get_operator_tree(self):
 
         result = []
 
-        def traverse_node(parent: list, node: OperatorNode):
+        def traverse_node(parent: List, node: OperatorNode):
             d = {
                 'name': node.name,
                 'start_time': node.start_time,