From 40b07eb3174814ec32d227585c0fb59ddd0d48a0 Mon Sep 17 00:00:00 2001 From: Vasu Jain <91457798+7Zenox@users.noreply.github.com> Date: Wed, 10 Jul 2024 00:06:23 +0530 Subject: [PATCH] Added sleep feature - Implemented a sleep feature with dynamic progress bar updatio --- pandarallel/core.py | 36 ++++++++++++++++++++++-------------- pandarallel/progress_bars.py | 14 ++++++++++---- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/pandarallel/core.py b/pandarallel/core.py index c54ec14..82eff1a 100644 --- a/pandarallel/core.py +++ b/pandarallel/core.py @@ -49,7 +49,14 @@ class WrapWorkFunctionForFileSystem: def __init__( self, work_function: Callable[ - [Any, Callable, tuple, Dict[str, Any], Dict[str, Any]], Any + [ + Any, + Callable, + tuple, + Dict[str, Any], + Dict[str, Any] + ], + Any ], ) -> None: self.work_function = work_function @@ -142,7 +149,8 @@ def __call__( user_defined_function: Callable = dill.loads(dilled_user_defined_function) progress_wrapped_user_defined_function = progress_wrapper( - user_defined_function, master_workers_queue, worker_index, data_size + user_defined_function, master_workers_queue, worker_index, data_size, + extra.get('sleep_seconds', 0), extra.get('sleep_after_percent', 100.0) ) used_user_defined_function = ( @@ -171,7 +179,6 @@ def __call__( master_workers_queue.put((worker_index, WorkerStatus.Error, None)) raise - def wrap_reduce_function_for_file_system( reduce_function: Callable[[Iterator, Dict[str, Any]], Any] ) -> Callable[[Iterator[Path], Dict[str, Any]], Any]: @@ -217,7 +224,7 @@ def closure( chunks = list( data_type.get_chunks( - nb_requested_workers, + max(1, nb_requested_workers), # Ensure at least 1 worker data, user_defined_function_kwargs=user_defined_function_kwargs, ) @@ -327,14 +334,10 @@ def closure( reduce_extra, ) except EOFError: + results_promise.get() # Loading the files failed, this most likely means that there # was some error during processing and the files were never # saved at all. - results_promise.get() - - # If the above statement does not raise an exception, that - # means the multiprocessing went well and we want to re-raise - # the original EOFError. raise finally: @@ -355,6 +358,8 @@ def parallelize_with_pipe( nb_requested_workers: int, data_type: Type[DataType], progress_bars_type: ProgressBarsType, + sleep_seconds: int = 0, # Add this parameter + sleep_after_percent: float = 100.0 # Add this parameter ): def closure( data: Any, @@ -369,7 +374,7 @@ def closure( chunks = list( data_type.get_chunks( - nb_requested_workers, + max(1, nb_requested_workers), # Ensure at least 1 worker data, user_defined_function_kwargs=user_defined_function_kwargs, ) @@ -410,6 +415,8 @@ def closure( "master_workers_queue": master_workers_queue, "show_progress_bars": show_progress_bars, "worker_index": worker_index, + "sleep_seconds": sleep_seconds, # Add sleep_seconds to extra + "sleep_after_percent": sleep_after_percent # Add sleep_after_percent to extra }, }, ) @@ -447,7 +454,6 @@ def closure( return closure - class pandarallel: @classmethod def initialize( @@ -457,6 +463,8 @@ def initialize( progress_bar=False, verbose=2, use_memory_fs: Optional[bool] = None, + sleep_seconds: int = 0, + sleep_after_percent: float = 100.0, ) -> None: show_progress_bars = progress_bar is_memory_fs_available = Path(MEMORY_FS_ROOT).exists() @@ -468,7 +476,7 @@ def initialize( parallelize = ( parallelize_with_memory_file_system if use_memory_fs - else parallelize_with_pipe + else lambda *args, **kwargs: parallelize_with_pipe(*args, **kwargs, sleep_seconds=sleep_seconds, sleep_after_percent=sleep_after_percent) ) if use_memory_fs and not is_memory_fs_available: @@ -526,7 +534,7 @@ def initialize( pd.DataFrame.parallel_applymap = parallelize( nb_workers, DataFrame.ApplyMap, - progress_bars_in_user_defined_function_multiply_by_number_of_columns, + progress_bars_in_user_defined_function_multiply_by_number_of_columns ) # DataFrame GroupBy @@ -553,4 +561,4 @@ def initialize( # Series Rolling pd.core.window.Rolling.parallel_apply = parallelize( nb_workers, SeriesRolling.Apply, progress_bars_in_user_defined_function - ) + ) \ No newline at end of file diff --git a/pandarallel/progress_bars.py b/pandarallel/progress_bars.py index ec6602c..7f56748 100644 --- a/pandarallel/progress_bars.py +++ b/pandarallel/progress_bars.py @@ -5,7 +5,7 @@ from abc import ABC, abstractmethod from enum import Enum from itertools import count -from time import time_ns +from time import time_ns, sleep from typing import Callable, List, Union from .utils import WorkerStatus @@ -192,16 +192,22 @@ def progress_wrapper( master_workers_queue: multiprocessing.Queue, index: int, chunk_size: int, + sleep_seconds: int = 0, + sleep_after_percent: float = 100.0 ) -> Callable: - """Wrap the function to apply in a function which monitor the part of work already - done. + """Wrap the function to apply in a function which monitors the part of work already + done and pauses after every n% completion. """ counter = count() state = ProgressState(chunk_size) + sleep_interval = int(chunk_size * (sleep_after_percent / 100.0)) def closure(*user_defined_function_args, **user_defined_functions_kwargs): iteration = next(counter) + if iteration % sleep_interval == 0 and iteration != 0: + sleep(sleep_seconds) + if iteration == state.next_put_iteration: time_now = time_ns() master_workers_queue.put_nowait((index, WorkerStatus.Running, iteration)) @@ -220,4 +226,4 @@ def closure(*user_defined_function_args, **user_defined_functions_kwargs): *user_defined_function_args, **user_defined_functions_kwargs ) - return closure + return closure \ No newline at end of file