From 08d6154dab216616501ae0ec46bc4a7ff9be3af5 Mon Sep 17 00:00:00 2001 From: Ekas Chawla Date: Tue, 11 Jul 2023 15:19:39 -0700 Subject: [PATCH 1/9] cr test cr: https://code.amazon.com/reviews/CR-96300558 --- deltacat/utils/ray_utils/RetryHandler/NonRetryableExc.py | 0 deltacat/utils/ray_utils/RetryHandler/RetryableExc.py | 0 deltacat/utils/ray_utils/RetryHandler/TaskInfoObject.py | 0 deltacat/utils/ray_utils/RetryHandler/retry.py | 0 4 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 deltacat/utils/ray_utils/RetryHandler/NonRetryableExc.py create mode 100644 deltacat/utils/ray_utils/RetryHandler/RetryableExc.py create mode 100644 deltacat/utils/ray_utils/RetryHandler/TaskInfoObject.py create mode 100644 deltacat/utils/ray_utils/RetryHandler/retry.py diff --git a/deltacat/utils/ray_utils/RetryHandler/NonRetryableExc.py b/deltacat/utils/ray_utils/RetryHandler/NonRetryableExc.py new file mode 100644 index 00000000..e69de29b diff --git a/deltacat/utils/ray_utils/RetryHandler/RetryableExc.py b/deltacat/utils/ray_utils/RetryHandler/RetryableExc.py new file mode 100644 index 00000000..e69de29b diff --git a/deltacat/utils/ray_utils/RetryHandler/TaskInfoObject.py b/deltacat/utils/ray_utils/RetryHandler/TaskInfoObject.py new file mode 100644 index 00000000..e69de29b diff --git a/deltacat/utils/ray_utils/RetryHandler/retry.py b/deltacat/utils/ray_utils/RetryHandler/retry.py new file mode 100644 index 00000000..e69de29b From 1c2b83a9b2a0c14f8fd5e59c9f0ccce9387b1d87 Mon Sep 17 00:00:00 2001 From: Ekas Chawla Date: Tue, 11 Jul 2023 16:48:23 -0700 Subject: [PATCH 2/9] skeleton CR --- .../ray_utils/RetryHandler/NonRetryableExc.py | 7 +++ .../ray_utils/RetryHandler/RetryableExc.py | 7 +++ .../ray_utils/RetryHandler/TaskInfoObject.py | 8 +++ .../utils/ray_utils/RetryHandler/retry.py | 63 +++++++++++++++++++ 4 files changed, 85 insertions(+) diff --git a/deltacat/utils/ray_utils/RetryHandler/NonRetryableExc.py 
b/deltacat/utils/ray_utils/RetryHandler/NonRetryableExc.py index e69de29b..cbcd25c5 100644 --- a/deltacat/utils/ray_utils/RetryHandler/NonRetryableExc.py +++ b/deltacat/utils/ray_utils/RetryHandler/NonRetryableExc.py @@ -0,0 +1,7 @@ +class NonRetryableExc(RuntimeError): +""" +Class represents a non-retryable error +""" + +def __init__(self, *args:object) --> None: + super().__init__(*args) diff --git a/deltacat/utils/ray_utils/RetryHandler/RetryableExc.py b/deltacat/utils/ray_utils/RetryHandler/RetryableExc.py index e69de29b..0ae67661 100644 --- a/deltacat/utils/ray_utils/RetryHandler/RetryableExc.py +++ b/deltacat/utils/ray_utils/RetryHandler/RetryableExc.py @@ -0,0 +1,7 @@ +class RetryableEXc(RuntimeError): +""" +class for errors that can be retried +""" + +def __init__(self, *args: object) --> None: + super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/RetryHandler/TaskInfoObject.py b/deltacat/utils/ray_utils/RetryHandler/TaskInfoObject.py index e69de29b..12da99d9 100644 --- a/deltacat/utils/ray_utils/RetryHandler/TaskInfoObject.py +++ b/deltacat/utils/ray_utils/RetryHandler/TaskInfoObject.py @@ -0,0 +1,8 @@ +Class TaskInfoObject: + def __init__(self, task_callable: Callable, task_input, num_retries: int, retry_delay: int): #what inputs do I need here + self.task_callable = task_callable + self.task_input = task_input + #self.remote_task_options = ray_remote_task_options + self.num_retries = num_retries + self.retry_delay = retry_delay + self.attempt_count = 0 \ No newline at end of file diff --git a/deltacat/utils/ray_utils/RetryHandler/retry.py b/deltacat/utils/ray_utils/RetryHandler/retry.py index e69de29b..e60ad6ab 100644 --- a/deltacat/utils/ray_utils/RetryHandler/retry.py +++ b/deltacat/utils/ray_utils/RetryHandler/retry.py @@ -0,0 +1,63 @@ +import ray +import time +import logging +from typing import List, Callable +from ray.types import ObjectRef +from RetryExceptions.retryable_exception import RetryableException 
+from RetryExceptions.non_retryable_exception import NonRetryableException +from RetryExceptions.TaskInfoObject import TaskInfoObject + +#inputs: task_callable, task_input, ray_remote_task_options, exception_retry_strategy_configs +#include a seperate class for errors: break down into retryable and non-retryable +#seperate class to put info in a way that the retry class can handle: ray retry task info + +#This is what specifically retries a single task +@ray.remote +def submit_single_task(taskObj: TaskInfoObject) -> Any: + try: + taskObj.attempt_count += 1 + curr_attempt = taskObj.attempt_count + return tackObj.task_callable(taskObj.task_input) + except (Exception) as exception: + # if exception is detected we want to figure out how to handle it + #pass to a new method that handles exception strategy + #retry_strat = ...exception_retry_strategy_configs + retry_config = get_retry_strategy() #need to come up with fields needed for this + if retry_config is not None: + return the exception that retry_config detected + + + +class RetryHandler: + #given a list of tasks that are failing, we want to classify the error messages and redirect the task + #depending on the exception type using a wrapper + #wrapper function that before execution, checks what exception is being thrown and go to second method to + #commence retrying + + def get_task_results(self, num_of_results: int) -> List[Any]: + #implement wrapper here that before execution will try catch an exception + #get what tasks we need to run our execution on + + finished, unfinished = ray.wait(unfinished, num_of_results) + + #assuming we have the tasks we want to get results of + for finished in finished: + finished_result = None + + try: + finished_result = ray.get(finished) + except (Exception) as exception: + #if exception send to method handle_ray_exception to determine what to do and assign the corresp error + finished_result = #evaluate the exception and return the error + + if finished_result == 
RetryableException: + #feed into submit_single_task + else: + + + + def handle_ray_exception(self, exception: Exception, TaskInfo: TaskInfoObject) -> Error: + #will compare the exception with known exceptions and determine way to handle it based off that + #if RayOOM Error then: raise that error + + def get_retry_strategy() \ No newline at end of file From 4ed70b1b762097734cf9b6a00baff4a81a4d0ebf Mon Sep 17 00:00:00 2001 From: Ekas Chawla Date: Thu, 13 Jul 2023 16:01:15 -0700 Subject: [PATCH 3/9] skeletonCR --- deltacat/utils/ray_utils/RetryHandler/retry.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/deltacat/utils/ray_utils/RetryHandler/retry.py b/deltacat/utils/ray_utils/RetryHandler/retry.py index e60ad6ab..52b94f86 100644 --- a/deltacat/utils/ray_utils/RetryHandler/retry.py +++ b/deltacat/utils/ray_utils/RetryHandler/retry.py @@ -37,13 +37,10 @@ class RetryHandler: def get_task_results(self, num_of_results: int) -> List[Any]: #implement wrapper here that before execution will try catch an exception #get what tasks we need to run our execution on - finished, unfinished = ray.wait(unfinished, num_of_results) - #assuming we have the tasks we want to get results of for finished in finished: finished_result = None - try: finished_result = ray.get(finished) except (Exception) as exception: @@ -53,11 +50,14 @@ def get_task_results(self, num_of_results: int) -> List[Any]: if finished_result == RetryableException: #feed into submit_single_task else: - + #Non-retryable Exception def handle_ray_exception(self, exception: Exception, TaskInfo: TaskInfoObject) -> Error: #will compare the exception with known exceptions and determine way to handle it based off that #if RayOOM Error then: raise that error - def get_retry_strategy() \ No newline at end of file + def get_retry_strategy(self, exception: Exception, TaskInfo: TaskInfoObject) --> Any: + """ + Given the exception and task info it will check what retry to execute + """ \ No newline at 
end of file From 297a28120acbe4e11807d5e9c9ac2bce5a6afd7e Mon Sep 17 00:00:00 2001 From: Ekas Chawla Date: Tue, 18 Jul 2023 13:52:35 -0700 Subject: [PATCH 4/9] updated retry_handler: Straggler NOT injected --- .../ray_utils/RetryHandler/TaskInfoObject.py | 8 - .../utils/ray_utils/RetryHandler/retry.py | 63 ------- .../aws_security_token_service_exception.py | 6 + .../utils/ray_utils/retry_handler/logger.py | 14 ++ .../non_retryable_error.py} | 2 +- .../ray_remote_tasks_batch_scaling_params.py | 17 ++ .../utils/ray_utils/retry_handler/retry.py | 175 ++++++++++++++++++ .../retry_handler/retry_strategy_config.py | 14 ++ .../retryable_error.py} | 2 +- .../ray_utils/retry_handler/task_constants.py | 32 ++++ .../task_exception_retry_config.py | 23 +++ .../retry_handler/task_execution_error.py | 7 + .../retry_handler/task_info_object.py | 17 ++ .../ray_utils/retry_handler/task_options.py | 15 ++ 14 files changed, 322 insertions(+), 73 deletions(-) delete mode 100644 deltacat/utils/ray_utils/RetryHandler/TaskInfoObject.py delete mode 100644 deltacat/utils/ray_utils/RetryHandler/retry.py create mode 100644 deltacat/utils/ray_utils/retry_handler/aws_security_token_service_exception.py create mode 100644 deltacat/utils/ray_utils/retry_handler/logger.py rename deltacat/utils/ray_utils/{RetryHandler/NonRetryableExc.py => retry_handler/non_retryable_error.py} (75%) create mode 100644 deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py create mode 100644 deltacat/utils/ray_utils/retry_handler/retry.py create mode 100644 deltacat/utils/ray_utils/retry_handler/retry_strategy_config.py rename deltacat/utils/ray_utils/{RetryHandler/RetryableExc.py => retry_handler/retryable_error.py} (76%) create mode 100644 deltacat/utils/ray_utils/retry_handler/task_constants.py create mode 100644 deltacat/utils/ray_utils/retry_handler/task_exception_retry_config.py create mode 100644 deltacat/utils/ray_utils/retry_handler/task_execution_error.py create mode 100644 
deltacat/utils/ray_utils/retry_handler/task_info_object.py create mode 100644 deltacat/utils/ray_utils/retry_handler/task_options.py diff --git a/deltacat/utils/ray_utils/RetryHandler/TaskInfoObject.py b/deltacat/utils/ray_utils/RetryHandler/TaskInfoObject.py deleted file mode 100644 index 12da99d9..00000000 --- a/deltacat/utils/ray_utils/RetryHandler/TaskInfoObject.py +++ /dev/null @@ -1,8 +0,0 @@ -Class TaskInfoObject: - def __init__(self, task_callable: Callable, task_input, num_retries: int, retry_delay: int): #what inputs do I need here - self.task_callable = task_callable - self.task_input = task_input - #self.remote_task_options = ray_remote_task_options - self.num_retries = num_retries - self.retry_delay = retry_delay - self.attempt_count = 0 \ No newline at end of file diff --git a/deltacat/utils/ray_utils/RetryHandler/retry.py b/deltacat/utils/ray_utils/RetryHandler/retry.py deleted file mode 100644 index 52b94f86..00000000 --- a/deltacat/utils/ray_utils/RetryHandler/retry.py +++ /dev/null @@ -1,63 +0,0 @@ -import ray -import time -import logging -from typing import List, Callable -from ray.types import ObjectRef -from RetryExceptions.retryable_exception import RetryableException -from RetryExceptions.non_retryable_exception import NonRetryableException -from RetryExceptions.TaskInfoObject import TaskInfoObject - -#inputs: task_callable, task_input, ray_remote_task_options, exception_retry_strategy_configs -#include a seperate class for errors: break down into retryable and non-retryable -#seperate class to put info in a way that the retry class can handle: ray retry task info - -#This is what specifically retries a single task -@ray.remote -def submit_single_task(taskObj: TaskInfoObject) -> Any: - try: - taskObj.attempt_count += 1 - curr_attempt = taskObj.attempt_count - return tackObj.task_callable(taskObj.task_input) - except (Exception) as exception: - # if exception is detected we want to figure out how to handle it - #pass to a new method that 
handles exception strategy - #retry_strat = ...exception_retry_strategy_configs - retry_config = get_retry_strategy() #need to come up with fields needed for this - if retry_config is not None: - return the exception that retry_config detected - - - -class RetryHandler: - #given a list of tasks that are failing, we want to classify the error messages and redirect the task - #depending on the exception type using a wrapper - #wrapper function that before execution, checks what exception is being thrown and go to second method to - #commence retrying - - def get_task_results(self, num_of_results: int) -> List[Any]: - #implement wrapper here that before execution will try catch an exception - #get what tasks we need to run our execution on - finished, unfinished = ray.wait(unfinished, num_of_results) - #assuming we have the tasks we want to get results of - for finished in finished: - finished_result = None - try: - finished_result = ray.get(finished) - except (Exception) as exception: - #if exception send to method handle_ray_exception to determine what to do and assign the corresp error - finished_result = #evaluate the exception and return the error - - if finished_result == RetryableException: - #feed into submit_single_task - else: - #Non-retryable Exception - - - def handle_ray_exception(self, exception: Exception, TaskInfo: TaskInfoObject) -> Error: - #will compare the exception with known exceptions and determine way to handle it based off that - #if RayOOM Error then: raise that error - - def get_retry_strategy(self, exception: Exception, TaskInfo: TaskInfoObject) --> Any: - """ - Given the exception and task info it will check what retry to execute - """ \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/aws_security_token_service_exception.py b/deltacat/utils/ray_utils/retry_handler/aws_security_token_service_exception.py new file mode 100644 index 00000000..860482a3 --- /dev/null +++ 
b/deltacat/utils/ray_utils/retry_handler/aws_security_token_service_exception.py @@ -0,0 +1,6 @@ +from deltacat.utils.ray_utils.retry_handler.retryable_error import RetryableError + +class AWSSecurityTokenException(RetryableError): + + def __init__(self, *args: object) -> None: + super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/logger.py b/deltacat/utils/ray_utils/retry_handler/logger.py new file mode 100644 index 00000000..4527773a --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/logger.py @@ -0,0 +1,14 @@ +from logging import Logger +import logging + + +def configure_logger(logger: Logger) -> Logger: + logging.basicConfig(level=logging.INFO, + format='[%(asctime)s] %(levelname)s [%(name)s;%(filename)s.%(funcName)s:%(lineno)d] %(message)s', + datefmt='%a, %d %b %Y %H:%M:%S') + + # These modules were not configured to honor the log level specified, + # Hence, explicitly setting log level for them. + logging.getLogger("deltacat.utils.pyarrow").setLevel(logging.INFO) + logging.getLogger("amazoncerts.cacerts_helpers").setLevel(logging.ERROR) + return logger \ No newline at end of file diff --git a/deltacat/utils/ray_utils/RetryHandler/NonRetryableExc.py b/deltacat/utils/ray_utils/retry_handler/non_retryable_error.py similarity index 75% rename from deltacat/utils/ray_utils/RetryHandler/NonRetryableExc.py rename to deltacat/utils/ray_utils/retry_handler/non_retryable_error.py index cbcd25c5..e9b69704 100644 --- a/deltacat/utils/ray_utils/RetryHandler/NonRetryableExc.py +++ b/deltacat/utils/ray_utils/retry_handler/non_retryable_error.py @@ -1,4 +1,4 @@ -class NonRetryableExc(RuntimeError): +class NonRetryableError(RuntimeError): """ Class represents a non-retryable error """ diff --git a/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py b/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py new file mode 100644 index 00000000..eb4faa05 --- /dev/null 
+++ b/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py @@ -0,0 +1,17 @@ +from deltacat.utils.ray_utils.retry_handler.task_constants import DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BATCH_SIZE_MULTIPLICATIVE_DECREASE_FACTOR, DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BACK_OFF_IN_MS, DEFAULT_RAY_REMOTE_TASK_BATCH_POSITIVE_FEEDBACK_BATCH_SIZE_ADDITIVE_INCREASE +from dataclasses import dataclass + +class RayRemoteTasksBatchScalingParams(): + """ + Represents the batch scaling params of the Ray remote tasks + need to add constants that this file refers to + """ + def __init__(self, + initial_batch_size: int, + negative_feedback_back_off_in_ms: int = DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BACK_OFF_IN_MS, + positive_feedback_batch_size_additive_increase: int = DEFAULT_RAY_REMOTE_TASK_BATCH_POSITIVE_FEEDBACK_BATCH_SIZE_ADDITIVE_INCREASE, + negative_feedback_batch_size_multiplicative_decrease_factor: int = DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BATCH_SIZE_MULTIPLICATIVE_DECREASE_FACTOR): + self.initial_batch_size = initial_batch_size + self.negative_feedback_back_off_in_ms = negative_feedback_back_off_in_ms + self.positive_feedback_batch_size_additive_increase = positive_feedback_batch_size_additive_increase + self.negative_feedback_batch_size_multiplicative_decrease_factor = negative_feedback_batch_size_multiplicative_decrease_factor \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/retry.py b/deltacat/utils/ray_utils/retry_handler/retry.py new file mode 100644 index 00000000..f296717b --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/retry.py @@ -0,0 +1,175 @@ +from __future__ import annotations +from typing import Any, Dict, List, cast +from deltacat.utils.ray_utils.retry_handler.ray_remote_tasks_batch_scaling_params import RayRemoteTasksBatchScalingParams +#import necessary errors here +import ray +import time +import logging +from 
deltacat.utils.ray_utils.retry_handler.logger import configure_logger +from deltacat.utils.ray_utils.retry_handler.task_execution_error import RayRemoteTaskExecutionError +from deltacat.utils.ray_utils.retry_handler.task_info_object import TaskInfoObject +from deltacat.utils.ray_utils.retry_handler.retry_strategy_config import get_retry_strategy_config_for_known_exception + +logger = configure_logger(logging.getLogger(__name__)) + +import ray +import time +import logging +from typing import Any, Dict, List, cast +from ray.types import ObjectRef +from RetryExceptions.retryable_exception import RetryableException +from RetryExceptions.non_retryable_exception import NonRetryableException +from RetryExceptions.TaskInfoObject import TaskInfoObject + +#inputs: task_callable, task_input, ray_remote_task_options, exception_retry_strategy_configs +#include a seperate class for errors: break down into retryable and non-retryable +#seperate class to put info in a way that the retry class can handle: ray retry task info + +#This is what specifically retries a single task +@ray.remote +def submit_single_task(taskObj: TaskInfoObject, progressNotifier: Optional[NotificationInterface] = None) -> Any: + try: + taskObj.attempt_count += 1 + curr_attempt = taskObj.attempt_count + if progressNotifier is not None: + #method call to straggler detection using notifier + logger.debug(f"Executing the submitted Ray remote task as part of attempt number: {current_attempt_number}") + return tackObj.task_callable(taskObj.task_input) + except (Exception) as exception: + exception_retry_strategy_config = get_retry_strategy_config_for_known_exception(exception, task_info_object.exception_retry_strategy_configs) + #pass to a new method that handles exception strategy + #retry_strat = ...exception_retry_strategy_configs + if exception_retry_strategy_config is not None: + return RayRemoteTaskExecutionError(exception_retry_strategy_config.exception, task_info_object) + + + + +class RetryHandler: + 
#given a list of tasks that are failing, we want to classify the error messages and redirect the task + #depending on the exception type using a wrapper + #wrapper function that before execution, checks what exception is being thrown and go to second method to + #commence retrying + def execute_task(self, ray_remote_task_info: RayRemoteTaskInfo) -> Any: + self.start_tasks_execution([ray_remote_task_info]) + return self.wait_and_get_all_task_results()[0] + + """ + Starts execution of all given Ray remote tasks + """ + def start_tasks_execution(self, ray_remote_task_infos: List[TaskInfoObject]) -> None: + self.start_tasks_execution_in_batches(ray_remote_task_infos, RayRemoteTasksBatchScalingParams(initial_batch_size=len(ray_remote_task_infos))) + + """ + Starts execution of given Ray remote tasks in batches depending on the given Batch scaling params + """ + def start_tasks_execution_in_batches(self, ray_remote_task_infos: List[RayRemoteTaskInfo], batch_scaling_params: RayRemoteTasksBatchScalingParams) -> None: + self.num_of_submitted_tasks = len(ray_remote_task_infos) + self.current_batch_size = min(batch_scaling_params.initial_batch_size, self.num_of_submitted_tasks) + self.num_of_submitted_tasks_completed = 0 + self.remaining_ray_remote_task_infos = ray_remote_task_infos + self.batch_scaling_params = batch_scaling_params + self.task_promise_obj_ref_to_task_info_map: Dict[Any, RayRemoteTaskInfo] = {} + + self.unfinished_promises: List[Any] = [] + logger.info(f"Starting the execution of {len(ray_remote_task_infos)} Ray remote tasks. 
Concurrency of tasks execution: {self.current_batch_size}") + self.__submit_tasks(self.remaining_ray_remote_task_infos[:self.current_batch_size]) + self.remaining_ray_remote_task_infos = self.remaining_ray_remote_task_infos[self.current_batch_size:] + + + def wait_and_get_all_task_results(self) -> List[Any]: + return self.wait_and_get_task_results(self.num_of_submitted_tasks) + + def get_task_results(self, num_of_results: int) -> List[Any]: + #implement wrapper here that before execution will try catch an exception + #get what tasks we need to run our execution on + finished, unfinished = ray.wait(unfinished, num_of_results) + #assuming we have the tasks we want to get results of + for finished in finished: + finished_result = None + try: + finished_result = ray.get(finished) + except (Exception) as exception: + #if exception send to method handle_ray_exception to determine what to do and assign the corresp error + finished_result = self.handle_ray_exception(exception=exception, TaskInfoObject = )#evaluate the exception and return the error + + if finished_result and type(finished_result) == RayRemoteTaskExecutionError: + finished_result = cast(RayRemoteTaskExecutionError, finished_result) + exception_retry_strategy_config = get_retry_strategy_config_for_known_exception(finished_result.exception, + finished_result.ray_remote_task_info.exception_retry_strategy_configs) + if (exception_retry_strategy_config is None or finished_result.ray_remote_task_info.num_of_attempts > exception_retry_strategy_config.max_retry_attempts): + logger.error(f"The submitted task has exhausted all the maximum retries configured and finally throws exception - {finished_result.exception}") + raise finished_result.exception + self.__update_ray_remote_task_options_on_exception(finished_result.exception, finished_result.ray_remote_task_info) + self.unfinished_promises.append(self.__invoke_ray_remote_task(ray_remote_task_info=finished_result.ray_remote_task_info)) + else: + 
successful_results.append(finished_result) + del self.task_promise_obj_ref_to_task_info_map[str(finished_promise)] + + num_of_successful_results = len(successful_results) + self.num_of_submitted_tasks_completed += num_of_successful_results + self.current_batch_size -= num_of_successful_results + + self.__enqueue_new_tasks(num_of_successful_results) + + if num_of_successful_results < num_of_results: + successful_results.extend(self.wait_and_get_task_results(num_of_results - num_of_successful_results)) + return successful_results + else: + return successful_results + + + def __enqueue_new_tasks(self, num_of_tasks: int) -> None: + new_tasks_submitted = self.remaining_ray_remote_task_infos[:num_of_tasks] + num_of_new_tasks_submitted = len(new_tasks_submitted) + self.__submit_tasks(new_tasks_submitted) + self.remaining_ray_remote_task_infos = self.remaining_ray_remote_task_infos[num_of_tasks:] + self.current_batch_size += num_of_new_tasks_submitted + logger.info(f"Enqueued {num_of_new_tasks_submitted} new tasks. 
Current concurrency of tasks execution: {self.current_batch_size}, Current Task progress: {self.num_of_submitted_tasks_completed}/{self.num_of_submitted_tasks}") + + def __submit_tasks(self, ray_remote_task_infos: List[RayRemoteTaskInfo]) -> None: + for ray_remote_task_info in ray_remote_task_infos: + time.sleep(0.005) + self.unfinished_promises.append(self.__invoke_ray_remote_task(ray_remote_task_info=ray_remote_task_info)) + + def __invoke_ray_remote_task(self, ray_remote_task_info: RayRemoteTaskInfo) -> Any: + ray_remote_task_options_arguments = dict() + + if ray_remote_task_info.ray_remote_task_options.memory: + ray_remote_task_options_arguments['memory'] = ray_remote_task_info.ray_remote_task_options.memory + + if ray_remote_task_info.ray_remote_task_options.num_cpus: + ray_remote_task_options_arguments['num_cpus'] = ray_remote_task_info.ray_remote_task_options.num_cpus + + if ray_remote_task_info.ray_remote_task_options.placement_group: + ray_remote_task_options_arguments['placement_group'] = ray_remote_task_info.ray_remote_task_options.placement_group + + ray_remote_task_promise_obj_ref = submit_single_task.options(**ray_remote_task_options_arguments).remote(ray_remote_task_info=ray_remote_task_info) + self.task_promise_obj_ref_to_task_info_map[str(ray_remote_task_promise_obj_ref)] = ray_remote_task_info + + return ray_remote_task_promise_obj_ref + + def __update_ray_remote_task_options_on_exception(self, exception: Exception, ray_remote_task_info: RayRemoteTaskInfo): + exception_retry_strategy_config = get_retry_strategy_config_for_known_exception(exception, ray_remote_task_info.exception_retry_strategy_configs) + if exception_retry_strategy_config and ray_remote_task_info.ray_remote_task_options.memory: + logger.info(f"Updating the Ray remote task options after encountering exception: {exception}") + ray_remote_task_memory_multiply_factor = exception_retry_strategy_config.ray_remote_task_memory_multiply_factor + 
ray_remote_task_info.ray_remote_task_options.memory *= ray_remote_task_memory_multiply_factor + logger.info(f"Updated ray remote task options Memory: {ray_remote_task_info.ray_remote_task_options.memory}") + + def __handle_ray_exception(self, exception: Exception, ray_remote_task_info: RayRemoteTaskInfo) -> RayRemoteTaskExecutionError: + logger.error(f"Ray remote task failed with {type(exception)} Ray exception: {exception}") + if type(exception).__name__ == "RayTaskError(UnexpectedRayTaskError)": + raise UnexpectedRayTaskError(str(exception)) + elif type(exception).__name__ == "RayTaskError(RayOutOfMemoryError)": + return RayRemoteTaskExecutionError(exception=RayOutOfMemoryError(str(exception)), ray_remote_task_info=ray_remote_task_info) + elif type(exception) == ray.exceptions.OwnerDiedError: + return RayRemoteTaskExecutionError(exception=RayOwnerDiedError(str(exception)), ray_remote_task_info=ray_remote_task_info) + elif type(exception) == ray.exceptions.WorkerCrashedError: + return RayRemoteTaskExecutionError(exception=RayWorkerCrashedError(str(exception)), ray_remote_task_info=ray_remote_task_info) + elif type(exception) == ray.exceptions.LocalRayletDiedError: + return RayRemoteTaskExecutionError(exception=RayLocalRayletDiedError(str(exception)), ray_remote_task_info=ray_remote_task_info) + elif type(exception) == ray.exceptions.RaySystemError: + return RayRemoteTaskExecutionError(exception=RaySystemError(str(exception)), ray_remote_task_info=ray_remote_task_info) + + raise UnexpectedRayPlatformError(str(exception)) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/retry_strategy_config.py b/deltacat/utils/ray_utils/retry_handler/retry_strategy_config.py new file mode 100644 index 00000000..b75b0e28 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/retry_strategy_config.py @@ -0,0 +1,14 @@ +from typing import List, Optional + +from ray_manager.models.ray_remote_task_exception_retry_strategy_config import 
RayRemoteTaskExceptionRetryConfig + +def get_retry_strategy_config_for_known_exception(exception: Exception, exception_retry_strategy_configs: List[RayRemoteTaskExceptionRetryConfig]) -> Optional[RayRemoteTaskExceptionRetryConfig]: + for exception_retry_strategy_config in exception_retry_strategy_configs: + if type(exception) == type(exception_retry_strategy_config.exception): + return exception_retry_strategy_config + + for exception_retry_strategy_config in exception_retry_strategy_configs: + if isinstance(exception, type(exception_retry_strategy_config.exception)): + return exception_retry_strategy_config + + return None \ No newline at end of file diff --git a/deltacat/utils/ray_utils/RetryHandler/RetryableExc.py b/deltacat/utils/ray_utils/retry_handler/retryable_error.py similarity index 76% rename from deltacat/utils/ray_utils/RetryHandler/RetryableExc.py rename to deltacat/utils/ray_utils/retry_handler/retryable_error.py index 0ae67661..0ecf0aa5 100644 --- a/deltacat/utils/ray_utils/RetryHandler/RetryableExc.py +++ b/deltacat/utils/ray_utils/retry_handler/retryable_error.py @@ -1,4 +1,4 @@ -class RetryableEXc(RuntimeError): +class RetryableError(RuntimeError): """ class for errors that can be retried """ diff --git a/deltacat/utils/ray_utils/retry_handler/task_constants.py b/deltacat/utils/ray_utils/retry_handler/task_constants.py new file mode 100644 index 00000000..ff09b49e --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/task_constants.py @@ -0,0 +1,32 @@ +""" +Default max retry attempts of Ray remote task +""" +DEFAULT_MAX_RAY_REMOTE_TASK_RETRY_ATTEMPTS = 3 +""" +Default initial backoff before Ray remote task retry, in milli seconds +""" +DEFAULT_RAY_REMOTE_TASK_RETRY_INITIAL_BACK_OFF_IN_MS = 5000 +""" +Default Ray remote task retry back off factor +""" +DEFAULT_RAY_REMOTE_TASK_RETRY_BACK_OFF_FACTOR = 2 +""" +Default Ray remote task memory multiplication factor +""" +DEFAULT_RAY_REMOTE_TASK_MEMORY_MULTIPLICATION_FACTOR = 1 +""" +Ray remote task 
memory multiplication factor for Ray out of memory error +""" +RAY_REMOTE_TASK_MEMORY_MULTIPLICATION_FACTOR_FOR_OUT_OF_MEMORY_ERROR = 2 +""" +Default Ray remote task batch negative feedback back off in milli seconds +""" +DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BACK_OFF_IN_MS = 0 +""" +Default Ray remote task batch positive feedback batch size additive increase +""" +DEFAULT_RAY_REMOTE_TASK_BATCH_POSITIVE_FEEDBACK_BATCH_SIZE_ADDITIVE_INCREASE = 0 +""" +Default Ray remote task batch positive feedback batch size multiplicative decrease factor +""" +DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BATCH_SIZE_MULTIPLICATIVE_DECREASE_FACTOR = 1 \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/task_exception_retry_config.py b/deltacat/utils/ray_utils/retry_handler/task_exception_retry_config.py new file mode 100644 index 00000000..a0e0cbc5 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/task_exception_retry_config.py @@ -0,0 +1,23 @@ +from __future__ import annotations +from dataclasses import dataclass +from typing import List +from deltacat.utils.ray_utils.retry_handler.task_constants import DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BATCH_SIZE_MULTIPLICATIVE_DECREASE_FACTOR, DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BACK_OFF_IN_MS, DEFAULT_RAY_REMOTE_TASK_BATCH_POSITIVE_FEEDBACK_BATCH_SIZE_ADDITIVE_INCREASE + +class TaskExceptionRetryConfig(): + def __init__(self, exception: Exception, + max_retry_attempts: int = DEFAULT_MAX_RAY_REMOTE_TASK_RETRY_ATTEMPTS, + initial_back_off_in_ms: int = DEFAULT_RAY_REMOTE_TASK_RETRY_INITIAL_BACK_OFF_IN_MS, + back_off_factor: int = DEFAULT_RAY_REMOTE_TASK_RETRY_BACK_OFF_FACTOR, + ray_remote_task_memory_multiplication_factor: float = DEFAULT_RAY_REMOTE_TASK_MEMORY_MULTIPLICATION_FACTOR, + is_throttling_exception: bool = False) -> None: + self.exception = exception + self.max_retry_attempts = max_retry_attempts + self.initial_back_off_in_ms = initial_back_off_in_ms + 
self.back_off_factor = back_off_factor + self.ray_remote_task_memory_multiply_factor = ray_remote_task_memory_multiplication_factor + self.is_throttling_exception = is_throttling_exception + + @staticmethod + def getDefaultConfig() -> List[TaskExceptionRetryConfig]: + return [TaskExceptionRetryConfig(exception=RetryableError(), is_throttling_exception=True), + TaskExceptionRetryConfig(exception=RayOutOfMemoryError(), ray_remote_task_memory_multiplication_factor=RAY_REMOTE_TASK_MEMORY_MULTIPLICATION_FACTOR_FOR_OUT_OF_MEMORY_ERROR)] \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/task_execution_error.py b/deltacat/utils/ray_utils/retry_handler/task_execution_error.py new file mode 100644 index 00000000..8117040d --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/task_execution_error.py @@ -0,0 +1,7 @@ +class RayRemoteTaskExecutionError(): + """ + An error class that denotes the Ray Remote Task Execution Failure + """ + def __init__(self, exception: Exception, ray_remote_task_info: RayRemoteTaskInfo) -> None: + self.exception = exception + self.ray_remote_task_info = ray_remote_task_info \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/task_info_object.py b/deltacat/utils/ray_utils/retry_handler/task_info_object.py new file mode 100644 index 00000000..9471d7de --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/task_info_object.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass +from typing import Any, Callable, List +from deltacat.utils.ray_utils.retry_handler.task_info_object import TaskExceptionRetryConfig +from deltacat.utils.ray_utils.retry_handler.task_options import RayRemoteTaskOptions + +@dataclass +Class TaskInfoObject: + def __init__(self, + task_callable: Callable[[Any], [Any], + task_input: Any, + ray_remote_task_options: RayRemoteTaskOptions = RayRemoteTaskOptions(), + exception_retry_strategy_configs: List[TaskExceptionRetryConfig]): #what inputs do I need here + 
self.task_callable = task_callable + self.task_input = task_input + self.ray_remote_task_options = ray_remote_task_options + self.exception_retry_strategy_configs = exception_retry_strategy_configs + self.num_of_attempts = 0 diff --git a/deltacat/utils/ray_utils/retry_handler/task_options.py b/deltacat/utils/ray_utils/retry_handler/task_options.py new file mode 100644 index 00000000..4383d605 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/task_options.py @@ -0,0 +1,15 @@ +from dataclasses import dataclass +from typing import Any, Optional + +@dataclass +class RayRemoteTaskOptions(): + """ + Represents the options corresponding to Ray remote task + """ + def __init__(self, + memory: Optional[float] = None, + num_cpus: Optional[int] = None, + placement_group: Optional[Any] = None) -> None: + self.memory = memory + self.num_cpus = num_cpus + self.placement_group = placement_group \ No newline at end of file From b55c7770263ca66b2ffe6840c4f941f2ae84d8ce Mon Sep 17 00:00:00 2001 From: Ekas Chawla Date: Thu, 20 Jul 2023 12:48:37 -0700 Subject: [PATCH 5/9] answering CR comments --- .../utils/ray_utils/retry_handler/README.md | 22 +++++ .../retry_handler/batch_scaling_strategy.py | 18 ++++ ...y_strategy_config.py => exception_util.py} | 0 .../aws_security_token_service_exception.py | 2 +- .../retry_handler/failures/broken_pipe.py | 6 ++ .../failures/cairns_client_exception.py | 6 ++ .../failures/cairns_resource_not_found.py | 6 ++ .../failures/manifest_generation_exception.py | 6 ++ .../{ => failures}/non_retryable_error.py | 4 +- .../retry_handler/failures/port_conflict.py | 6 ++ .../{ => failures}/retryable_error.py | 5 +- .../failures/upload_part_throttle.py | 6 ++ .../utils/ray_utils/retry_handler/logger.py | 14 --- .../ray_remote_tasks_batch_scaling_params.py | 16 +++- ...etry.py => ray_task_submission_handler.py} | 86 ++++++++----------- .../staggler_detection_interface.py | 25 ++++++ .../retry_handler/task_info_object.py | 2 +- 17 files changed, 157 
insertions(+), 73 deletions(-) create mode 100644 deltacat/utils/ray_utils/retry_handler/README.md create mode 100644 deltacat/utils/ray_utils/retry_handler/batch_scaling_strategy.py rename deltacat/utils/ray_utils/retry_handler/{retry_strategy_config.py => exception_util.py} (100%) rename deltacat/utils/ray_utils/retry_handler/{ => failures}/aws_security_token_service_exception.py (58%) create mode 100644 deltacat/utils/ray_utils/retry_handler/failures/broken_pipe.py create mode 100644 deltacat/utils/ray_utils/retry_handler/failures/cairns_client_exception.py create mode 100644 deltacat/utils/ray_utils/retry_handler/failures/cairns_resource_not_found.py create mode 100644 deltacat/utils/ray_utils/retry_handler/failures/manifest_generation_exception.py rename deltacat/utils/ray_utils/retry_handler/{ => failures}/non_retryable_error.py (52%) create mode 100644 deltacat/utils/ray_utils/retry_handler/failures/port_conflict.py rename deltacat/utils/ray_utils/retry_handler/{ => failures}/retryable_error.py (50%) create mode 100644 deltacat/utils/ray_utils/retry_handler/failures/upload_part_throttle.py delete mode 100644 deltacat/utils/ray_utils/retry_handler/logger.py rename deltacat/utils/ray_utils/retry_handler/{retry.py => ray_task_submission_handler.py} (74%) create mode 100644 deltacat/utils/ray_utils/retry_handler/staggler_detection_interface.py diff --git a/deltacat/utils/ray_utils/retry_handler/README.md b/deltacat/utils/ray_utils/retry_handler/README.md new file mode 100644 index 00000000..6c250fe2 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/README.md @@ -0,0 +1,22 @@ +This module represents a straggler detection and retry handler framework + +Within retry_strategy_config.py, the client can provide 3 parameters to start_tasks_execution to perform retries and detect stragglers +Params: +1. ray_remote_task_info: A list of Ray task objects +2. scaling_strategy: Batch scaling parameters for how many tasks to execute per batch (Optional) + a. 
If not provided, a default AIMD (additive increase, multiplicative decrease) strategy will be assigned for retries +3. straggler_detection: Client-provided class that holds logic for how they want to detect straggler tasks (Optional) + a. Client algorithm must inherit the interface for detection which will be used in wait_and_get_results + +Use cases: +1. Notifying progress + a. +2. Detecting stragglers + Given the straggler detection algorithm fed in by the client, the method get_timeout_val will be used to determine how + long the task will run before it is considered a straggler. The logic for this must be provided by the client internally. +3. Retrying retryable exceptions + a. Within the failure directory, there are common errors that are retryable and when detected as an instance + of the retryable class, will cause the task to be retried through submitting the task. + +The client can provide these inputs to fulfil the following use cases: + diff --git a/deltacat/utils/ray_utils/retry_handler/batch_scaling_strategy.py b/deltacat/utils/ray_utils/retry_handler/batch_scaling_strategy.py new file mode 100644 index 00000000..f43dd141 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/batch_scaling_strategy.py @@ -0,0 +1,18 @@ +from abc import ABC, abstractmethod + +class BatchScalingStrategy(ABC): + """ + Interface for a generic batch scaling that the client can provide. 
+ """ + + @abstractmethod + def increase_batch_size(self, current_size: int) -> int: + pass + + @abstractmethod + def decrease_batch_size(self, current_size: int) -> int: + pass + + @abstractmethod + def get_batch_size(self) -> int: + pass diff --git a/deltacat/utils/ray_utils/retry_handler/retry_strategy_config.py b/deltacat/utils/ray_utils/retry_handler/exception_util.py similarity index 100% rename from deltacat/utils/ray_utils/retry_handler/retry_strategy_config.py rename to deltacat/utils/ray_utils/retry_handler/exception_util.py diff --git a/deltacat/utils/ray_utils/retry_handler/aws_security_token_service_exception.py b/deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_service_exception.py similarity index 58% rename from deltacat/utils/ray_utils/retry_handler/aws_security_token_service_exception.py rename to deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_service_exception.py index 860482a3..ee9c776c 100644 --- a/deltacat/utils/ray_utils/retry_handler/aws_security_token_service_exception.py +++ b/deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_service_exception.py @@ -1,4 +1,4 @@ -from deltacat.utils.ray_utils.retry_handler.retryable_error import RetryableError +from deltacat.utils.ray_utils.retry_handler.retryable_error.failures import RetryableError class AWSSecurityTokenException(RetryableError): diff --git a/deltacat/utils/ray_utils/retry_handler/failures/broken_pipe.py b/deltacat/utils/ray_utils/retry_handler/failures/broken_pipe.py new file mode 100644 index 00000000..537f00c6 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/failures/broken_pipe.py @@ -0,0 +1,6 @@ +from deltacat.utils.ray_utils.retry_handler.retryable_error.failures import RetryableError + +class BrokenPipe(RetryableError): + + def __init__(self, *args: object) -> None: + super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/failures/cairns_client_exception.py 
b/deltacat/utils/ray_utils/retry_handler/failures/cairns_client_exception.py new file mode 100644 index 00000000..ce2778e6 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/failures/cairns_client_exception.py @@ -0,0 +1,6 @@ +from deltacat.utils.ray_utils.retry_handler.retryable_error.failures import RetryableError + +class CairnsClientException(RetryableError): + + def __init__(self, *args: object) -> None: + super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/failures/cairns_resource_not_found.py b/deltacat/utils/ray_utils/retry_handler/failures/cairns_resource_not_found.py new file mode 100644 index 00000000..1e212cd7 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/failures/cairns_resource_not_found.py @@ -0,0 +1,6 @@ +from deltacat.utils.ray_utils.retry_handler.retryable_error.failures import NonRetryableError + +class CairnsResourceNotFound(NonRetryableError): + + def __init__(self, *args: object) -> None: + super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/failures/manifest_generation_exception.py b/deltacat/utils/ray_utils/retry_handler/failures/manifest_generation_exception.py new file mode 100644 index 00000000..a5cdfbc4 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/failures/manifest_generation_exception.py @@ -0,0 +1,6 @@ +from deltacat.utils.ray_utils.retry_handler.retryable_error.failures import NonRetryableError + +class ManifestGenerationException(NonRetryableError): + + def __init__(self, *args: object) -> None: + super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/non_retryable_error.py b/deltacat/utils/ray_utils/retry_handler/failures/non_retryable_error.py similarity index 52% rename from deltacat/utils/ray_utils/retry_handler/non_retryable_error.py rename to deltacat/utils/ray_utils/retry_handler/failures/non_retryable_error.py index e9b69704..3699b167 100644 
--- a/deltacat/utils/ray_utils/retry_handler/non_retryable_error.py +++ b/deltacat/utils/ray_utils/retry_handler/failures/non_retryable_error.py @@ -3,5 +3,5 @@ class NonRetryableError(RuntimeError): Class represents a non-retryable error """ -def __init__(self, *args:object) --> None: - super().__init__(*args) + def __init__(self, *args:object) --> None: + super().__init__(*args) diff --git a/deltacat/utils/ray_utils/retry_handler/failures/port_conflict.py b/deltacat/utils/ray_utils/retry_handler/failures/port_conflict.py new file mode 100644 index 00000000..a92da6ac --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/failures/port_conflict.py @@ -0,0 +1,6 @@ +from deltacat.utils.ray_utils.retry_handler.retryable_error.failures import RetryableError + +class PortConflict(RetryableError): + + def __init__(self, *args: object) -> None: + super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/retryable_error.py b/deltacat/utils/ray_utils/retry_handler/failures/retryable_error.py similarity index 50% rename from deltacat/utils/ray_utils/retry_handler/retryable_error.py rename to deltacat/utils/ray_utils/retry_handler/failures/retryable_error.py index 0ecf0aa5..6b75ce8f 100644 --- a/deltacat/utils/ray_utils/retry_handler/retryable_error.py +++ b/deltacat/utils/ray_utils/retry_handler/failures/retryable_error.py @@ -2,6 +2,5 @@ class RetryableError(RuntimeError): """ class for errors that can be retried """ - -def __init__(self, *args: object) --> None: - super().__init__(*args) \ No newline at end of file + def __init__(self, *args: object) --> None: + super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/failures/upload_part_throttle.py b/deltacat/utils/ray_utils/retry_handler/failures/upload_part_throttle.py new file mode 100644 index 00000000..dd2e5c31 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/failures/upload_part_throttle.py @@ -0,0 +1,6 @@ 
+from deltacat.utils.ray_utils.retry_handler.retryable_error.failures import RetryableError + +class UploadPartThrottle(RetryableError): + + def __init__(self, *args: object) -> None: + super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/logger.py b/deltacat/utils/ray_utils/retry_handler/logger.py deleted file mode 100644 index 4527773a..00000000 --- a/deltacat/utils/ray_utils/retry_handler/logger.py +++ /dev/null @@ -1,14 +0,0 @@ -from logging import Logger -import logging - - -def configure_logger(logger: Logger) -> Logger: - logging.basicConfig(level=logging.INFO, - format='[%(asctime)s] %(levelname)s [%(name)s;%(filename)s.%(funcName)s:%(lineno)d] %(message)s', - datefmt='%a, %d %b %Y %H:%M:%S') - - # These modules were not configured to honor the log level specified, - # Hence, explicitly setting log level for them. - logging.getLogger("deltacat.utils.pyarrow").setLevel(logging.INFO) - logging.getLogger("amazoncerts.cacerts_helpers").setLevel(logging.ERROR) - return logger \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py b/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py index eb4faa05..1d4cd56a 100644 --- a/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py +++ b/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py @@ -1,7 +1,7 @@ from deltacat.utils.ray_utils.retry_handler.task_constants import DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BATCH_SIZE_MULTIPLICATIVE_DECREASE_FACTOR, DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BACK_OFF_IN_MS, DEFAULT_RAY_REMOTE_TASK_BATCH_POSITIVE_FEEDBACK_BATCH_SIZE_ADDITIVE_INCREASE from dataclasses import dataclass -class RayRemoteTasksBatchScalingParams(): +class RayRemoteTasksBatchScalingParams(BatchScalingStrategy): """ Represents the batch scaling params of the Ray remote tasks need to add constants that 
this file refers to @@ -14,4 +14,16 @@ def __init__(self, self.initial_batch_size = initial_batch_size self.negative_feedback_back_off_in_ms = negative_feedback_back_off_in_ms self.positive_feedback_batch_size_additive_increase = positive_feedback_batch_size_additive_increase - self.negative_feedback_batch_size_multiplicative_decrease_factor = negative_feedback_batch_size_multiplicative_decrease_factor \ No newline at end of file + self.negative_feedback_batch_size_multiplicative_decrease_factor = negative_feedback_batch_size_multiplicative_decrease_factor + self.current_batch_size = initial_batch_size + + def get_batch_size(self) -> int: + return self.current_batch_size + + def increase_batch_size(self, current_size: int) -> int: + self.current_batch_size = current_size + self.positive_feedback_batch_size_additive_increase + return self.current_batch_size + + def decrease_batch_size(self, current_size: int) -> int: + self.current_batch_size = int(current_size * self.negative_feedback_batch_size_multiplicative_decrease_factor) + return self.current_batch_size \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/retry.py b/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py similarity index 74% rename from deltacat/utils/ray_utils/retry_handler/retry.py rename to deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py index f296717b..d8b46dd2 100644 --- a/deltacat/utils/ray_utils/retry_handler/retry.py +++ b/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py @@ -1,71 +1,53 @@ from __future__ import annotations -from typing import Any, Dict, List, cast +from typing import Any, Dict, List, cast, Optional from deltacat.utils.ray_utils.retry_handler.ray_remote_tasks_batch_scaling_params import RayRemoteTasksBatchScalingParams -#import necessary errors here import ray import time import logging -from deltacat.utils.ray_utils.retry_handler.logger import configure_logger +from deltacat.logs 
import configure_logger from deltacat.utils.ray_utils.retry_handler.task_execution_error import RayRemoteTaskExecutionError from deltacat.utils.ray_utils.retry_handler.task_info_object import TaskInfoObject from deltacat.utils.ray_utils.retry_handler.retry_strategy_config import get_retry_strategy_config_for_known_exception logger = configure_logger(logging.getLogger(__name__)) -import ray -import time -import logging -from typing import Any, Dict, List, cast -from ray.types import ObjectRef -from RetryExceptions.retryable_exception import RetryableException -from RetryExceptions.non_retryable_exception import NonRetryableException -from RetryExceptions.TaskInfoObject import TaskInfoObject - -#inputs: task_callable, task_input, ray_remote_task_options, exception_retry_strategy_configs -#include a seperate class for errors: break down into retryable and non-retryable -#seperate class to put info in a way that the retry class can handle: ray retry task info - -#This is what specifically retries a single task @ray.remote -def submit_single_task(taskObj: TaskInfoObject, progressNotifier: Optional[NotificationInterface] = None) -> Any: +def submit_single_task(taskObj: TaskInfoObject, TaskContext: Optional[Interface] = None) -> Any: try: taskObj.attempt_count += 1 curr_attempt = taskObj.attempt_count - if progressNotifier is not None: - #method call to straggler detection using notifier + if TaskContext is not None: + # custom logic for checking if taskContext has progress and then use to detect stragglers + #track time/progress in here logger.debug(f"Executing the submitted Ray remote task as part of attempt number: {current_attempt_number}") - return tackObj.task_callable(taskObj.task_input) + return taskObj.task_callable(taskObj.task_input) except (Exception) as exception: - exception_retry_strategy_config = get_retry_strategy_config_for_known_exception(exception, task_info_object.exception_retry_strategy_configs) - #pass to a new method that handles exception 
strategy - #retry_strat = ...exception_retry_strategy_configs + exception_retry_strategy_config = get_retry_strategy_config_for_known_exception(exception, taskObj.exception_retry_strategy_configs) if exception_retry_strategy_config is not None: - return RayRemoteTaskExecutionError(exception_retry_strategy_config.exception, task_info_object) - - + return RayRemoteTaskExecutionError(exception_retry_strategy_config.exception, taskObj) -class RetryHandler: - #given a list of tasks that are failing, we want to classify the error messages and redirect the task - #depending on the exception type using a wrapper - #wrapper function that before execution, checks what exception is being thrown and go to second method to - #commence retrying +class RayTaskSubmissionHandler: + """ + Executes a single Ray task given task info + """ def execute_task(self, ray_remote_task_info: RayRemoteTaskInfo) -> Any: self.start_tasks_execution([ray_remote_task_info]) return self.wait_and_get_all_task_results()[0] """ - Starts execution of all given Ray remote tasks - """ - def start_tasks_execution(self, ray_remote_task_infos: List[TaskInfoObject]) -> None: - self.start_tasks_execution_in_batches(ray_remote_task_infos, RayRemoteTasksBatchScalingParams(initial_batch_size=len(ray_remote_task_infos))) - - """ - Starts execution of given Ray remote tasks in batches depending on the given Batch scaling params + Starts execution of all given a list of Ray tasks with optional arguments: scaling strategy and straggler detection """ - def start_tasks_execution_in_batches(self, ray_remote_task_infos: List[RayRemoteTaskInfo], batch_scaling_params: RayRemoteTasksBatchScalingParams) -> None: + def start_tasks_execution(self, + ray_remote_task_infos: List[TaskInfoObject], + scaling_strategy: Optional[BatchScalingStrategy] = None, + straggler_detection: Optional[StragglerDetectionInterface] = None) -> None: + if scaling_strategy is None: + scaling_strategy = 
RayRemoteTasksBatchScalingParams(len(ray_remote_task_infos)) + + #use interface methods and data to detect stragglers in ray self.num_of_submitted_tasks = len(ray_remote_task_infos) - self.current_batch_size = min(batch_scaling_params.initial_batch_size, self.num_of_submitted_tasks) + self.current_batch_size = min(scaling_strategy.get_batch_size, self.num_of_submitted_tasks) self.num_of_submitted_tasks_completed = 0 self.remaining_ray_remote_task_infos = ray_remote_task_infos self.batch_scaling_params = batch_scaling_params @@ -73,25 +55,29 @@ def start_tasks_execution_in_batches(self, ray_remote_task_infos: List[RayRemote self.unfinished_promises: List[Any] = [] logger.info(f"Starting the execution of {len(ray_remote_task_infos)} Ray remote tasks. Concurrency of tasks execution: {self.current_batch_size}") + if straggler_detection is not None: + #feed to non-detection only retry handler + __wait_and_get_all_task_results(self, straggler_detection) self.__submit_tasks(self.remaining_ray_remote_task_infos[:self.current_batch_size]) self.remaining_ray_remote_task_infos = self.remaining_ray_remote_task_infos[self.current_batch_size:] - def wait_and_get_all_task_results(self) -> List[Any]: - return self.wait_and_get_task_results(self.num_of_submitted_tasks) + def __wait_and_get_all_task_results(self, straggler_detection: Optional[StragglerDetectionInterface]) -> List[Any]: + return self.__get_task_results(self.num_of_submitted_tasks, straggler_detection) - def get_task_results(self, num_of_results: int) -> List[Any]: - #implement wrapper here that before execution will try catch an exception - #get what tasks we need to run our execution on - finished, unfinished = ray.wait(unfinished, num_of_results) - #assuming we have the tasks we want to get results of + #Straggler detection will go in here + def __get_task_results(self, num_of_results: int, straggler_detection: Optional[StragglerDetectionInterface]) -> List[Any]: + if straggler_detection is not None: + finished, 
unfinished = ray.wait(unfinished, num_of_results, straggler_detection.calc_timeout_val) + else: + finished, unfinished = ray.wait(unfinished, num_of_results) for finished in finished: finished_result = None try: finished_result = ray.get(finished) except (Exception) as exception: #if exception send to method handle_ray_exception to determine what to do and assign the corresp error - finished_result = self.handle_ray_exception(exception=exception, TaskInfoObject = )#evaluate the exception and return the error + finished_result = self.handle_ray_exception(exception=exception, ray_remote_task_info=self.task_promise_obj_ref_to_task_info_map[str(finished_promise)] )#evaluate the exception and return the error if finished_result and type(finished_result) == RayRemoteTaskExecutionError: finished_result = cast(RayRemoteTaskExecutionError, finished_result) @@ -130,7 +116,7 @@ def __enqueue_new_tasks(self, num_of_tasks: int) -> None: def __submit_tasks(self, ray_remote_task_infos: List[RayRemoteTaskInfo]) -> None: for ray_remote_task_info in ray_remote_task_infos: time.sleep(0.005) - self.unfinished_promises.append(self.__invoke_ray_remote_task(ray_remote_task_info=ray_remote_task_info)) + self.unfinished_promises.append(self.__invoke_ray_remote_task(ray_remote_task_info)) def __invoke_ray_remote_task(self, ray_remote_task_info: RayRemoteTaskInfo) -> Any: ray_remote_task_options_arguments = dict() diff --git a/deltacat/utils/ray_utils/retry_handler/staggler_detection_interface.py b/deltacat/utils/ray_utils/retry_handler/staggler_detection_interface.py new file mode 100644 index 00000000..d70cadde --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/staggler_detection_interface.py @@ -0,0 +1,25 @@ +from abc import ABC, abstractmethod +from typing import Any + +class StragglerDetectionInterface(ABC): + @abstractmethod + def calc_timeout_val(self, task: Any) -> float: + """ + Determines timeout value for ray.wait() parameter based on client config + """ + pass + + 
@abstractmethod + def is_straggler(self, task: Any) -> bool: + """ + Given all the info, returns whether this specific task is a straggler or not + """ + pass + + @abstractmethod + def get_progress(self, task: Any) -> float: + """ + + :param task: takes in a task + :return: returns progress output of child task + """ \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/task_info_object.py b/deltacat/utils/ray_utils/retry_handler/task_info_object.py index 9471d7de..2259d65f 100644 --- a/deltacat/utils/ray_utils/retry_handler/task_info_object.py +++ b/deltacat/utils/ray_utils/retry_handler/task_info_object.py @@ -9,7 +9,7 @@ def __init__(self, task_callable: Callable[[Any], [Any], task_input: Any, ray_remote_task_options: RayRemoteTaskOptions = RayRemoteTaskOptions(), - exception_retry_strategy_configs: List[TaskExceptionRetryConfig]): #what inputs do I need here + exception_retry_strategy_configs: List[TaskExceptionRetryConfig]): self.task_callable = task_callable self.task_input = task_input self.ray_remote_task_options = ray_remote_task_options From f333384ba943356aeda4dcb8fb87d2cc0da06d50 Mon Sep 17 00:00:00 2001 From: Ekas Chawla Date: Thu, 20 Jul 2023 17:13:27 -0700 Subject: [PATCH 6/9] changes to interfaces --- .../utils/ray_utils/retry_handler/README.md | 3 +- .../ray_utils/retry_handler/TaskContext.py | 4 ++ .../retry_handler/batch_scaling_strategy.py | 18 --------- .../ray_utils/retry_handler/exception_util.py | 7 +++- .../retry_handler/interface_batch_scaling.py | 22 +++++++++++ .../interface_progress_notifier.py | 23 +++++++++++ .../retry_handler/interface_retry_task.py | 34 +++++++++++++++++ .../interface_straggler_detection.py | 11 ++++++ .../ray_remote_tasks_batch_scaling_params.py | 25 ++++-------- .../ray_task_submission_handler.py | 38 +++++++------------ .../staggler_detection_interface.py | 25 ------------ .../ray_utils/retry_handler/task_constants.py | 32 ---------------- .../task_exception_retry_config.py | 2 +- 
.../retry_handler/task_info_object.py | 4 +- 14 files changed, 125 insertions(+), 123 deletions(-) create mode 100644 deltacat/utils/ray_utils/retry_handler/TaskContext.py delete mode 100644 deltacat/utils/ray_utils/retry_handler/batch_scaling_strategy.py create mode 100644 deltacat/utils/ray_utils/retry_handler/interface_batch_scaling.py create mode 100644 deltacat/utils/ray_utils/retry_handler/interface_progress_notifier.py create mode 100644 deltacat/utils/ray_utils/retry_handler/interface_retry_task.py create mode 100644 deltacat/utils/ray_utils/retry_handler/interface_straggler_detection.py delete mode 100644 deltacat/utils/ray_utils/retry_handler/staggler_detection_interface.py diff --git a/deltacat/utils/ray_utils/retry_handler/README.md b/deltacat/utils/ray_utils/retry_handler/README.md index 6c250fe2..48bd8ae9 100644 --- a/deltacat/utils/ray_utils/retry_handler/README.md +++ b/deltacat/utils/ray_utils/retry_handler/README.md @@ -10,7 +10,7 @@ Params: Use cases: 1. Notifying progress - a. + a. TaskContext (progressNotifier - (send_heartbeat, send_progress, get_progress), timeout_time) from StragglerDetectionInterface 2. Detecting stragglers Given the straggler detection algorithm fed in by the client, the method get_timeout_val will be used to determine how long the task will run before it is considered a straggler. The logic for this must be provided by the client internally. 
@@ -20,3 +20,4 @@ Use cases: The client can provide these inputs to fulfil the following use cases: +Given a list of 1000 tasks, we will first scale each batch to a reasonable size and run the retry and detection on each batch diff --git a/deltacat/utils/ray_utils/retry_handler/TaskContext.py b/deltacat/utils/ray_utils/retry_handler/TaskContext.py new file mode 100644 index 00000000..2c41bef5 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/TaskContext.py @@ -0,0 +1,4 @@ +class TaskContext(): + def __init__(self, progress_notifier: progressNotifierInterface, timeoutTime: float): + self.progress_notifier = progress_notifier + self.timeoutTime = diff --git a/deltacat/utils/ray_utils/retry_handler/batch_scaling_strategy.py b/deltacat/utils/ray_utils/retry_handler/batch_scaling_strategy.py deleted file mode 100644 index f43dd141..00000000 --- a/deltacat/utils/ray_utils/retry_handler/batch_scaling_strategy.py +++ /dev/null @@ -1,18 +0,0 @@ -from abc import ABC, abstractmethod - -class BatchScalingStrategy(ABC): - """ - Interface for a generic batch scaling that the client can provide. 
- """ - - @abstractmethod - def increase_batch_size(self, current_size: int) -> int: - pass - - @abstractmethod - def decrease_batch_size(self, current_size: int) -> int: - pass - - @abstractmethod - def get_batch_size(self) -> int: - pass diff --git a/deltacat/utils/ray_utils/retry_handler/exception_util.py b/deltacat/utils/ray_utils/retry_handler/exception_util.py index b75b0e28..7f633a77 100644 --- a/deltacat/utils/ray_utils/retry_handler/exception_util.py +++ b/deltacat/utils/ray_utils/retry_handler/exception_util.py @@ -1,8 +1,11 @@ from typing import List, Optional from ray_manager.models.ray_remote_task_exception_retry_strategy_config import RayRemoteTaskExceptionRetryConfig - -def get_retry_strategy_config_for_known_exception(exception: Exception, exception_retry_strategy_configs: List[RayRemoteTaskExceptionRetryConfig]) -> Optional[RayRemoteTaskExceptionRetryConfig]: +""" +Checks whether the exception seen is recognized as a retryable error or not +""" +def get_retry_strategy_config_for_known_exception(exception: Exception, + exception_retry_strategy_configs: List[RayRemoteTaskExceptionRetryConfig]) -> Optional[RayRemoteTaskExceptionRetryConfig]: for exception_retry_strategy_config in exception_retry_strategy_configs: if type(exception) == type(exception_retry_strategy_config.exception): return exception_retry_strategy_config diff --git a/deltacat/utils/ray_utils/retry_handler/interface_batch_scaling.py b/deltacat/utils/ray_utils/retry_handler/interface_batch_scaling.py new file mode 100644 index 00000000..3866b095 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/interface_batch_scaling.py @@ -0,0 +1,22 @@ +from abc import ABC, abstractmethod +from typing import List + +class BatchScalingInterface(ABC): + """ + Interface for a generic batch scaling that the client can provide. 
+ """ + """ + Loads all tasks to be executed for retry and straggler detection + """ + def init_tasks(self, task_infos): + pass + """ + Gets the next batch of x size to execute on + """ + def next_batch(self, task_info) -> List: + pass + """ + Returns true if there are tasks remaining in the overall List of tasks + """ + def has_next_batch(self, running_tasks) -> bool: + pass diff --git a/deltacat/utils/ray_utils/retry_handler/interface_progress_notifier.py b/deltacat/utils/ray_utils/retry_handler/interface_progress_notifier.py new file mode 100644 index 00000000..89b8cc32 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/interface_progress_notifier.py @@ -0,0 +1,23 @@ +from abc import ABC, abstractmethod + +class ProgressNotifierInterface(ABC): + """ + Gets progress message regarding current task + """ + @abstractmethod + def get_progress(self, task): + pass + + """ + Tells parent task if the current task has a heartbeat or not + """ + @abstractmethod + def has_heartbeat(self, task) -> bool: + pass + + """ + Sends progress of current task to parent task + """ + @abstractmethod + def send_progress(self, task): + pass \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/interface_retry_task.py b/deltacat/utils/ray_utils/retry_handler/interface_retry_task.py new file mode 100644 index 00000000..32fd61e9 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/interface_retry_task.py @@ -0,0 +1,34 @@ +from abc import ABC, abstractmethod +class RetryTaskInterface(ABC): + @abstractmethod + def init_tasks(self, task_infos): + """ + Loads all tasks to check for retries if exception + :param task_infos: + :return: List of tasks + """ + pass + @abstractmethod + def should_retry(self, task) -> bool: + """ + Given a task, determine whether it can be retried or not + :param task: + :return: True or False + """ + pass + @abstractmethod + def get_wait_time(self, task): + """ + Wait time between retries + :param task: + :return: + """ + pass 
+ @abstractmethod + def retry(self, task): + """ + Executes retry behavior for the exception + :param task: + :return: + """ + pass \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/interface_straggler_detection.py b/deltacat/utils/ray_utils/retry_handler/interface_straggler_detection.py new file mode 100644 index 00000000..4c8fd13b --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/interface_straggler_detection.py @@ -0,0 +1,11 @@ +from abc import ABC, abstractmethod +from typing import Any + +class StragglerDetectionInterface(ABC): + + @abstractmethod + def is_straggler(self, task, task_context) -> bool: + """ + Given all the info, returns whether this specific task is a straggler or not + """ + pass \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py b/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py index 1d4cd56a..dfc7c618 100644 --- a/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py +++ b/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py @@ -7,23 +7,14 @@ class RayRemoteTasksBatchScalingParams(BatchScalingStrategy): need to add constants that this file refers to """ def __init__(self, - initial_batch_size: int, - negative_feedback_back_off_in_ms: int = DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BACK_OFF_IN_MS, - positive_feedback_batch_size_additive_increase: int = DEFAULT_RAY_REMOTE_TASK_BATCH_POSITIVE_FEEDBACK_BATCH_SIZE_ADDITIVE_INCREASE, - negative_feedback_batch_size_multiplicative_decrease_factor: int = DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BATCH_SIZE_MULTIPLICATIVE_DECREASE_FACTOR): - self.initial_batch_size = initial_batch_size - self.negative_feedback_back_off_in_ms = negative_feedback_back_off_in_ms - self.positive_feedback_batch_size_additive_increase = positive_feedback_batch_size_additive_increase - 
self.negative_feedback_batch_size_multiplicative_decrease_factor = negative_feedback_batch_size_multiplicative_decrease_factor - self.current_batch_size = initial_batch_size + straggler_detection: StragglerDetectionInterface): + self.straggler_detection = straggler_detection - def get_batch_size(self) -> int: - return self.current_batch_size + def init_tasks(self, task_infos): + pass - def increase_batch_size(self, current_size: int) -> int: - self.current_batch_size = current_size + self.positive_feedback_batch_size_additive_increase - return self.current_batch_size + def next_batch(self, task_info) -> List: + pass - def decrease_batch_size(self, current_size: int) -> int: - self.current_batch_size = int(current_size * self.negative_feedback_batch_size_multiplicative_decrease_factor) - return self.current_batch_size \ No newline at end of file + def has_next_batch(self, running_tasks) -> bool: + pass \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py b/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py index d8b46dd2..8448e04b 100644 --- a/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py +++ b/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py @@ -28,22 +28,22 @@ def submit_single_task(taskObj: TaskInfoObject, TaskContext: Optional[Interface] class RayTaskSubmissionHandler: - """ - Executes a single Ray task given task info - """ - def execute_task(self, ray_remote_task_info: RayRemoteTaskInfo) -> Any: - self.start_tasks_execution([ray_remote_task_info]) - return self.wait_and_get_all_task_results()[0] - """ Starts execution of all given a list of Ray tasks with optional arguments: scaling strategy and straggler detection """ def start_tasks_execution(self, ray_remote_task_infos: List[TaskInfoObject], scaling_strategy: Optional[BatchScalingStrategy] = None, - straggler_detection: Optional[StragglerDetectionInterface] = None) -> None: + 
straggler_detection: Optional[StragglerDetectionInterface] = None, + task_context: Optional[TaskContext]) -> None: if scaling_strategy is None: scaling_strategy = RayRemoteTasksBatchScalingParams(len(ray_remote_task_infos)) + while scaling_strategy.hasNextBatch: + current_batch = scaling_strategy.next_batch() + for tasks in current_batch: + #execute and retry and detect straggler if avail + + #use interface methods and data to detect stragglers in ray self.num_of_submitted_tasks = len(ray_remote_task_infos) @@ -57,9 +57,10 @@ def start_tasks_execution(self, logger.info(f"Starting the execution of {len(ray_remote_task_infos)} Ray remote tasks. Concurrency of tasks execution: {self.current_batch_size}") if straggler_detection is not None: #feed to non-detection only retry handler - __wait_and_get_all_task_results(self, straggler_detection) - self.__submit_tasks(self.remaining_ray_remote_task_infos[:self.current_batch_size]) - self.remaining_ray_remote_task_infos = self.remaining_ray_remote_task_infos[self.current_batch_size:] + self.__wait_and_get_all_task_results(straggler_detection) + else: + self.__submit_tasks(self.remaining_ray_remote_task_infos[:self.current_batch_size]) + self.remaining_ray_remote_task_infos = self.remaining_ray_remote_task_infos[self.current_batch_size:] def __wait_and_get_all_task_results(self, straggler_detection: Optional[StragglerDetectionInterface]) -> List[Any]: @@ -145,17 +146,4 @@ def __update_ray_remote_task_options_on_exception(self, exception: Exception, ra def __handle_ray_exception(self, exception: Exception, ray_remote_task_info: RayRemoteTaskInfo) -> RayRemoteTaskExecutionError: logger.error(f"Ray remote task failed with {type(exception)} Ray exception: {exception}") - if type(exception).__name__ == "RayTaskError(UnexpectedRayTaskError)": - raise UnexpectedRayTaskError(str(exception)) - elif type(exception).__name__ == "RayTaskError(RayOutOfMemoryError)": - return 
RayRemoteTaskExecutionError(exception=RayOutOfMemoryError(str(exception)), ray_remote_task_info=ray_remote_task_info) - elif type(exception) == ray.exceptions.OwnerDiedError: - return RayRemoteTaskExecutionError(exception=RayOwnerDiedError(str(exception)), ray_remote_task_info=ray_remote_task_info) - elif type(exception) == ray.exceptions.WorkerCrashedError: - return RayRemoteTaskExecutionError(exception=RayWorkerCrashedError(str(exception)), ray_remote_task_info=ray_remote_task_info) - elif type(exception) == ray.exceptions.LocalRayletDiedError: - return RayRemoteTaskExecutionError(exception=RayLocalRayletDiedError(str(exception)), ray_remote_task_info=ray_remote_task_info) - elif type(exception) == ray.exceptions.RaySystemError: - return RayRemoteTaskExecutionError(exception=RaySystemError(str(exception)), ray_remote_task_info=ray_remote_task_info) - - raise UnexpectedRayPlatformError(str(exception)) \ No newline at end of file + if type(exception).__name__ == \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/staggler_detection_interface.py b/deltacat/utils/ray_utils/retry_handler/staggler_detection_interface.py deleted file mode 100644 index d70cadde..00000000 --- a/deltacat/utils/ray_utils/retry_handler/staggler_detection_interface.py +++ /dev/null @@ -1,25 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Any - -class StragglerDetectionInterface(ABC): - @abstractmethod - def calc_timeout_val(self, task: Any) -> float: - """ - Determines timeout value for ray.wait() parameter based on client config - """ - pass - - @abstractmethod - def is_straggler(self, task: Any) -> bool: - """ - Given all the info, returns whether this specific task is a straggler or not - """ - pass - - @abstractmethod - def get_progress(self, task: Any) -> float: - """ - - :param task: takes in a task - :return: returns progress output of child task - """ \ No newline at end of file diff --git 
a/deltacat/utils/ray_utils/retry_handler/task_constants.py b/deltacat/utils/ray_utils/retry_handler/task_constants.py index ff09b49e..e69de29b 100644 --- a/deltacat/utils/ray_utils/retry_handler/task_constants.py +++ b/deltacat/utils/ray_utils/retry_handler/task_constants.py @@ -1,32 +0,0 @@ -""" -Default max retry attempts of Ray remote task -""" -DEFAULT_MAX_RAY_REMOTE_TASK_RETRY_ATTEMPTS = 3 -""" -Default initial backoff before Ray remote task retry, in milli seconds -""" -DEFAULT_RAY_REMOTE_TASK_RETRY_INITIAL_BACK_OFF_IN_MS = 5000 -""" -Default Ray remote task retry back off factor -""" -DEFAULT_RAY_REMOTE_TASK_RETRY_BACK_OFF_FACTOR = 2 -""" -Default Ray remote task memory multiplication factor -""" -DEFAULT_RAY_REMOTE_TASK_MEMORY_MULTIPLICATION_FACTOR = 1 -""" -Ray remote task memory multiplication factor for Ray out of memory error -""" -RAY_REMOTE_TASK_MEMORY_MULTIPLICATION_FACTOR_FOR_OUT_OF_MEMORY_ERROR = 2 -""" -Default Ray remote task batch negative feedback back off in milli seconds -""" -DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BACK_OFF_IN_MS = 0 -""" -Default Ray remote task batch positive feedback batch size additive increase -""" -DEFAULT_RAY_REMOTE_TASK_BATCH_POSITIVE_FEEDBACK_BATCH_SIZE_ADDITIVE_INCREASE = 0 -""" -Default Ray remote task batch positive feedback batch size multiplicative decrease factor -""" -DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BATCH_SIZE_MULTIPLICATIVE_DECREASE_FACTOR = 1 \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/task_exception_retry_config.py b/deltacat/utils/ray_utils/retry_handler/task_exception_retry_config.py index a0e0cbc5..060382cf 100644 --- a/deltacat/utils/ray_utils/retry_handler/task_exception_retry_config.py +++ b/deltacat/utils/ray_utils/retry_handler/task_exception_retry_config.py @@ -3,7 +3,7 @@ from typing import List from deltacat.utils.ray_utils.retry_handler.task_constants import 
DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BATCH_SIZE_MULTIPLICATIVE_DECREASE_FACTOR, DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BACK_OFF_IN_MS, DEFAULT_RAY_REMOTE_TASK_BATCH_POSITIVE_FEEDBACK_BATCH_SIZE_ADDITIVE_INCREASE -class TaskExceptionRetryConfig(): +class TaskExceptionRetryConfig: def __init__(self, exception: Exception, max_retry_attempts: int = DEFAULT_MAX_RAY_REMOTE_TASK_RETRY_ATTEMPTS, initial_back_off_in_ms: int = DEFAULT_RAY_REMOTE_TASK_RETRY_INITIAL_BACK_OFF_IN_MS, diff --git a/deltacat/utils/ray_utils/retry_handler/task_info_object.py b/deltacat/utils/ray_utils/retry_handler/task_info_object.py index 2259d65f..225f2bb4 100644 --- a/deltacat/utils/ray_utils/retry_handler/task_info_object.py +++ b/deltacat/utils/ray_utils/retry_handler/task_info_object.py @@ -4,9 +4,9 @@ from deltacat.utils.ray_utils.retry_handler.task_options import RayRemoteTaskOptions @dataclass -Class TaskInfoObject: +class TaskInfoObject: def __init__(self, - task_callable: Callable[[Any], [Any], + task_callable: Callable[[Any], [Any]], task_input: Any, ray_remote_task_options: RayRemoteTaskOptions = RayRemoteTaskOptions(), exception_retry_strategy_configs: List[TaskExceptionRetryConfig]): From 9dc1c10ef72c504c522ef5d4babc6e83f7528ffb Mon Sep 17 00:00:00 2001 From: Ekas Chawla Date: Wed, 26 Jul 2023 18:43:00 -0700 Subject: [PATCH 7/9] skeleton CR final --- .../utils/ray_utils/retry_handler/README.md | 18 ++- .../ray_utils/retry_handler/TaskContext.py | 4 - .../retry_handler/batch_scaling_interface.py | 28 ++++ .../ray_utils/retry_handler/exception_util.py | 7 +- ...security_token_rate_exceeded_exception.py} | 2 +- .../aws_security_token_service_exception.py | 6 - .../retry_handler/failures/broken_pipe.py | 6 - .../failures/cairns_resource_not_found.py | 6 - .../failures/manifest_generation_exception.py | 6 - .../failures/non_retryable_error.py | 14 +- .../retry_handler/failures/retryable_error.py | 9 +- .../failures/upload_part_throttle.py | 6 - 
.../retry_handler/interface_batch_scaling.py | 22 ---- .../interface_progress_notifier.py | 23 ---- .../retry_handler/interface_retry_task.py | 34 ----- .../interface_straggler_detection.py | 11 -- .../progress_notifier_interface.py | 17 +++ .../ray_remote_tasks_batch_scaling_params.py | 20 --- ...ray_remote_tasks_batch_scaling_strategy.py | 24 ++++ .../ray_task_submission_handler.py | 121 ++++++++++-------- .../retry_handler/retry_task_interface.py | 28 ++++ .../straggler_detection_interface.py | 12 ++ .../ray_utils/retry_handler/task_constants.py | 0 .../ray_utils/retry_handler/task_context.py | 11 ++ .../task_exception_retry_config.py | 9 +- .../retry_handler/task_execution_error.py | 2 +- .../retry_handler/task_info_object.py | 9 +- 27 files changed, 222 insertions(+), 233 deletions(-) delete mode 100644 deltacat/utils/ray_utils/retry_handler/TaskContext.py create mode 100644 deltacat/utils/ray_utils/retry_handler/batch_scaling_interface.py rename deltacat/utils/ray_utils/retry_handler/failures/{port_conflict.py => aws_security_token_rate_exceeded_exception.py} (73%) delete mode 100644 deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_service_exception.py delete mode 100644 deltacat/utils/ray_utils/retry_handler/failures/broken_pipe.py delete mode 100644 deltacat/utils/ray_utils/retry_handler/failures/cairns_resource_not_found.py delete mode 100644 deltacat/utils/ray_utils/retry_handler/failures/manifest_generation_exception.py delete mode 100644 deltacat/utils/ray_utils/retry_handler/failures/upload_part_throttle.py delete mode 100644 deltacat/utils/ray_utils/retry_handler/interface_batch_scaling.py delete mode 100644 deltacat/utils/ray_utils/retry_handler/interface_progress_notifier.py delete mode 100644 deltacat/utils/ray_utils/retry_handler/interface_retry_task.py delete mode 100644 deltacat/utils/ray_utils/retry_handler/interface_straggler_detection.py create mode 100644 deltacat/utils/ray_utils/retry_handler/progress_notifier_interface.py 
delete mode 100644 deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py create mode 100644 deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_strategy.py create mode 100644 deltacat/utils/ray_utils/retry_handler/retry_task_interface.py create mode 100644 deltacat/utils/ray_utils/retry_handler/straggler_detection_interface.py delete mode 100644 deltacat/utils/ray_utils/retry_handler/task_constants.py create mode 100644 deltacat/utils/ray_utils/retry_handler/task_context.py diff --git a/deltacat/utils/ray_utils/retry_handler/README.md b/deltacat/utils/ray_utils/retry_handler/README.md index 48bd8ae9..c7a410c9 100644 --- a/deltacat/utils/ray_utils/retry_handler/README.md +++ b/deltacat/utils/ray_utils/retry_handler/README.md @@ -10,14 +10,18 @@ Params: Use cases: 1. Notifying progress - a. TaskContext (progressNotifier - (send_heartbeat, send_progress, get_progress), timeout_time) from StragglerDetectionInterface + This will be done through ProgressNotifierInterface. The client can use has_progress and send_progress + to receive updates on task level progress. This can be an SNSQueue or any type of indicator the client may choose. 2. Detecting stragglers - Given the straggler detection algorithm fed in by the client, the method get_timeout_val will be used to determine how - long the task will run before it is considered a straggler. The logic for this must be provided by the client internally. + Given the straggler detection algorithm implemented by StragglerDetectionInterface, the method is_straggler will inform + the customer if the current node is a straggler according to their own logic and providing them with TaskContext, the information + they might need to make that decision. 3. Retrying retryable exceptions - a. Within the failure directory, there are common errors that are retryable and when detected as an instance - of the retryable class, will cause the task to be retried through submitting the task.
+ Within the failure directory, there are common errors that are retryable and when detected as an instance + of the retryable class, will cause the task to be retried when the exception is caught. If the client would like + to create their own exceptions to be handled, they can create a class that is an extension of retryable_error or + non_retryable_error and the framework should handle it based on the configuration strategy. + + -The client can provide these inputs to fulfil the following use cases: -Given a list of 1000 tasks, we will first scale each batch to a reasonable size and run the retry and detection on each batch diff --git a/deltacat/utils/ray_utils/retry_handler/TaskContext.py b/deltacat/utils/ray_utils/retry_handler/TaskContext.py deleted file mode 100644 index 2c41bef5..00000000 --- a/deltacat/utils/ray_utils/retry_handler/TaskContext.py +++ /dev/null @@ -1,4 +0,0 @@ -class TaskContext(): - def __init__(self, progress_notifier: progressNotifierInterface, timeoutTime: float): - self.progress_notifier = progress_notifier - self.timeoutTime = diff --git a/deltacat/utils/ray_utils/retry_handler/batch_scaling_interface.py b/deltacat/utils/ray_utils/retry_handler/batch_scaling_interface.py new file mode 100644 index 00000000..3dff5985 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/batch_scaling_interface.py @@ -0,0 +1,28 @@ +from typing import List, Any, Protocol +from deltacat.utils.ray_utils.retry_handler.task_info_object import TaskInfoObject +class BatchScalingInterface(Protocol): + """ + Interface for a generic batch scaling that the client can provide.
+ """ + def init_tasks(self, initial_batch_size: int, max_batch_size: int, min_batch_size: int, task_infos: List[Any]) -> None: + """ + Loads all tasks to be executed for retry and straggler detection + """ + pass + def has_next_batch(self) -> bool: + """ + Returns true if there are tasks remaining in the overall List of tasks to create a new batch + """ + pass + def next_batch(self, task_info: TaskInfoObject) -> List: + """ + Gets the next batch to execute on + """ + pass + def mark_task_complete(self, task_info: TaskInfoObject) -> List: + """ + If the task has been completed, mark some field of it as true + so we know what tasks are completed and what need to be executed + """ + pass + diff --git a/deltacat/utils/ray_utils/retry_handler/exception_util.py b/deltacat/utils/ray_utils/retry_handler/exception_util.py index 7f633a77..53087187 100644 --- a/deltacat/utils/ray_utils/retry_handler/exception_util.py +++ b/deltacat/utils/ray_utils/retry_handler/exception_util.py @@ -1,11 +1,10 @@ from typing import List, Optional - from ray_manager.models.ray_remote_task_exception_retry_strategy_config import RayRemoteTaskExceptionRetryConfig -""" -Checks whether the exception seen is recognized as a retryable error or not -""" def get_retry_strategy_config_for_known_exception(exception: Exception, exception_retry_strategy_configs: List[RayRemoteTaskExceptionRetryConfig]) -> Optional[RayRemoteTaskExceptionRetryConfig]: + """ + Checks whether the exception seen is recognized as a retryable error or not + """ for exception_retry_strategy_config in exception_retry_strategy_configs: if type(exception) == type(exception_retry_strategy_config.exception): return exception_retry_strategy_config diff --git a/deltacat/utils/ray_utils/retry_handler/failures/port_conflict.py b/deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_rate_exceeded_exception.py similarity index 73% rename from deltacat/utils/ray_utils/retry_handler/failures/port_conflict.py rename to 
deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_rate_exceeded_exception.py index a92da6ac..c275941b 100644 --- a/deltacat/utils/ray_utils/retry_handler/failures/port_conflict.py +++ b/deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_rate_exceeded_exception.py @@ -1,6 +1,6 @@ from deltacat.utils.ray_utils.retry_handler.retryable_error.failures import RetryableError -class PortConflict(RetryableError): +class AWSSecurityTokenRateExceededException(RetryableError): def __init__(self, *args: object) -> None: super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_service_exception.py b/deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_service_exception.py deleted file mode 100644 index ee9c776c..00000000 --- a/deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_service_exception.py +++ /dev/null @@ -1,6 +0,0 @@ -from deltacat.utils.ray_utils.retry_handler.retryable_error.failures import RetryableError - -class AWSSecurityTokenException(RetryableError): - - def __init__(self, *args: object) -> None: - super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/failures/broken_pipe.py b/deltacat/utils/ray_utils/retry_handler/failures/broken_pipe.py deleted file mode 100644 index 537f00c6..00000000 --- a/deltacat/utils/ray_utils/retry_handler/failures/broken_pipe.py +++ /dev/null @@ -1,6 +0,0 @@ -from deltacat.utils.ray_utils.retry_handler.retryable_error.failures import RetryableError - -class BrokenPipe(RetryableError): - - def __init__(self, *args: object) -> None: - super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/failures/cairns_resource_not_found.py b/deltacat/utils/ray_utils/retry_handler/failures/cairns_resource_not_found.py deleted file mode 100644 index 1e212cd7..00000000 --- 
a/deltacat/utils/ray_utils/retry_handler/failures/cairns_resource_not_found.py +++ /dev/null @@ -1,6 +0,0 @@ -from deltacat.utils.ray_utils.retry_handler.retryable_error.failures import NonRetryableError - -class CairnsResourceNotFound(NonRetryableError): - - def __init__(self, *args: object) -> None: - super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/failures/manifest_generation_exception.py b/deltacat/utils/ray_utils/retry_handler/failures/manifest_generation_exception.py deleted file mode 100644 index a5cdfbc4..00000000 --- a/deltacat/utils/ray_utils/retry_handler/failures/manifest_generation_exception.py +++ /dev/null @@ -1,6 +0,0 @@ -from deltacat.utils.ray_utils.retry_handler.retryable_error.failures import NonRetryableError - -class ManifestGenerationException(NonRetryableError): - - def __init__(self, *args: object) -> None: - super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/failures/non_retryable_error.py b/deltacat/utils/ray_utils/retry_handler/failures/non_retryable_error.py index 3699b167..c549093f 100644 --- a/deltacat/utils/ray_utils/retry_handler/failures/non_retryable_error.py +++ b/deltacat/utils/ray_utils/retry_handler/failures/non_retryable_error.py @@ -1,7 +1,7 @@ -class NonRetryableError(RuntimeError): -""" -Class represents a non-retryable error -""" - - def __init__(self, *args:object) --> None: - super().__init__(*args) +from exceptions import Exception +class NonRetryableError(Exception): + """ + Class represents a non-retryable error + """ + def __init__(self, *args: object): + super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/failures/retryable_error.py b/deltacat/utils/ray_utils/retry_handler/failures/retryable_error.py index 6b75ce8f..c3746a65 100644 --- a/deltacat/utils/ray_utils/retry_handler/failures/retryable_error.py +++ 
b/deltacat/utils/ray_utils/retry_handler/failures/retryable_error.py @@ -1,6 +1,7 @@ -class RetryableError(RuntimeError): -""" -class for errors that can be retried -""" +from exceptions import Exception +class RetryableError(Exception): + """ + Class for errors that can be retried + """ def __init__(self, *args: object) --> None: super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/failures/upload_part_throttle.py b/deltacat/utils/ray_utils/retry_handler/failures/upload_part_throttle.py deleted file mode 100644 index dd2e5c31..00000000 --- a/deltacat/utils/ray_utils/retry_handler/failures/upload_part_throttle.py +++ /dev/null @@ -1,6 +0,0 @@ -from deltacat.utils.ray_utils.retry_handler.retryable_error.failures import RetryableError - -class UploadPartThrottle(RetryableError): - - def __init__(self, *args: object) -> None: - super().__init__(*args) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/interface_batch_scaling.py b/deltacat/utils/ray_utils/retry_handler/interface_batch_scaling.py deleted file mode 100644 index 3866b095..00000000 --- a/deltacat/utils/ray_utils/retry_handler/interface_batch_scaling.py +++ /dev/null @@ -1,22 +0,0 @@ -from abc import ABC, abstractmethod -from typing import List - -class BatchScalingInterface(ABC): - """ - Interface for a generic batch scaling that the client can provide. 
- """ - """ - Loads all tasks to be executed for retry and straggler detection - """ - def init_tasks(self, task_infos): - pass - """ - Gets the next batch of x size to execute on - """ - def next_batch(self, task_info) -> List: - pass - """ - Returns true if there are tasks remaining in the overall List of tasks - """ - def has_next_batch(self, running_tasks) -> bool: - pass diff --git a/deltacat/utils/ray_utils/retry_handler/interface_progress_notifier.py b/deltacat/utils/ray_utils/retry_handler/interface_progress_notifier.py deleted file mode 100644 index 89b8cc32..00000000 --- a/deltacat/utils/ray_utils/retry_handler/interface_progress_notifier.py +++ /dev/null @@ -1,23 +0,0 @@ -from abc import ABC, abstractmethod - -class ProgressNotifierInterface(ABC): - """ - Gets progress message regarding current task - """ - @abstractmethod - def get_progress(self, task): - pass - - """ - Tells parent task if the current task has a heartbeat or not - """ - @abstractmethod - def has_heartbeat(self, task) -> bool: - pass - - """ - Sends progress of current task to parent task - """ - @abstractmethod - def send_progress(self, task): - pass \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/interface_retry_task.py b/deltacat/utils/ray_utils/retry_handler/interface_retry_task.py deleted file mode 100644 index 32fd61e9..00000000 --- a/deltacat/utils/ray_utils/retry_handler/interface_retry_task.py +++ /dev/null @@ -1,34 +0,0 @@ -from abc import ABC, abstractmethod -class RetryTaskInterface(ABC): - @abstractmethod - def init_tasks(self, task_infos): - """ - Loads all tasks to check for retries if exception - :param task_infos: - :return: List of tasks - """ - pass - @abstractmethod - def should_retry(self, task) -> bool: - """ - Given a task, determine whether it can be retried or not - :param task: - :return: True or False - """ - pass - @abstractmethod - def get_wait_time(self, task): - """ - Wait time between retries - :param task: - :return: - """ 
- pass - @abstractmethod - def retry(self, task): - """ - Executes retry behavior for the exception - :param task: - :return: - """ - pass \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/interface_straggler_detection.py b/deltacat/utils/ray_utils/retry_handler/interface_straggler_detection.py deleted file mode 100644 index 4c8fd13b..00000000 --- a/deltacat/utils/ray_utils/retry_handler/interface_straggler_detection.py +++ /dev/null @@ -1,11 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Any - -class StragglerDetectionInterface(ABC): - - @abstractmethod - def is_straggler(self, task, task_context) -> bool: - """ - Given all the info, returns whether this specific task is a straggler or not - """ - pass \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/progress_notifier_interface.py b/deltacat/utils/ray_utils/retry_handler/progress_notifier_interface.py new file mode 100644 index 00000000..fac7c81d --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/progress_notifier_interface.py @@ -0,0 +1,17 @@ +from typing import List, Protocol +from deltacat.utils.ray_utils.retry_handler.task_info_object import TaskInfoObject +class ProgressNotifierInterface(Protocol): + """ + Interface for client injected progress notification system. 
+ """ + def has_heartbeat(self, task_info: TaskInfoObject) -> bool: + """ + Sends progress of current task to parent task + """ + pass + def send_heartbeat(self, parent_task_info: TaskInfoObject) -> bool: + """ + Tells parent task if the current task has a heartbeat or not + """ + pass + diff --git a/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py b/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py deleted file mode 100644 index dfc7c618..00000000 --- a/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_params.py +++ /dev/null @@ -1,20 +0,0 @@ -from deltacat.utils.ray_utils.retry_handler.task_constants import DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BATCH_SIZE_MULTIPLICATIVE_DECREASE_FACTOR, DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BACK_OFF_IN_MS, DEFAULT_RAY_REMOTE_TASK_BATCH_POSITIVE_FEEDBACK_BATCH_SIZE_ADDITIVE_INCREASE -from dataclasses import dataclass - -class RayRemoteTasksBatchScalingParams(BatchScalingStrategy): - """ - Represents the batch scaling params of the Ray remote tasks - need to add constants that this file refers to - """ - def __init__(self, - straggler_detection: StragglerDetectionInterface): - self.straggler_detection = straggler_detection - - def init_tasks(self, task_infos): - pass - - def next_batch(self, task_info) -> List: - pass - - def has_next_batch(self, running_tasks) -> bool: - pass \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_strategy.py b/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_strategy.py new file mode 100644 index 00000000..4af88d6d --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_strategy.py @@ -0,0 +1,24 @@ +from deltacat.utils.ray_utils.retry_handler.batch_scaling_interface import BatchScalingInterface +class RayRemoteTasksBatchScalingStrategy(BatchScalingInterface): + """ + Default batch scaling 
parameters for if the client does not provide their own batch_scaling parameters + """ + + def init_tasks(self)-> None: + """ + Setup AIMD scaling for the batches as the default + """ + pass + + def next_batch(self) -> List: + """ + Returns the list of tasks included in the next batch of whatever size based on AIMD + """ + + pass + + def has_next_batch(self) -> bool: + """ + If there are no more tasks to execute that can not create a batch, return False + """ + pass \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py b/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py index 8448e04b..85ee5b17 100644 --- a/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py +++ b/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py @@ -1,6 +1,6 @@ from __future__ import annotations from typing import Any, Dict, List, cast, Optional -from deltacat.utils.ray_utils.retry_handler.ray_remote_tasks_batch_scaling_params import RayRemoteTasksBatchScalingParams +from deltacat.utils.ray_utils.retry_handler.ray_remote_tasks_batch_scaling_strategy import RayRemoteTasksBatchScalingStrategy import ray import time import logging @@ -13,12 +13,13 @@ @ray.remote def submit_single_task(taskObj: TaskInfoObject, TaskContext: Optional[Interface] = None) -> Any: + """ + Submits a single task for execution, handles any exceptions that may occur during execution, + and applies appropriate retry strategies if they are defined. 
+ """ try: taskObj.attempt_count += 1 curr_attempt = taskObj.attempt_count - if TaskContext is not None: - # custom logic for checking if taskContext has progress and then use to detect stragglers - #track time/progress in here logger.debug(f"Executing the submitted Ray remote task as part of attempt number: {current_attempt_number}") return taskObj.task_callable(taskObj.task_input) except (Exception) as exception: @@ -26,6 +27,8 @@ def submit_single_task(taskObj: TaskInfoObject, TaskContext: Optional[Interface] if exception_retry_strategy_config is not None: return RayRemoteTaskExecutionError(exception_retry_strategy_config.exception, taskObj) + logger.error(f"The exception thrown by submitted Ray task during attempt number: {current_attempt_number} is non-retryable or unexpected, hence throwing Non retryable exception: {exception}") + raise UnexpectedRayTaskError(str(exception)) class RayTaskSubmissionHandler: """ @@ -36,59 +39,58 @@ def start_tasks_execution(self, scaling_strategy: Optional[BatchScalingStrategy] = None, straggler_detection: Optional[StragglerDetectionInterface] = None, task_context: Optional[TaskContext]) -> None: + """ + Prepares and initiates the execution of a batch of tasks and can optionally support + custom client batch scaling, straggler detection, and task context + """ if scaling_strategy is None: - scaling_strategy = RayRemoteTasksBatchScalingParams(len(ray_remote_task_infos)) - while scaling_strategy.hasNextBatch: - current_batch = scaling_strategy.next_batch() - for tasks in current_batch: - #execute and retry and detect straggler if avail - - + scaling_strategy = RayRemoteTasksBatchScalingStrategy(ray_remote_task_infos) - #use interface methods and data to detect stragglers in ray - self.num_of_submitted_tasks = len(ray_remote_task_infos) - self.current_batch_size = min(scaling_strategy.get_batch_size, self.num_of_submitted_tasks) - self.num_of_submitted_tasks_completed = 0 - self.remaining_ray_remote_task_infos = 
ray_remote_task_infos - self.batch_scaling_params = batch_scaling_params - self.task_promise_obj_ref_to_task_info_map: Dict[Any, RayRemoteTaskInfo] = {} - - self.unfinished_promises: List[Any] = [] - logger.info(f"Starting the execution of {len(ray_remote_task_infos)} Ray remote tasks. Concurrency of tasks execution: {self.current_batch_size}") if straggler_detection is not None: - #feed to non-detection only retry handler - self.__wait_and_get_all_task_results(straggler_detection) - else: - self.__submit_tasks(self.remaining_ray_remote_task_infos[:self.current_batch_size]) - self.remaining_ray_remote_task_infos = self.remaining_ray_remote_task_infos[self.current_batch_size:] - + while scaling_strategy.hasNextBatch(): + current_batch = scaling_strategy.next_batch() + for task in current_batch: + if straggler_detection.isStraggler(task): + ray.cancel(task) + else: + self._submit_tasks(task) + + def _wait_and_get_all_task_results(self, straggler_detection: Optional[StragglerDetectionInterface]) -> List[Any]: + return self._get_task_results(self.num_of_submitted_tasks, straggler_detection) + + def _get_task_results(self, num_of_results: int, straggler_detection: Optional[StragglerDetectionInterface]) -> List[Any]: + """ + Gets results from a list of tasks to be executed, and catches exceptions to manage the retry strategy. 
+ Optional: Given a StragglerDetectionInterface, can detect and handle straggler tasks according to the client logic + """ + if not self.unfinished_promises or num_of_results == 0: + return [] + elif num_of_results > len(self.unfinished_promises): + num_of_results = len(self.unfinished_promises) + + finished, self.unfinished_promises = ray.wait(self.unfinished_promises, num_of_results) + successful_results = [] - def __wait_and_get_all_task_results(self, straggler_detection: Optional[StragglerDetectionInterface]) -> List[Any]: - return self.__get_task_results(self.num_of_submitted_tasks, straggler_detection) - - #Straggler detection will go in here - def __get_task_results(self, num_of_results: int, straggler_detection: Optional[StragglerDetectionInterface]) -> List[Any]: - if straggler_detection is not None: - finished, unfinished = ray.wait(unfinished, num_of_results, straggler_detection.calc_timeout_val) - else: - finished, unfinished = ray.wait(unfinished, num_of_results) for finished in finished: finished_result = None try: finished_result = ray.get(finished) except (Exception) as exception: #if exception send to method handle_ray_exception to determine what to do and assign the corresp error - finished_result = self.handle_ray_exception(exception=exception, ray_remote_task_info=self.task_promise_obj_ref_to_task_info_map[str(finished_promise)] )#evaluate the exception and return the error + finished_result = self._handle_ray_exception(exception=exception, ray_remote_task_info=self.task_promise_obj_ref_to_task_info_map[str(finished_promise)] )#evaluate the exception and return the error if finished_result and type(finished_result) == RayRemoteTaskExecutionError: finished_result = cast(RayRemoteTaskExecutionError, finished_result) + + if straggler_detection and straggler_detection.isStraggler(finished_result): + ray.cancel(finished_result) exception_retry_strategy_config = get_retry_strategy_config_for_known_exception(finished_result.exception, 
finished_result.ray_remote_task_info.exception_retry_strategy_configs) if (exception_retry_strategy_config is None or finished_result.ray_remote_task_info.num_of_attempts > exception_retry_strategy_config.max_retry_attempts): logger.error(f"The submitted task has exhausted all the maximum retries configured and finally throws exception - {finished_result.exception}") raise finished_result.exception - self.__update_ray_remote_task_options_on_exception(finished_result.exception, finished_result.ray_remote_task_info) - self.unfinished_promises.append(self.__invoke_ray_remote_task(ray_remote_task_info=finished_result.ray_remote_task_info)) + self._update_ray_remote_task_options_on_exception(finished_result.exception, finished_result.ray_remote_task_info) + self.unfinished_promises.append(self._invoke_ray_remote_task(ray_remote_task_info=finished_result.ray_remote_task_info)) else: successful_results.append(finished_result) del self.task_promise_obj_ref_to_task_info_map[str(finished_promise)] @@ -97,29 +99,35 @@ def __get_task_results(self, num_of_results: int, straggler_detection: Optional[ self.num_of_submitted_tasks_completed += num_of_successful_results self.current_batch_size -= num_of_successful_results - self.__enqueue_new_tasks(num_of_successful_results) + self._enqueue_new_tasks(num_of_successful_results) if num_of_successful_results < num_of_results: - successful_results.extend(self.wait_and_get_task_results(num_of_results - num_of_successful_results)) + successful_results.extend(self._get_task_results(num_of_results - num_of_successful_results)) return successful_results else: return successful_results - def __enqueue_new_tasks(self, num_of_tasks: int) -> None: - new_tasks_submitted = self.remaining_ray_remote_task_infos[:num_of_tasks] - num_of_new_tasks_submitted = len(new_tasks_submitted) - self.__submit_tasks(new_tasks_submitted) - self.remaining_ray_remote_task_infos = self.remaining_ray_remote_task_infos[num_of_tasks:] - self.current_batch_size += 
num_of_new_tasks_submitted - logger.info(f"Enqueued {num_of_new_tasks_submitted} new tasks. Current concurrency of tasks execution: {self.current_batch_size}, Current Task progress: {self.num_of_submitted_tasks_completed}/{self.num_of_submitted_tasks}") + def _enqueue_new_tasks(self, num_of_tasks: int) -> None: + """ + Helper method to submit a specified number of tasks + """ + new_tasks_submitted = self.remaining_ray_remote_task_infos[:num_of_tasks] + num_of_new_tasks_submitted = len(new_tasks_submitted) + self._submit_tasks(new_tasks_submitted) + self.remaining_ray_remote_task_infos = self.remaining_ray_remote_task_infos[num_of_tasks:] + self.current_batch_size += num_of_new_tasks_submitted + logger.info(f"Enqueued {num_of_new_tasks_submitted} new tasks. Current concurrency of tasks execution: {self.current_batch_size}, Current Task progress: {self.num_of_submitted_tasks_completed}/{self.num_of_submitted_tasks}") - def __submit_tasks(self, ray_remote_task_infos: List[RayRemoteTaskInfo]) -> None: + def _submit_tasks(self, ray_remote_task_infos: List[RayRemoteTaskInfo]) -> None: for ray_remote_task_info in ray_remote_task_infos: time.sleep(0.005) - self.unfinished_promises.append(self.__invoke_ray_remote_task(ray_remote_task_info)) - - def __invoke_ray_remote_task(self, ray_remote_task_info: RayRemoteTaskInfo) -> Any: + if self.straggler_detection and self.straggler_detection.is_straggler(ray_remote_task_info): + ray.cancel(ray_remote_task_info) + else: + self.unfinished_promises.append(self._invoke_ray_remote_task(ray_remote_task_info)) + #replace with ray.options + def _invoke_ray_remote_task(self, ray_remote_task_info: RayRemoteTaskInfo) -> Any: ray_remote_task_options_arguments = dict() if ray_remote_task_info.ray_remote_task_options.memory: @@ -136,14 +144,15 @@ def __invoke_ray_remote_task(self, ray_remote_task_info: RayRemoteTaskInfo) -> A return ray_remote_task_promise_obj_ref - def __update_ray_remote_task_options_on_exception(self, exception: Exception, 
ray_remote_task_info: RayRemoteTaskInfo): + #replace with ray.options + def _update_ray_remote_task_options_on_exception(self, exception: Exception, ray_remote_task_info: RayRemoteTaskInfo): exception_retry_strategy_config = get_retry_strategy_config_for_known_exception(exception, ray_remote_task_info.exception_retry_strategy_configs) if exception_retry_strategy_config and ray_remote_task_info.ray_remote_task_options.memory: logger.info(f"Updating the Ray remote task options after encountering exception: {exception}") ray_remote_task_memory_multiply_factor = exception_retry_strategy_config.ray_remote_task_memory_multiply_factor ray_remote_task_info.ray_remote_task_options.memory *= ray_remote_task_memory_multiply_factor logger.info(f"Updated ray remote task options Memory: {ray_remote_task_info.ray_remote_task_options.memory}") - - def __handle_ray_exception(self, exception: Exception, ray_remote_task_info: RayRemoteTaskInfo) -> RayRemoteTaskExecutionError: + #replace with own exceptions + def _handle_ray_exception(self, exception: Exception, ray_remote_task_info: RayRemoteTaskInfo) -> RayRemoteTaskExecutionError: logger.error(f"Ray remote task failed with {type(exception)} Ray exception: {exception}") - if type(exception).__name__ == \ No newline at end of file + if type(exception).__name__ == "AWSSecurityTokenRateExceededException(RetryableError)" \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/retry_task_interface.py b/deltacat/utils/ray_utils/retry_handler/retry_task_interface.py new file mode 100644 index 00000000..a1284350 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/retry_task_interface.py @@ -0,0 +1,28 @@ +from typing import List, Protocol +from deltacat.utils.ray_utils.retry_handler.task_info_object import TaskInfoObject +import Exception + +class RetryTaskInterface(Protocol): + def init_tasks(self, task_infos: List[TaskInfoObject]) -> None: + """ + Loads all tasks to check for retries if exception occurs + """ 
+ pass + + def should_retry(self, task: TaskInfoObject, exception: Exception) -> bool: + """ + Given a task, determine whether it should be retried or not + """ + pass + + def get_wait_time(self, task: TaskInfoObject) -> int: + """ + Determines the wait time between retries + """ + pass + + def retry(self, task: TaskInfoObject, exception: Exception) -> None: + """ + Executes retry behavior for the given exception + """ + pass \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/straggler_detection_interface.py b/deltacat/utils/ray_utils/retry_handler/straggler_detection_interface.py new file mode 100644 index 00000000..ef986b3b --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/straggler_detection_interface.py @@ -0,0 +1,12 @@ +from typing import Any, Protocol +from deltacat.utils.ray_utils.retry_handler.task_info_object import TaskInfoObject +from deltacat.utils.ray_utils.retry_handler.task_context import TaskContext +class StragglerDetectionInterface(Protocol): + """ + Using TaskContext, handles the client-side implementation for straggler detection + """ + def is_straggler(self, task: TaskInfoObject, task_context: TaskContext) -> bool: + """ + Given all the info, returns whether this specific task is a straggler or not + """ + pass \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/task_constants.py b/deltacat/utils/ray_utils/retry_handler/task_constants.py deleted file mode 100644 index e69de29b..00000000 diff --git a/deltacat/utils/ray_utils/retry_handler/task_context.py b/deltacat/utils/ray_utils/retry_handler/task_context.py new file mode 100644 index 00000000..f6217465 --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/task_context.py @@ -0,0 +1,11 @@ +from dataclasses import dataclass +from deltacat.utils.ray_utils.retry_handler.progress_notifier_interface import ProgressNotifierInterface +@dataclass +class TaskContext(): + """ + This class represents important info pertaining to the 
task that other interfaces like Straggler Detection + can use to make decisions + """ + def __init__(self, progress_notifier: ProgressNotifierInterface, timeoutTime: float): + self.progress_notifier = progress_notifier + self.timeoutTime = timeoutTime diff --git a/deltacat/utils/ray_utils/retry_handler/task_exception_retry_config.py b/deltacat/utils/ray_utils/retry_handler/task_exception_retry_config.py index 060382cf..76d99d9e 100644 --- a/deltacat/utils/ray_utils/retry_handler/task_exception_retry_config.py +++ b/deltacat/utils/ray_utils/retry_handler/task_exception_retry_config.py @@ -2,8 +2,10 @@ from dataclasses import dataclass from typing import List from deltacat.utils.ray_utils.retry_handler.task_constants import DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BATCH_SIZE_MULTIPLICATIVE_DECREASE_FACTOR, DEFAULT_RAY_REMOTE_TASK_BATCH_NEGATIVE_FEEDBACK_BACK_OFF_IN_MS, DEFAULT_RAY_REMOTE_TASK_BATCH_POSITIVE_FEEDBACK_BATCH_SIZE_ADDITIVE_INCREASE - class TaskExceptionRetryConfig: + """ + Determines how to handle and retry specific exceptions during task executions + """ def __init__(self, exception: Exception, max_retry_attempts: int = DEFAULT_MAX_RAY_REMOTE_TASK_RETRY_ATTEMPTS, initial_back_off_in_ms: int = DEFAULT_RAY_REMOTE_TASK_RETRY_INITIAL_BACK_OFF_IN_MS, @@ -16,8 +18,3 @@ def __init__(self, exception: Exception, self.back_off_factor = back_off_factor self.ray_remote_task_memory_multiply_factor = ray_remote_task_memory_multiplication_factor self.is_throttling_exception = is_throttling_exception - - @staticmethod - def getDefaultConfig() -> List[TaskExceptionRetryConfig]: - return [TaskExceptionRetryConfig(exception=RetryableError(), is_throttling_exception=True), - TaskExceptionRetryConfig(exception=RayOutOfMemoryError(), ray_remote_task_memory_multiplication_factor=RAY_REMOTE_TASK_MEMORY_MULTIPLICATION_FACTOR_FOR_OUT_OF_MEMORY_ERROR)] \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/task_execution_error.py 
b/deltacat/utils/ray_utils/retry_handler/task_execution_error.py index 8117040d..4e3e1d30 100644 --- a/deltacat/utils/ray_utils/retry_handler/task_execution_error.py +++ b/deltacat/utils/ray_utils/retry_handler/task_execution_error.py @@ -1,4 +1,4 @@ -class RayRemoteTaskExecutionError(): +class RayRemoteTaskExecutionError: """ An error class that denotes the Ray Remote Task Execution Failure """ diff --git a/deltacat/utils/ray_utils/retry_handler/task_info_object.py b/deltacat/utils/ray_utils/retry_handler/task_info_object.py index 225f2bb4..7baabe4b 100644 --- a/deltacat/utils/ray_utils/retry_handler/task_info_object.py +++ b/deltacat/utils/ray_utils/retry_handler/task_info_object.py @@ -1,17 +1,20 @@ from dataclasses import dataclass from typing import Any, Callable, List -from deltacat.utils.ray_utils.retry_handler.task_info_object import TaskExceptionRetryConfig +from deltacat.utils.ray_utils.retry_handler.task_exception_retry_config import TaskExceptionRetryConfig from deltacat.utils.ray_utils.retry_handler.task_options import RayRemoteTaskOptions @dataclass class TaskInfoObject: + """ + Dataclass holding important fields representing the Task as an object + """ def __init__(self, task_callable: Callable[[Any], [Any]], task_input: Any, ray_remote_task_options: RayRemoteTaskOptions = RayRemoteTaskOptions(), - exception_retry_strategy_configs: List[TaskExceptionRetryConfig]): + task_exception_retry_config: List[TaskExceptionRetryConfig]): self.task_callable = task_callable self.task_input = task_input self.ray_remote_task_options = ray_remote_task_options - self.exception_retry_strategy_configs = exception_retry_strategy_configs + self.task_exception_retry_config = task_exception_retry_config self.num_of_attempts = 0 From 360aaae62b1d7ee1e8e45d1fa644e96a21310007 Mon Sep 17 00:00:00 2001 From: Ekas Chawla Date: Thu, 27 Jul 2023 11:51:40 -0700 Subject: [PATCH 8/9] updated README and implemented AIMDBatchScaling --- .../utils/ray_utils/retry_handler/README.md | 6 
++-- ...ray_remote_tasks_batch_scaling_strategy.py | 36 ++++++++++++++----- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/deltacat/utils/ray_utils/retry_handler/README.md b/deltacat/utils/ray_utils/retry_handler/README.md index c7a410c9..466d9d6d 100644 --- a/deltacat/utils/ray_utils/retry_handler/README.md +++ b/deltacat/utils/ray_utils/retry_handler/README.md @@ -10,12 +10,12 @@ Params: Use cases: 1. Notifying progress - This will be done through ProgressNotifierInterface. The client can use has_progress and send_progress + This will be done through ProgressNotifierInterface. The client can implement has_progress and send_progress from the interface to recieve updates on task level progress. This can be an SNSQueue or any type of indicator the client may choose. 2. Detecting stragglers Given the straggler detection algorithm implemented by StragglerDetectionInterface, the method is_straggler will inform - the customer if the current node is a straggler according to their own logic and proving them with TaskContext, the information - they might need to make that decision. + the customer if the current node is a straggler according to their own logic. In order to make their decision, we will provide them + with TaskContext that contains fields and data that the client can use to decide if a task is a straggler or not. 3. Retrying retryable exceptions Within the failure directory, there are common errors that are retryable and when detected as an instance of the retryable class, will cause the task to be retried when the exception is caught. 
If the client would like diff --git a/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_strategy.py b/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_strategy.py index 4af88d6d..0463b496 100644 --- a/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_strategy.py +++ b/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_strategy.py @@ -1,24 +1,42 @@ +from typing import List, Any from deltacat.utils.ray_utils.retry_handler.batch_scaling_interface import BatchScalingInterface class RayRemoteTasksBatchScalingStrategy(BatchScalingInterface): """ Default batch scaling parameters for if the client does not provide their own batch_scaling parameters """ + def __init__(self, additive_increase: int, multiplicative_decrease: float): + self.task_infos = [] + self.batch_index = 0 + self.batch_size = None + self.max_batch_size = None + self.min_batch_size = None + self.additive_increase = additive_increase + self.multiplicative_decrease = multiplicative_decrease + def init_tasks(self, initial_batch_size: int, max_batch_size: int, min_batch_size: int, task_infos: List[TaskInfoObject])-> None: + """ + Setup AIMD scaling for the batches as the default + """ + self.task_infos = task_infos + self.batch_size = initial_batch_size + self.max_batch_size = max_batch_size + self.min_batch_size = min_batch_size - def init_tasks(self)-> None: - """ - Setup AIMD scaling for the batches as the default - """ - pass - def next_batch(self) -> List: + def has_next_batch(self) -> bool: """ Returns the list of tasks included in the next batch of whatever size based on AIMD """ + return self.batch_index < len(self.task_infos) - pass - def has_next_batch(self) -> bool: + def next_batch(self) -> List[TaskInfoObject]: """ If there are no more tasks to execute that can not create a batch, return False """ - pass \ No newline at end of file + batch_end = min(self.batch_index + self.batch_size, len(self.task_infos)) + batch = 
self.task_infos[self.batch_index:batch_end] + self.batch_index = batch_end + return batch + + def mark_task_completed(self, task_info: TaskInfoObject) -> None: + From 7fc63ae496c3193397868af9a13aaa3179407a6a Mon Sep 17 00:00:00 2001 From: Ekas Chawla Date: Tue, 1 Aug 2023 13:31:22 -0700 Subject: [PATCH 9/9] implementation --- ...y => AIMD_based_batch_scaling_strategy.py} | 11 +++- .../retry_handler/batch_scaling_interface.py | 20 +++++- ..._security_token_rate_exceeded_exception.py | 2 +- .../failures/cairns_client_exception.py | 2 +- .../failures/non_retryable_error.py | 2 +- .../retry_handler/failures/retryable_error.py | 2 +- .../ray_task_submission_handler.py | 66 +++++++++++++++---- .../retry_handler/retry_task_default.py | 31 +++++++++ .../retry_handler/retry_task_interface.py | 8 +-- .../ray_utils/retry_handler/task_context.py | 4 +- .../retry_handler/task_info_object.py | 5 +- 11 files changed, 120 insertions(+), 33 deletions(-) rename deltacat/utils/ray_utils/retry_handler/{ray_remote_tasks_batch_scaling_strategy.py => AIMD_based_batch_scaling_strategy.py} (78%) create mode 100644 deltacat/utils/ray_utils/retry_handler/retry_task_default.py diff --git a/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_strategy.py b/deltacat/utils/ray_utils/retry_handler/AIMD_based_batch_scaling_strategy.py similarity index 78% rename from deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_strategy.py rename to deltacat/utils/ray_utils/retry_handler/AIMD_based_batch_scaling_strategy.py index 0463b496..2b46f908 100644 --- a/deltacat/utils/ray_utils/retry_handler/ray_remote_tasks_batch_scaling_strategy.py +++ b/deltacat/utils/ray_utils/retry_handler/AIMD_based_batch_scaling_strategy.py @@ -1,6 +1,6 @@ from typing import List, Any from deltacat.utils.ray_utils.retry_handler.batch_scaling_interface import BatchScalingInterface -class RayRemoteTasksBatchScalingStrategy(BatchScalingInterface): +class 
AIMDBasedBatchScalingStrategy(BatchScalingInterface): """ Default batch scaling parameters for if the client does not provide their own batch_scaling parameters """ @@ -38,5 +38,12 @@ def next_batch(self) -> List[TaskInfoObject]: self.batch_index = batch_end return batch - def mark_task_completed(self, task_info: TaskInfoObject) -> None: + def mark_task_complete(self, task_info: TaskInfoObject): + task_info.completed = True + def increase_batch_size(self): + self.batch_size = min(self.batch_size + self.additive_increase, self.max_batch_size) + + + def decrease_batch_size(self): + self.batch_size = max(self.batch_size * self.multiplicative_decrease, self.min_batch_size) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/batch_scaling_interface.py b/deltacat/utils/ray_utils/retry_handler/batch_scaling_interface.py index 3dff5985..dca5c339 100644 --- a/deltacat/utils/ray_utils/retry_handler/batch_scaling_interface.py +++ b/deltacat/utils/ray_utils/retry_handler/batch_scaling_interface.py @@ -4,9 +4,9 @@ class BatchScalingInterface(Protocol): """ Interface for a generic batch scaling that the client can provide. 
""" - def init_tasks(self, initial_batch_size: int, max_batch_size: int, min_batch_size: int, task_infos: List[Any]) -> None: + def init_tasks(self, initial_batch_size: int, max_batch_size: int, min_batch_size: int, task_infos: List[TaskInfoObject]) -> None: """ - Loads all tasks to be executed for retry and straggler detection + Loads all tasks to be executed for retry batching """ pass def has_next_batch(self) -> bool: @@ -19,10 +19,24 @@ def next_batch(self, task_info: TaskInfoObject) -> List: Gets the next batch to execute on """ pass - def mark_task_complete(self, task_info: TaskInfoObject) -> List: + def mark_task_complete(self, task_info: TaskInfoObject) -> None: """ If the task has been completed, mark some field of it as true so we know what tasks are completed and what need to be executed """ pass + def increase_batch_size(self) -> None: + """ + Increase the batch size by some amount according to client specifications + :return: + """ + pass + + def decrease_batch_size(self) -> None: + """ + Decrease the batch size by some amount according to client specifications + :return: + """ + pass + diff --git a/deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_rate_exceeded_exception.py b/deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_rate_exceeded_exception.py index c275941b..f64978c2 100644 --- a/deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_rate_exceeded_exception.py +++ b/deltacat/utils/ray_utils/retry_handler/failures/aws_security_token_rate_exceeded_exception.py @@ -3,4 +3,4 @@ class AWSSecurityTokenRateExceededException(RetryableError): def __init__(self, *args: object) -> None: - super().__init__(*args) \ No newline at end of file + super().__init__(*args) diff --git a/deltacat/utils/ray_utils/retry_handler/failures/cairns_client_exception.py b/deltacat/utils/ray_utils/retry_handler/failures/cairns_client_exception.py index ce2778e6..42e6a08b 100644 --- 
a/deltacat/utils/ray_utils/retry_handler/failures/cairns_client_exception.py +++ b/deltacat/utils/ray_utils/retry_handler/failures/cairns_client_exception.py @@ -3,4 +3,4 @@ class CairnsClientException(RetryableError): def __init__(self, *args: object) -> None: - super().__init__(*args) \ No newline at end of file + super().__init__(*args) diff --git a/deltacat/utils/ray_utils/retry_handler/failures/non_retryable_error.py b/deltacat/utils/ray_utils/retry_handler/failures/non_retryable_error.py index c549093f..636fc8c1 100644 --- a/deltacat/utils/ray_utils/retry_handler/failures/non_retryable_error.py +++ b/deltacat/utils/ray_utils/retry_handler/failures/non_retryable_error.py @@ -4,4 +4,4 @@ class NonRetryableError(Exception): Class represents a non-retryable error """ def __init__(self, *args: object): - super().__init__(*args) \ No newline at end of file + super().__init__(*args) diff --git a/deltacat/utils/ray_utils/retry_handler/failures/retryable_error.py b/deltacat/utils/ray_utils/retry_handler/failures/retryable_error.py index c3746a65..01124240 100644 --- a/deltacat/utils/ray_utils/retry_handler/failures/retryable_error.py +++ b/deltacat/utils/ray_utils/retry_handler/failures/retryable_error.py @@ -4,4 +4,4 @@ class RetryableError(Exception): Class for errors that can be retried """ def __init__(self, *args: object) --> None: - super().__init__(*args) \ No newline at end of file + super().__init__(*args) diff --git a/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py b/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py index 85ee5b17..a03e6102 100644 --- a/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py +++ b/deltacat/utils/ray_utils/retry_handler/ray_task_submission_handler.py @@ -38,22 +38,62 @@ def start_tasks_execution(self, ray_remote_task_infos: List[TaskInfoObject], scaling_strategy: Optional[BatchScalingStrategy] = None, straggler_detection: Optional[StragglerDetectionInterface] = None, + 
retry_strategy: Optional[RetryTaskInterface], task_context: Optional[TaskContext]) -> None: """ Prepares and initiates the execution of a batch of tasks and can optionally support custom client batch scaling, straggler detection, and task context """ if scaling_strategy is None: - scaling_strategy = RayRemoteTasksBatchScalingStrategy(ray_remote_task_infos) + scaling_strategy = AIMDBasedBatchScalingStrategy(ray_remote_task_infos) + if retry_strategy is None: + retry_strategy = RetryTaskDefault(max_retries = 3) + + active_tasks = [] + + while scaling_strategy.has_next_batch(): + current_batch = scaling_strategy.next_batch() + for task in current_batch: + try: + self._submit_tasks(task) + active_tasks.append(task) + except Exception as e: + if retry_strategy.should_retry(task, e): + retry_strategy.retry(task, e) + continue + else: + raise #? not sure what to do if the error isnt retryable + completed_tasks = self._wait_and_get_all_task_results(active_tasks) + + for task in completed_tasks: + scaling_strategy.mark_task_complete(task) + active_tasks.remove(task) - if straggler_detection is not None: - while scaling_strategy.hasNextBatch(): - current_batch = scaling_strategy.next_batch() - for task in current_batch: - if straggler_detection.isStraggler(task): + if all(task.completed for task in current_batch): + scaling_strategy.increase_batch_size() + else: + scaling_strategy.decrease_batch_size() + + #handle strags + if straggler_detection is not None: + for task in active_tasks: #tasks that are still running + if straggler_detection.is_straggler(task, task_context): ray.cancel(task) - else: - self._submit_tasks(task) + active_tasks.remove(task) + #maybe we need to requeue the cancelled task? can add back to ray_remote_task_infos + + + #call wait_and_get_all ... 
+ #when ray returns results mark as completed --> to mark as completed we want to give a bool field to the task info object and set to true, when gets marked to true + #if success, additive increase method to batchScaling + #if failure, MD on the batch size and continue until nothing remains + #check at least 1 is completed from current batch + #mark task as completed + + #wait some time period here ? --> call to _wait_and_get_all_task_results so there is a period to collect completed tasks + #use result of wait and remove from active_tasks because it is completed + #use results of completed promises compared to total tasks in batch to determine batch scaling increase or decrease + def _wait_and_get_all_task_results(self, straggler_detection: Optional[StragglerDetectionInterface]) -> List[Any]: return self._get_task_results(self.num_of_submitted_tasks, straggler_detection) @@ -119,15 +159,13 @@ def _enqueue_new_tasks(self, num_of_tasks: int) -> None: self.current_batch_size += num_of_new_tasks_submitted logger.info(f"Enqueued {num_of_new_tasks_submitted} new tasks. 
Current concurrency of tasks execution: {self.current_batch_size}, Current Task progress: {self.num_of_submitted_tasks_completed}/{self.num_of_submitted_tasks}") - def _submit_tasks(self, ray_remote_task_infos: List[RayRemoteTaskInfo]) -> None: - for ray_remote_task_info in ray_remote_task_infos: + def _submit_tasks(self, info_objs: List[TaskInfoObject]) -> None: + for info_obj in info_objs: time.sleep(0.005) - if self.straggler_detection and self.straggler_detection.is_straggler(ray_remote_task_info): - ray.cancel(ray_remote_task_info) - else: - self.unfinished_promises.append(self._invoke_ray_remote_task(ray_remote_task_info)) + self.unfinished_promises.append(self._invoke_ray_remote_task(info_obj)) #replace with ray.options def _invoke_ray_remote_task(self, ray_remote_task_info: RayRemoteTaskInfo) -> Any: + #change to using ray.options ray_remote_task_options_arguments = dict() if ray_remote_task_info.ray_remote_task_options.memory: diff --git a/deltacat/utils/ray_utils/retry_handler/retry_task_default.py b/deltacat/utils/ray_utils/retry_handler/retry_task_default.py new file mode 100644 index 00000000..6c2bdf5c --- /dev/null +++ b/deltacat/utils/ray_utils/retry_handler/retry_task_default.py @@ -0,0 +1,31 @@ +from typing import List, Protocol +from deltacat.utils.ray_utils.retry_handler.task_info_object import TaskInfoObject +import Exception + +class RetryTaskDefault(RetryTaskInterface): + def __init__(self, max_retries: int): + self.max_retries = max_retries + def should_retry(self, task: TaskInfoObject, exception: Exception): + """ + Given a task, determine whether it should be retried or not based on if its an instance of the RetryableError + """ + if isinstance(exception, RetryableError): + return True + + + def get_wait_time(self, task: TaskInfoObject): + """ + Determines the wait time between retries + """ + pass + + def retry(self, task: TaskInfoObject, exception: Exception): + """ + Executes retry behavior for the given exception + """ + task_id = 
task.task_id + if self.should_retry(task, exception): + wait_time = self.get_wait_time(task) + time.sleep(wait_time) + #increase retry count here + self.execute_task(task) \ No newline at end of file diff --git a/deltacat/utils/ray_utils/retry_handler/retry_task_interface.py b/deltacat/utils/ray_utils/retry_handler/retry_task_interface.py index a1284350..2e661457 100644 --- a/deltacat/utils/ray_utils/retry_handler/retry_task_interface.py +++ b/deltacat/utils/ray_utils/retry_handler/retry_task_interface.py @@ -3,15 +3,9 @@ import Exception class RetryTaskInterface(Protocol): - def init_tasks(self, task_infos: List[TaskInfoObject]) -> None: - """ - Loads all tasks to check for retries if exception occurs - """ - pass - def should_retry(self, task: TaskInfoObject, exception: Exception) -> bool: """ - Given a task, determine whether it should be retried or not + Given a task, determine whether it should be retried or not based on if its an instance of the RetryableError """ pass diff --git a/deltacat/utils/ray_utils/retry_handler/task_context.py b/deltacat/utils/ray_utils/retry_handler/task_context.py index f6217465..b41e68e5 100644 --- a/deltacat/utils/ray_utils/retry_handler/task_context.py +++ b/deltacat/utils/ray_utils/retry_handler/task_context.py @@ -6,6 +6,6 @@ class TaskContext(): This class represents important info pertaining to the task that other interfaces like Straggler Detection can use to make decisions """ - def __init__(self, progress_notifier: ProgressNotifierInterface, timeoutTime: float): + def __init__(self, progress_notifier: ProgressNotifierInterface, timeout: float): self.progress_notifier = progress_notifier - self.timeoutTime = timeoutTime + self.timeout = timeout diff --git a/deltacat/utils/ray_utils/retry_handler/task_info_object.py b/deltacat/utils/ray_utils/retry_handler/task_info_object.py index 7baabe4b..3f852460 100644 --- a/deltacat/utils/ray_utils/retry_handler/task_info_object.py +++ 
b/deltacat/utils/ray_utils/retry_handler/task_info_object.py @@ -9,12 +9,15 @@ class TaskInfoObject: Dataclass holding important fields representing the Task as an object """ def __init__(self, + task_id: str, task_callable: Callable[[Any], [Any]], task_input: Any, ray_remote_task_options: RayRemoteTaskOptions = RayRemoteTaskOptions(), task_exception_retry_config: List[TaskExceptionRetryConfig]): + self.task_complete = False + self.task_id = task_id self.task_callable = task_callable self.task_input = task_input self.ray_remote_task_options = ray_remote_task_options self.task_exception_retry_config = task_exception_retry_config - self.num_of_attempts = 0 +