diff --git a/optik/echidna/__main__.py b/optik/echidna/__main__.py index 0e7da2c..e3dfe45 100644 --- a/optik/echidna/__main__.py +++ b/optik/echidna/__main__.py @@ -17,7 +17,13 @@ start_display, stop_display, ) -from .interface import extract_contract_bytecode, extract_cases_from_json_output +from .interface import ( + count_cov_lines, + count_unique_pc, + get_latest_coverage_file, + extract_contract_bytecode, + extract_cases_from_json_output, +) from .runner import replay_inputs, generate_new_inputs, run_echidna_campaign from ..common.exceptions import ArgumentParsingError, InitializationError from ..common.logger import ( @@ -211,6 +217,24 @@ def run_hybrid_echidna(arguments: List[str]) -> None: logger.debug(f"Echidna stdout: \n{p.stdout}") + # Display line coverage info + coverage_file = get_latest_coverage_file(args.corpus_dir) + if coverage_file is None: + raise EchidnaException( + f"Couldn't get latest coverage file in {args.corpus_dir}" + ) + nb_cov_lines = count_cov_lines(coverage_file) + if iter_cnt == 1: + display.lines_cov_echidna = nb_cov_lines + display.lines_cov_last = nb_cov_lines - display.lines_cov_total + display.lines_cov_total = nb_cov_lines + # Display inst. coverage info + nb_cov_insts = count_unique_pc(p.stdout) + if iter_cnt == 1: + display.pc_cov_echidna = nb_cov_insts + display.pc_cov_last = nb_cov_insts - display.pc_cov_total + display.pc_cov_total = nb_cov_insts + # Display cases in terminal display.res_cases = extract_cases_from_json_output(p.stdout) glob_fuzzing_result.cases_found_cnt = len(display.res_cases) diff --git a/optik/echidna/display.py b/optik/echidna/display.py index c6728f9..b9578f2 100644 --- a/optik/echidna/display.py +++ b/optik/echidna/display.py @@ -49,6 +49,13 @@ def __init__(self) -> None: 33, 100, ) # tuple (curr, goal,) + self.cov_win_title = "Coverage" + self.lines_cov_echidna: int = 0 + self.lines_cov_total: int = 0 + self.lines_cov_last: int = 0 + self.pc_cov_echidna: int = 0 + self.pc_cov_total: int = 0 + self.pc_cov_last: int = 0 self.fuzz_win_title = "Fuzzer" self.fuzz_total_cases_cnt = 0 self.fuzz_last_cases_cnt = 0 @@ -69,6 +76,7 @@ def __init__(self) -> None: self.global_win_y_lines = 3 self.fuzz_win_x_ratio = 0.3 self.fuzz_win_y_lines = 4 + self.cov_win_y_lines = 2 # OTHER self._show_echidna_timer = False self._start_time: Optional[datetime] = None @@ -260,14 +268,77 @@ def update(self) -> None: x_pos, self.current_task_line_3, ) + # Coverage info window + cov_lines = self.cov_win_y_lines + 2 + cov_cols = curses.COLS - 2 + if cov_cols > 3 and cov_lines > 0: + cov_win = self.scr.derwin( + cov_lines, + cov_cols, + glob_lines - 1, + 1, + ) + cov_win.border(" ", " ", 0, " ", " ", " ", " ", " ") + x_pos = (cov_cols - len(self.cov_win_title)) // 2 + if x_pos > 0: + cov_win.addstr( + 0, + x_pos, + self.cov_win_title, + curses.A_BOLD | GREEN, + ) + self.add_info( + cov_win, + 1, + 1, + "Lines (echidna)", + self.lines_cov_echidna, + ) + self.add_info( + cov_win, + 1, + cov_cols // 3 - 2, + "Lines (optik)", + self.lines_cov_total, + ) + self.add_info( + cov_win, + 1, + (cov_cols // 3) * 2 - 2, + "Lines (last)", + self.lines_cov_last, + ) + self.add_info( + cov_win, + 2, + 1, + "Insts (echidna)", + self.pc_cov_echidna, + ) + self.add_info( + cov_win, + 2, + cov_cols // 3 - 2, + "Insts (optik)", + self.pc_cov_total, + ) + self.add_info( + cov_win, + 2, + (cov_cols // 3) * 2 - 2, + "Insts (last)", + self.pc_cov_last, + ) + # Fuzzer window fuzz_lines = self.fuzz_win_y_lines * 2 - 1 fuzz_cols = int(curses.COLS * self.fuzz_win_x_ratio) + fuzz_win_y_start = cov_lines + fuzz_lines - 4 if fuzz_cols > 3 and fuzz_lines > 3: fuzz_win = self.scr.derwin( fuzz_lines, fuzz_cols, - glob_lines - 1, + fuzz_win_y_start, 1, ) @@ -306,7 +377,7 @@ def update(self) -> None: sym_lines = fuzz_lines if sym_cols > 4 and sym_lines > 3: sym_win = self.scr.derwin( - fuzz_lines, sym_cols, glob_lines - 1, fuzz_cols + fuzz_lines, sym_cols, fuzz_win_y_start, fuzz_cols ) sym_win.border(" ", " ", 0, " ", " ", " ", " ", " ") x_pos = (sym_cols - len(self.sym_win_title)) // 2 @@ -366,7 +437,7 @@ def update(self) -> None: self.sym_path_constr_average, ) # Results windows - res_win_y_start = glob_lines + fuzz_lines - 3 + res_win_y_start = glob_lines + cov_lines + fuzz_lines - 4 res_win = self.scr.derwin( curses.LINES - 1 - res_win_y_start, curses.COLS - 2, diff --git a/optik/echidna/interface.py b/optik/echidna/interface.py index 59017dd..6f7fdd4 100644 --- a/optik/echidna/interface.py +++ b/optik/echidna/interface.py @@ -482,3 +482,45 @@ def extract_cases_from_json_output(output: str) -> List[List[str]]: ) res.append(case) return res + + +def count_cov_lines(coverage_file: str) -> int: + """Count the number of lines covered from an Echidna coverage file. + + :param coverage_file: an Echidna covered.*.txt type of file, with "*" + characters at the beginning of code lines that was covered during + fuzzing""" + with open(coverage_file, "r") as f: + return len([1 for line in f.readlines() if line[0] in "*e"]) + + +def get_latest_coverage_file( + corpus_dir: str, +) -> Optional[str]: + """Returns the path to covered..txt file generated by echidna + in the 'corpus_dir' directory. Returns None if no such file exists""" + # Get the first file after reverse sorting the filename list, so + # that we get the latest coverage file (name with the bigger timestamp) + try: + for filename in sorted(os.listdir(corpus_dir), reverse=True): + if filename.startswith("covered.") and filename.endswith(".txt"): + return os.path.join(corpus_dir, filename) + except FileNotFoundError: + pass + return None + + +def count_unique_pc(output: str) -> int: + """Count the unique instruction addresses that were executed + by echidna + + :param output: echidna's JSON output as a simple string + """ + if output.startswith("Loaded total of"): + output = output.split("\n", 1)[1] + data = json.loads(output) + res = 0 + for addr, cov in data["coverage"].items(): + unique_pcs = set([x[0] for x in cov]) + res += len(unique_pcs) + return res diff --git a/tests/echidna/test_coverage.py b/tests/echidna/test_coverage.py index 1a2d06c..dbe303d 100644 --- a/tests/echidna/test_coverage.py +++ b/tests/echidna/test_coverage.py @@ -5,6 +5,7 @@ import pytest from optik.echidna import run_hybrid_echidna +from optik.echidna.interface import get_latest_coverage_file from .common import new_test_dir, CONTRACTS_DIR COVERAGE_TARGET_MARKER = "test::coverage" @@ -64,13 +65,15 @@ def test_coverage(contract: str, cov_mode: str, seq_len: int): test_proc.start() # Detect early success in coverage test while test_proc.is_alive(): - covered_file = get_coverage_file(test_dir) + covered_file = get_latest_coverage_file(test_dir) if check_coverage_success(covered_file)[0]: test_proc.terminate() break sleep(5) # Final coverage check - success, err_msg = check_coverage_success(get_coverage_file(test_dir)) + success, err_msg = check_coverage_success( + get_latest_coverage_file(test_dir) + ) assert success, err_msg @@ -96,19 +99,3 @@ def check_coverage_success(covered_file: Optional[str]) -> Tuple[bool, str]: True, "", ) - - -def get_coverage_file( - test_dir: str, -) -> Optional[str]: - """Returns the path to covered..txt file generated by echidna - in the 'test_dir' directory. Returns None if no such file exists""" - # Get the first file after reverse sorting the filename list, so - # that we get the latest coverage file (name with the bigger timestamp) - try: - for filename in sorted(os.listdir(test_dir), reverse=True): - if filename.startswith("covered.") and filename.endswith(".txt"): - return os.path.join(test_dir, filename) - except FileNotFoundError: - pass - return None