diff --git a/README.md b/README.md index 25507de..675ee40 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@
pip3 install frelatage
- Current release : 0.0.8

+ Current release : 0.1.0

@@ -263,7 +263,9 @@ f = frelatage.Fuzzer( # Directory where the error reports will be stored output_directory="./out", # Enable or disable silent mode - silent=False + silent=False, + # Enable or disable infinite fuzzing + infinite_fuzz=False ) f.fuzz() ``` diff --git a/examples/yaml_fuzzer/yaml_fuzzer_dump.py b/examples/yaml_fuzzer/yaml_fuzzer_dump.py index b5b4878..1e778a9 100644 --- a/examples/yaml_fuzzer/yaml_fuzzer_dump.py +++ b/examples/yaml_fuzzer/yaml_fuzzer_dump.py @@ -16,6 +16,6 @@ def fuzz_yaml_dump(data): }) # Initialize the fuzzer -f = frelatage.Fuzzer(fuzz_yaml_dump, [[dictionary]]) +f = frelatage.Fuzzer(fuzz_yaml_dump, [[dictionary]], infinite_fuzz=True) # Fuzz f.fuzz() \ No newline at end of file diff --git a/frelatage/__init__.py b/frelatage/__init__.py index ebd0bed..1ed9351 100644 --- a/frelatage/__init__.py +++ b/frelatage/__init__.py @@ -5,9 +5,10 @@ from datetime import datetime from importlib.metadata import version from pathlib import Path -from typing import Callable, List +from typing import Callable from frelatage.corpus.corpus import load_corpus from frelatage.config.config import Config +from frelatage.input.input import Input from frelatage.mutator.mutator import * from frelatage.queue.queue import Queue from frelatage.tracer.tracer import Tracer @@ -30,28 +31,29 @@ class Fuzzer(object): of these fuzzers and gather them together into a new tool in order to efficiently fuzz python applications. """ - from ._mutation import valid_mutators, get_mutation, generate_cycle_mutations - from ._interface import ( + from ._mutation import valid_mutators, get_mutation, generate_cycle_mutations # type: ignore + from ._interface import ( # type: ignore init_interface, refresh_interface, start_interface, exit_message, ) - from ._evaluate import evaluate_mutations - from ._cycle import run_function, run_cycle - from ._fuzz import fuzz - from ._report import get_report_name, save_report - from ._input import init_input_folder, init_file_input_arguments, init_file_inputs + from ._evaluate import evaluate_mutations # type: ignore + from ._cycle import run_function, run_cycle # type: ignore + from ._fuzz import fuzz # type: ignore + from ._report import get_report_name, save_report # type: ignore + from ._input import init_input_folder, init_file_input_arguments, init_file_inputs # type: ignore def __init__( self, method: Callable, - corpus: list[object], + corpus: list, threads_count: int = 8, - exceptions_whitelist: list = (), - exceptions_blacklist: list = (), + exceptions_whitelist: tuple = (), + exceptions_blacklist: tuple = (), output_directory: str = "./out", silent: bool = False, + infinite_fuzz: bool = False, ) -> None: """ Initialize the fuzzer @@ -62,13 +64,13 @@ def __init__( self.config = Config # Global set of reached instructions - self.reached_instructions = set([]) + self.reached_instructions: set = set([]) # Global set of instructions pairs - self.instructions_pairs = set([]) + self.instructions_pairs: set = set([]) # Global set of instruction pairs executed during a crash - self.favored_pairs = set([]) + self.favored_pairs: set = set([]) # Global Set of positions where a crash occurred - self.error_positions = set([]) + self.error_positions: set = set([]) # Fuzzed method self.method = method @@ -79,9 +81,11 @@ def __init__( # List of all avalaibles mutators self.mutators = mutators # Number of concurrently launched threads - self.threads_count = max(min(threads_count, Config.FRELATAGE_MAX_THREADS), 8) + self.threads_count = max( + min(threads_count, Config.FRELATAGE_MAX_THREADS), 8 + ) # List of cycle mutations - self.cycle = [] + self.cycle: list = [] # Exceptions that will be taken into account or not when fuzzing self.exceptions_whitelist = exceptions_whitelist @@ -96,17 +100,22 @@ def __init__( # Input and output directories # The working directory is the same as the fuzz file self.input_directory = os.path.join( - os.path.dirname(os.path.realpath(sys.argv[0])), Config.FRELATAGE_INPUT_DIR + os.path.dirname(os.path.realpath(sys.argv[0])), + Config.FRELATAGE_INPUT_DIR, ) self.output_directory = Path( os.path.join( - os.path.dirname(os.path.realpath(sys.argv[0])), output_directory + os.path.dirname(os.path.realpath(sys.argv[0])), + output_directory, ) ).as_posix() # Silent output self.silent = silent + # Infinite fuzzing + self.infinite_fuzz = infinite_fuzz + # Fuzzer statistics self.cycles_count = 0 self.inputs_count = 0 diff --git a/frelatage/_cycle.py b/frelatage/_cycle.py index 3d6d4ad..5e4f893 100644 --- a/frelatage/_cycle.py +++ b/frelatage/_cycle.py @@ -15,8 +15,12 @@ def run_function(self, arguments: list, result: list) -> bool: trace_instructions_count = len(trace_instructions) trace_instruction_pairs_count = len(trace_instruction_pairs) - new_instructions_count = len(trace_instructions - self.reached_instructions) - new_sequences_count = len(trace_instruction_pairs - self.instructions_pairs) + new_instructions_count = len( + trace_instructions - self.reached_instructions + ) + new_sequences_count = len( + trace_instruction_pairs - self.instructions_pairs + ) new_instruction_error = trace.error_position not in self.error_positions @@ -40,7 +44,7 @@ def run_cycle(self) -> list[Report]: run the function with each mutation of the cycle as argument. Return the execution reports. """ - cycle_reports = [] + cycle_reports: list = [] # TODO: Implement multithreading for mutation in self.cycle: diff --git a/frelatage/_fuzz.py b/frelatage/_fuzz.py index 9529f7c..96d9870 100644 --- a/frelatage/_fuzz.py +++ b/frelatage/_fuzz.py @@ -6,6 +6,13 @@ def fuzz(self) -> None: """ Run the fuzzer """ + # Infinite fuzzing is allowed if we have one input combination + if self.infinite_fuzz and len(self.queue.arguments) > 1: + print( + "Error: infinite fuzzing is only possible with a corpus of size 1" + ) + exit(1) + try: # Interface if not self.silent: diff --git a/frelatage/_input.py b/frelatage/_input.py index 8e59842..b55cc33 100644 --- a/frelatage/_input.py +++ b/frelatage/_input.py @@ -77,7 +77,7 @@ def init_file_input_arguments(self) -> bool: return True -def init_file_inputs(self) -> bool: +def init_file_inputs(self) -> None: """ Set up the tree structure to fuzz a function with "file" type arguments """ diff --git a/frelatage/_interface.py b/frelatage/_interface.py index bd86314..86b2778 100644 --- a/frelatage/_interface.py +++ b/frelatage/_interface.py @@ -1,7 +1,7 @@ import curses import time from curses import wrapper -from datetime import datetime +from datetime import datetime, timedelta from string import Formatter from frelatage import __version__, Config from frelatage.colors import Colors @@ -10,7 +10,7 @@ REFRESH_INTERVAL = 0.1 -def format_delta(time_delta: datetime, format: str) -> str: +def format_delta(time_delta: timedelta, format: str) -> str: """ Format a time delta. """ @@ -79,10 +79,12 @@ def refresh_interface(self): # Process timing run_time = format_time_elapsed(self.fuzz_start_time).ljust(32) last_new_path_time = format_time_elapsed(self.last_new_path_time).ljust(32) - last_unique_crash_time = format_time_elapsed(self.last_unique_crash_time).ljust(32) - last_unique_timeout_time = format_time_elapsed(self.last_unique_timeout_time).ljust( - 32 - ) + last_unique_crash_time = format_time_elapsed( + self.last_unique_crash_time + ).ljust(32) + last_unique_timeout_time = format_time_elapsed( + self.last_unique_timeout_time + ).ljust(32) # Overall results uniques_crashes_count = str(self.unique_crashes).ljust(9) @@ -105,7 +107,8 @@ def refresh_interface(self): crashes=str(self.total_crashes), uniques=str(self.unique_crashes) ).ljust(22) total_timeouts = "{total_timeouts} [{timeout_delay} sec]".format( - total_timeouts=self.total_timeouts, timeout_delay=Config.FRELATAGE_TIMEOUT_DELAY + total_timeouts=self.total_timeouts, + timeout_delay=Config.FRELATAGE_TIMEOUT_DELAY, ).ljust(22) # Progress @@ -116,7 +119,8 @@ def refresh_interface(self): current_argument = self.queue.position + 1 total_arguments_count = len(self.queue.arguments) current_stage = "{current_argument}/{total_arguments_count}".format( - current_argument=current_argument, total_arguments_count=total_arguments_count + current_argument=current_argument, + total_arguments_count=total_arguments_count, ).ljust(16) stage_executions = str(self.stage_inputs_count).ljust(16) @@ -136,7 +140,7 @@ def refresh_interface(self): │ Uniques crashes :: {uniques_crashes_count}│ Cycles done :: {cycles_count}│ Stage :: {current_stage}│ │ Unique timeouts :: {uniques_timeouts_count}│ Total executions :: {total_executions}│ Stage execs :: {stage_executions}│ └─────────────────────────────────┴────────────────────────────────────┴────────────────────────────────┘ - [ {execs_per_second} exec/s ] + [ {execs_per_second} exec/s ] """.format( title=title, execs_per_second=execs_per_second, @@ -168,10 +172,9 @@ def exit_message( run_time = format_time_elapsed(self.fuzz_start_time) uniques_crashes_count = str(self.unique_crashes) uniques_timeouts_count = str(self.unique_timeout) - total_crashes = "{crashes} ({uniques} uniques)".format( - crashes=str(self.total_crashes), uniques=str(self.unique_crashes) + total_timeouts = "{total_timeouts}".format( + total_timeouts=self.total_timeouts ) - total_timeouts = "{total_timeouts}".format(total_timeouts=self.total_timeouts) total_paths_count = str(len(self.reached_instructions)) cycles_count = str(self.cycles_count) total_executions = str(self.inputs_count) @@ -240,7 +243,7 @@ def exit_message( return True -def start_interface(self): +def start_interface(self) -> None: """ Display the curse interface """ diff --git a/frelatage/_mutation.py b/frelatage/_mutation.py index 12b2fee..4b47403 100644 --- a/frelatage/_mutation.py +++ b/frelatage/_mutation.py @@ -1,7 +1,7 @@ import copy import os -import sys import random +import sys from pathlib import Path from typing import Type, Any from frelatage.mutator.mutator import Mutator @@ -30,7 +30,8 @@ def valid_mutators(self, input_type: Type, input_size: int) -> list[Mutator]: # Filtering mutators using the "allowed types" property of each mutator valid_mutators_type = list( filter( - lambda mutator: type_str in mutator.allowed_types and mutator.enabled, + lambda mutator: type_str in mutator.allowed_types + and mutator.enabled, self.mutators, ) ) @@ -40,7 +41,10 @@ def valid_mutators(self, input_type: Type, input_size: int) -> list[Mutator]: if max_input_size_reached: # Filtering mutators using the "size_effect" property of each mutator valid_mutators_size = list( - filter(lambda mutator: "decrease" in mutator.size_effect, self.mutators) + filter( + lambda mutator: "decrease" in mutator.size_effect, + self.mutators, + ) ) else: valid_mutators_size = self.mutators @@ -105,7 +109,7 @@ def get_mutation(self, input: Any, file: bool) -> Any: return mutation -def generate_cycle_mutations(self, parents: list) -> list: +def generate_cycle_mutations(self, parents: list) -> None: """ Generate a list of mutations with a list of parent mutations as an input """ @@ -127,7 +131,9 @@ def generate_cycle_mutations(self, parents: list) -> list: # Mutation of "file" type inputs if mutation.file: filename = os.path.split(mutation.value)[1] - file_argument_id = os.path.split(os.path.split(mutation.value)[0])[1] + file_argument_id = os.path.split( + os.path.split(mutation.value)[0] + )[1] base = Path(mutation.value).parents[2] new_argument = os.path.join( base, str(thread), file_argument_id, filename diff --git a/frelatage/_report.py b/frelatage/_report.py index d3f9f9a..4b780ec 100644 --- a/frelatage/_report.py +++ b/frelatage/_report.py @@ -20,9 +20,9 @@ def get_report_name(self, report: Report) -> str: else str(None) ) # File where the error occured - error_file = os.path.splitext(os.path.basename(report.trace.error_position[0][0]))[ - 0 - ].lower() + error_file = os.path.splitext( + os.path.basename(report.trace.error_position[0][0]) + )[0].lower() report_name = "id:{error_id},err:{error_type},err_file:{error_file},err_pos:{error_position}".format( error_id=error_id, @@ -65,7 +65,8 @@ def save_report(self, report) -> bool: # Save report file with open( - "{report_directory}/input".format(report_directory=report_directory), "wb+" + "{report_directory}/input".format(report_directory=report_directory), + "wb+", ) as f: # We use pickle to store the report object pickle.dump(custom_report, f) @@ -79,7 +80,8 @@ def save_report(self, report) -> bool: file_argument_content = open(argument.value, "rb").read() # Save file in /out/// argument_directory = "{report_directory}/{argument_number}".format( - report_directory=report_directory, argument_number=argument_number + report_directory=report_directory, + argument_number=argument_number, ) # create /out// folder if not exists if not os.path.exists(argument_directory): diff --git a/frelatage/config/config.py b/frelatage/config/config.py index 1e899db..99dab8c 100644 --- a/frelatage/config/config.py +++ b/frelatage/config/config.py @@ -14,7 +14,9 @@ class Config: # Enable the use of mutations based on dictionary elements if os.getenv("FRELATAGE_DICTIONARY_ENABLE", "1") not in ("1", "0"): - raise FrelatageConfigError("FRELATAGE_DICTIONARY_ENABLE must '1' or '0'") + raise FrelatageConfigError( + "FRELATAGE_DICTIONARY_ENABLE must '1' or '0'" + ) FRELATAGE_DICTIONARY_ENABLE = ( True if os.getenv("FRELATAGE_DICTIONARY_ENABLE", "1") == "1" else False ) @@ -29,7 +31,9 @@ class Config: # Delay in seconds after which a function will return a timeoutError # Must be in range 1~20 if not 1 <= int(os.getenv("FRELATAGE_TIMEOUT_DELAY", 2)) <= math.inf: - raise FrelatageConfigError("FRELATAGE_TIMEOUT_DELAY must be in range 1~20") + raise FrelatageConfigError( + "FRELATAGE_TIMEOUT_DELAY must be in range 1~20" + ) FRELATAGE_TIMEOUT_DELAY = int(os.getenv("FRELATAGE_TIMEOUT_DELAY", 2)) # Temporary folder where input files are stored. @@ -41,13 +45,17 @@ class Config: # Maximum size of an input variable in bytes # Must be in range 4~1000000 if not 4 <= int(os.getenv("FRELATAGE_INPUT_MAX_LEN", 4096)) <= math.inf: - raise FrelatageConfigError("FRELATAGE_INPUT_MAX_LEN must be in range 4~1000000") + raise FrelatageConfigError( + "FRELATAGE_INPUT_MAX_LEN must be in range 4~1000000" + ) FRELATAGE_INPUT_MAX_LEN = int(os.getenv("FRELATAGE_INPUT_MAX_LEN", 4096)) # Maximum number of simultaneous threads # Must be in range 8~50 if not 8 <= int(os.getenv("FRELATAGE_MAX_THREADS", 20)) <= math.inf: - raise FrelatageConfigError("FRELATAGE_MAX_THREADS must be in range 8~50") + raise FrelatageConfigError( + "FRELATAGE_MAX_THREADS must be in range 8~50" + ) FRELATAGE_MAX_THREADS = int(os.getenv("FRELATAGE_MAX_THREADS", 20)) # Maximum number of successives cycle without a new path diff --git a/frelatage/corpus/corpus.py b/frelatage/corpus/corpus.py index 5cda826..ebee0b4 100644 --- a/frelatage/corpus/corpus.py +++ b/frelatage/corpus/corpus.py @@ -15,7 +15,8 @@ def load_corpus(directory: str, file_extensions: list = []) -> list[Input]: # ./ # ./in by default input_root_directory = os.path.join( - os.path.dirname(os.path.realpath(sys.argv[0])), Config.FRELATAGE_INPUT_DIR + os.path.dirname(os.path.realpath(sys.argv[0])), + Config.FRELATAGE_INPUT_DIR, ) # .// @@ -30,13 +31,15 @@ def load_corpus(directory: str, file_extensions: list = []) -> list[Input]: for file_extension in file_extensions: file_inputs += glob.glob( os.path.join( - input_directory, "*.{extension}".format(extension=file_extension) + input_directory, + "*.{extension}".format(extension=file_extension), ) ) # Relative path file_inputs = [ - os.path.relpath(file_input, input_root_directory) for file_input in file_inputs + os.path.relpath(file_input, input_root_directory) + for file_input in file_inputs ] # Create an Input object for every file in the subdirectory diff --git a/frelatage/dictionary/dictionary.py b/frelatage/dictionary/dictionary.py index dd5ec8f..ef4903b 100644 --- a/frelatage/dictionary/dictionary.py +++ b/frelatage/dictionary/dictionary.py @@ -1,4 +1,5 @@ import re +from typing import List class Dictionary: @@ -7,7 +8,7 @@ class Dictionary: """ def __init__(self) -> None: - self.dictionary = [] + self.dictionary: List[str] = [] # dict_element="myelement" -> "myelement" self.DICTIONARY_ELEMENT_REGEXP = r'".+"' @@ -25,7 +26,9 @@ def load_dictionary_from_file(self, filename) -> list: if not line or line[0] == "#": continue # Parse line - dictionary_element = re.findall(self.DICTIONARY_ELEMENT_REGEXP, line) + dictionary_element = re.findall( + self.DICTIONARY_ELEMENT_REGEXP, line + ) if dictionary_element: # Remove the quote from both the beginning and the end dictionary_element = dictionary_element[0][1:-1] diff --git a/frelatage/input/input.py b/frelatage/input/input.py index 4615a24..bfc1f99 100644 --- a/frelatage/input/input.py +++ b/frelatage/input/input.py @@ -1,3 +1,6 @@ +from collections.abc import Generator + + class Input: """ Input can be either a file or any other type @@ -7,6 +10,6 @@ def __init__(self, value: str = "", file: bool = False) -> None: self.value = value self.file = file - def __iter__(self) -> None: + def __iter__(self) -> Generator[tuple, tuple, None]: yield "value", self.value yield "file", self.file diff --git a/frelatage/mutator/dictionary.py b/frelatage/mutator/dictionary.py index 792ce21..1960387 100644 --- a/frelatage/mutator/dictionary.py +++ b/frelatage/mutator/dictionary.py @@ -1,9 +1,10 @@ import glob import os +from typing import List, Any from frelatage.dictionary.dictionary import Dictionary -def load_dictionary(dictionary_folder: str) -> bool: +def load_dictionary(dictionary_folder: str) -> List[Any]: """ Load all the dictionaries from the dictionary folder """ diff --git a/frelatage/mutator/magicValues.py b/frelatage/mutator/magicValues.py index 593e654..824ce7b 100644 --- a/frelatage/mutator/magicValues.py +++ b/frelatage/mutator/magicValues.py @@ -6,6 +6,7 @@ class MagicValues: # Integers UINT = [ -9223372036854775809, + -9223372036854775808, -4294967296, -4294967295, -2147483648, @@ -22,6 +23,11 @@ class MagicValues: -256, -255, -128, + -127, + -100, + -64, + -32, + -16, -1, 0, 1, @@ -47,6 +53,5 @@ class MagicValues: 4294967295, 4294967296, 9223372036854775808, - 9223372036854775808, 9223372036854775809, ] diff --git a/frelatage/mutator/mutator.py b/frelatage/mutator/mutator.py index 5f1d7ed..8752e2e 100644 --- a/frelatage/mutator/mutator.py +++ b/frelatage/mutator/mutator.py @@ -1,12 +1,12 @@ import copy import random -from typing import Any, Type +from typing import List, Any, Type from frelatage.config.config import Config from frelatage.mutator.dictionary import load_dictionary from frelatage.mutator.magicValues import MagicValues # Array containing all the mutators -mutators = [] +mutators: List[Any] = [] # Load dictionary from the dictionary files dictionary_folder = Config().FRELATAGE_DICTIONARY_DIR @@ -15,9 +15,9 @@ class Mutator(object): # "str", "int", "list", "dict", "NoneType", "float" - allowed_types = set([]) + allowed_types: set = set([]) # "none", "increase", "decrease" - size_effect = [] + size_effect: list = [] # Is the mutator used by the fuzzer # True by default enabled = True @@ -67,8 +67,12 @@ def mutate(input: str) -> str: if len(input) == 0: return input position = Mutator.random_int(len(input)) - replacement_character = chr(ord(input[position]) ^ (1 << Mutator.random_int(8))) - mutation = input[0:position] + replacement_character + input[position + 1 :] + replacement_character = chr( + ord(input[position]) ^ (1 << Mutator.random_int(8)) + ) + mutation = ( + input[0:position] + replacement_character + input[position + 1 :] + ) return mutation @@ -84,7 +88,9 @@ def mutate(input: str) -> str: position = Mutator.random_int(len(input)) delta = Mutator.random_int(256) replacement_character = chr((ord(input[position]) + delta) % 256) - mutation = input[0:position] + replacement_character + input[position + 1 :] + mutation = ( + input[0:position] + replacement_character + input[position + 1 :] + ) return mutation @@ -108,7 +114,9 @@ class MutatorStringInsertDict(Mutator): allowed_types = set(["str"]) size_effect = ["increase"] # Disable if the dictionary is empty or if the dictionary fuzzing is disabled - enabled = True if (dictionary and Config.FRELATAGE_DICTIONARY_ENABLE) else False + enabled = ( + True if (dictionary and Config.FRELATAGE_DICTIONARY_ENABLE) else False + ) @staticmethod def mutate(input: str) -> str: @@ -150,12 +158,15 @@ def mutate(input: str) -> str: second_character_position = Mutator.random_int(len(input)) mutation = list(input) - mutation[first_character_position], mutation[second_character_position] = ( + ( + mutation[first_character_position], + mutation[second_character_position], + ) = ( mutation[second_character_position], mutation[first_character_position], ) - mutation = "".join(mutation) - return mutation + mutation_str = "".join(mutation) + return mutation_str @register_mutator @@ -168,11 +179,15 @@ def mutate(input: str) -> str: if len(input) < 2: return input first_character_position = Mutator.random_int(len(input)) - second_character_position = random.randint(first_character_position, len(input)) + second_character_position = random.randint( + first_character_position, len(input) + ) substring = input[first_character_position:second_character_position] substring_position = Mutator.random_int(len(input)) - mutation = input[:substring_position] + substring + input[substring_position:] + mutation = ( + input[:substring_position] + substring + input[substring_position:] + ) return mutation @@ -186,7 +201,9 @@ def mutate(input: str) -> str: if len(input) < 2: return input first_character_position = Mutator.random_int(len(input)) - second_character_position = random.randint(first_character_position, len(input)) + second_character_position = random.randint( + first_character_position, len(input) + ) substring = input[first_character_position:second_character_position] mutation = ( input[:second_character_position] @@ -206,9 +223,14 @@ def mutate(input: str) -> str: if len(input) < 2: return input first_character_position = Mutator.random_int(len(input)) - second_character_position = random.randint(first_character_position, len(input)) + second_character_position = random.randint( + first_character_position, len(input) + ) - mutation = input[:first_character_position] + input[second_character_position:] + mutation = ( + input[:first_character_position] + + input[second_character_position:] + ) return mutation @@ -311,8 +333,8 @@ def mutate(input: tuple) -> tuple: position = Mutator.random_int(len(mutation)) element = random.choice(mutation) mutation.insert(position, element) - mutation = tuple(mutation) - return mutation + mutation_tuple = tuple(mutation) + return mutation_tuple @register_mutator @@ -324,8 +346,8 @@ class MutatorTupleInsertNone(Mutator): def mutate(input: tuple) -> tuple: mutation = list(input) mutation.append(None) - mutation = tuple(mutation) - return mutation + mutation_tuple = tuple(mutation) + return mutation_tuple @register_mutator @@ -341,8 +363,8 @@ def mutate(input: tuple) -> tuple: mutation = list(input) element_position = Mutator.random_int(len(mutation)) mutation.pop(element_position) - mutation = tuple(mutation) - return mutation + mutation_tuple = tuple(mutation) + return mutation_tuple @register_mutator @@ -357,8 +379,8 @@ def mutate(input: tuple) -> tuple: mutation = list(input) random.shuffle(mutation) - mutation = tuple(mutation) - return mutation + mutation_tuple = tuple(mutation) + return mutation_tuple # None mutators @@ -390,10 +412,12 @@ def mutate(input: str) -> str: position = Mutator.random_int(len(file_content)) mutation = bytearray(file_content) - mutation[position] = mutation[position] ^ (1 << Mutator.random_int(8)) - mutation = bytes(mutation) + mutation[position] = mutation[position] ^ ( + 1 << Mutator.random_int(8) + ) + mutation_bytes = bytes(mutation) with open(input, "wb") as f: - f.write(mutation) + f.write(mutation_bytes) return input @@ -407,7 +431,7 @@ def mutate(input: str) -> str: with open(input, "rb") as f: file_content = f.read() - mutation = bytearray(file_content) + mutation: bytes = bytearray(file_content) if len(file_content) == 0: mutation = chr(Mutator.random_int(256)).encode() else: @@ -437,7 +461,7 @@ def mutate(input: str) -> str: else: position = Mutator.random_int(len(file_content)) - mutation = bytearray(file_content) + mutation: bytes = bytearray(file_content) mutation = mutation[0:position] + mutation[position + 1 :] mutation = bytes(mutation) with open(input, "wb") as f: @@ -461,16 +485,21 @@ def mutate(input: str) -> str: second_byte_position = Mutator.random_int(len(file_content)) # The position of the second character must be different from the first while first_byte_position == second_byte_position: - second_byte_position = Mutator.random_int(len(file_content)) + second_byte_position = Mutator.random_int( + len(file_content) + ) mutation = bytearray(file_content) - mutation[first_byte_position], mutation[second_byte_position] = ( + ( + mutation[first_byte_position], + mutation[second_byte_position], + ) = ( mutation[second_byte_position], mutation[first_byte_position], ) - mutation = bytes(mutation) + mutation_bytes = bytes(mutation) with open(input, "wb") as f: - f.write(mutation) + f.write(mutation_bytes) return input @@ -499,9 +528,9 @@ def mutate(input: str) -> str: + subbytes + mutation[subbytes_position:] ) - mutation = bytes(mutation) + mutation_bytes = bytes(mutation) with open(input, "wb") as f: - f.write(mutation) + f.write(mutation_bytes) return input @@ -529,9 +558,9 @@ def mutate(input: str) -> str: + subbytes + mutation[second_byte_position:] ) - mutation = bytes(mutation) + mutation_bytes = bytes(mutation) with open(input, "wb") as f: - f.write(mutation) + f.write(mutation_bytes) return input @@ -553,11 +582,12 @@ def mutate(input: str) -> str: ) mutation = bytearray(file_content) mutation = ( - mutation[:first_byte_position] + mutation[second_byte_position:] + mutation[:first_byte_position] + + mutation[second_byte_position:] ) - mutation = bytes(mutation) + mutation_bytes = bytes(mutation) with open(input, "wb") as f: - f.write(mutation) + f.write(mutation_bytes) return input @@ -566,7 +596,9 @@ class MutatorFileInsertDict(Mutator): allowed_types = set(["file"]) size_effect = ["increase"] # Disable if the dictionary is empty or if the dictionary fuzzing is disabled - enabled = True if (dictionary and Config.FRELATAGE_DICTIONARY_ENABLE) else False + enabled = ( + True if (dictionary and Config.FRELATAGE_DICTIONARY_ENABLE) else False + ) @staticmethod def mutate(input: str) -> str: @@ -580,10 +612,14 @@ def mutate(input: str) -> str: mutation = element.encode() else: position = Mutator.random_int(len(file_content)) - mutation = mutation[0:position] + element.encode() + mutation[position:] - mutation = bytes(mutation) + mutation = ( + mutation[0:position] + + element.encode() + + mutation[position:] + ) + mutation_bytes = bytes(mutation) with open(input, "wb") as f: - f.write(mutation) + f.write(mutation_bytes) return input diff --git a/frelatage/queue/queue.py b/frelatage/queue/queue.py index e992185..2afc56d 100644 --- a/frelatage/queue/queue.py +++ b/frelatage/queue/queue.py @@ -8,7 +8,7 @@ class Queue: The queue contains the list of possible combinations of the corpus entries. """ - def __init__(self, corpus: list[list[Input]]): + def __init__(self, corpus: list[object]): self.position = 0 self.corpus = corpus self.arguments = self.generate_arguments() diff --git a/frelatage/tracer/trace_result.py b/frelatage/tracer/trace_result.py index f2afe5e..084aba1 100644 --- a/frelatage/tracer/trace_result.py +++ b/frelatage/tracer/trace_result.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from typing import Optional @dataclass @@ -10,14 +11,14 @@ class Result: # True if an error occurred during the execution of a function, false otherwise error: bool # Error type, e.g TypeError, ZeroDivisionError... - error_type: str + error_type: Optional[str] # True if an TimeOutError occurred during the execution of a function, false otherwise timeout: bool # Position of the instruction where the error occured - error_position: tuple + error_position: Optional[tuple] # All groups of two successive instructions reached during the execution of a function # e.g : [(('library.py', 3), ('library.py', 3)), (('library.py', 3), ('library.py', 2)), ...] instructions_pairs: list # List of the reached instructions during the execution of a function # e.g : [('library.py', 1), ('library.py', 3), ('library.py', 2), ('library.py', 4)] - reached_instructions: list[tuple] + reached_instructions: list diff --git a/frelatage/tracer/tracer.py b/frelatage/tracer/tracer.py index e81da7e..0e53196 100644 --- a/frelatage/tracer/tracer.py +++ b/frelatage/tracer/tracer.py @@ -2,12 +2,12 @@ import os import timeout_decorator import trace -from typing import Callable, Iterable +from typing import Callable from frelatage.config.config import Config from frelatage.tracer.trace_result import Result -def supress_stdout(func: Callable): +def supress_stdout(func: Callable) -> Callable: """ redirect the output of a function to /dev/null """ @@ -28,8 +28,8 @@ class Tracer: def __init__( self, - exceptions_whitelist: Iterable[Exception] = [], - exceptions_blacklist: Iterable[Exception] = [], + exceptions_whitelist: tuple = (), + exceptions_blacklist: tuple = (), ) -> None: """ Initialize the tracer with an exception whitelist and an exception blacklist. @@ -38,7 +38,9 @@ def __init__( self.exceptions_blacklist = exceptions_blacklist @timeout_decorator.timeout( - Config.FRELATAGE_TIMEOUT_DELAY, use_signals=True, timeout_exception=TimeoutError + Config.FRELATAGE_TIMEOUT_DELAY, + use_signals=True, + timeout_exception=TimeoutError, ) def runfunc(self, function: Callable, arguments): """ @@ -73,7 +75,7 @@ def trace(self, function: Callable, arguments: list) -> Result: timeout = True error_type = str(e.__class__.__name__) # Blacklisted exceptions - except self.exceptions_blacklist as e: + except self.exceptions_blacklist: pass # Default exceptions except Exception as e: @@ -84,9 +86,10 @@ def trace(self, function: Callable, arguments: list) -> Result: pass # List of the executed instructions - instructions = list(self.tracer.results().counter.items()) + instructions = list(self.tracer.results().counter.items()) # type: ignore reached_instructions = [ - (instruction[0][0], instruction[0][1]) for instruction in instructions + (instruction[0][0], instruction[0][1]) + for instruction in instructions ] # List of the instructions pairs : # e.g : reached instructions : [('library.py', 1), ('library.py', 3), ('library.py', 2), ('library.py', 4)] diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..1215375 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,2 @@ +[mypy] +ignore_missing_imports = True \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 4ca176c..b993382 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "frelatage" -version = "0.0.8" +version = "0.1.0" description = "Frelatage is a coverage-based Python fuzzing library which can be used to fuzz python code." authors = ["Rog3rSm1th "] license = "MIT" @@ -26,3 +26,20 @@ pytest = "^5.2" [build-system] requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" + +[tool.black] +line-length = 90 +include = '\.pyi?$' +exclude = ''' +/( + \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist +)/ +''' \ No newline at end of file diff --git a/scripts/checks.sh b/scripts/checks.sh index ecca30b..9e92c4d 100755 --- a/scripts/checks.sh +++ b/scripts/checks.sh @@ -3,4 +3,6 @@ # Reformating the code using black black --check $(git ls-files './frelatage/*.py') # Check for unused imports -pylint --disable=all --enable=unused-import $(git ls-files './frelatage/*.py') \ No newline at end of file +pylint --disable=all --enable=unused-import $(git ls-files './frelatage/*.py') +# Validate types using mymy +mypy ./frelatage \ No newline at end of file