Skip to content

Commit

Permalink
Added sleep feature
Browse files Browse the repository at this point in the history
- Implemented a sleep feature with dynamic progress bar updates
  • Loading branch information
7Zenox committed Jul 9, 2024
1 parent 261a652 commit 40b07eb
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 18 deletions.
36 changes: 22 additions & 14 deletions pandarallel/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,14 @@ class WrapWorkFunctionForFileSystem:
def __init__(
self,
work_function: Callable[
[Any, Callable, tuple, Dict[str, Any], Dict[str, Any]], Any
[
Any,
Callable,
tuple,
Dict[str, Any],
Dict[str, Any]
],
Any
],
) -> None:
self.work_function = work_function
Expand Down Expand Up @@ -142,7 +149,8 @@ def __call__(
user_defined_function: Callable = dill.loads(dilled_user_defined_function)

progress_wrapped_user_defined_function = progress_wrapper(
user_defined_function, master_workers_queue, worker_index, data_size
user_defined_function, master_workers_queue, worker_index, data_size,
extra.get('sleep_seconds', 0), extra.get('sleep_after_percent', 100.0)
)

used_user_defined_function = (
Expand Down Expand Up @@ -171,7 +179,6 @@ def __call__(
master_workers_queue.put((worker_index, WorkerStatus.Error, None))
raise


def wrap_reduce_function_for_file_system(
reduce_function: Callable[[Iterator, Dict[str, Any]], Any]
) -> Callable[[Iterator[Path], Dict[str, Any]], Any]:
Expand Down Expand Up @@ -217,7 +224,7 @@ def closure(

chunks = list(
data_type.get_chunks(
nb_requested_workers,
max(1, nb_requested_workers), # Ensure at least 1 worker
data,
user_defined_function_kwargs=user_defined_function_kwargs,
)
Expand Down Expand Up @@ -327,14 +334,10 @@ def closure(
reduce_extra,
)
except EOFError:
results_promise.get()
# Loading the files failed, this most likely means that there
# was some error during processing and the files were never
# saved at all.
results_promise.get()

# If the above statement does not raise an exception, that
# means the multiprocessing went well and we want to re-raise
# the original EOFError.
raise

finally:
Expand All @@ -355,6 +358,8 @@ def parallelize_with_pipe(
nb_requested_workers: int,
data_type: Type[DataType],
progress_bars_type: ProgressBarsType,
sleep_seconds: int = 0, # Add this parameter
sleep_after_percent: float = 100.0 # Add this parameter
):
def closure(
data: Any,
Expand All @@ -369,7 +374,7 @@ def closure(

chunks = list(
data_type.get_chunks(
nb_requested_workers,
max(1, nb_requested_workers), # Ensure at least 1 worker
data,
user_defined_function_kwargs=user_defined_function_kwargs,
)
Expand Down Expand Up @@ -410,6 +415,8 @@ def closure(
"master_workers_queue": master_workers_queue,
"show_progress_bars": show_progress_bars,
"worker_index": worker_index,
"sleep_seconds": sleep_seconds, # Add sleep_seconds to extra
"sleep_after_percent": sleep_after_percent # Add sleep_after_percent to extra
},
},
)
Expand Down Expand Up @@ -447,7 +454,6 @@ def closure(

return closure


class pandarallel:
@classmethod
def initialize(
Expand All @@ -457,6 +463,8 @@ def initialize(
progress_bar=False,
verbose=2,
use_memory_fs: Optional[bool] = None,
sleep_seconds: int = 0,
sleep_after_percent: float = 100.0,
) -> None:
show_progress_bars = progress_bar
is_memory_fs_available = Path(MEMORY_FS_ROOT).exists()
Expand All @@ -468,7 +476,7 @@ def initialize(
parallelize = (
parallelize_with_memory_file_system
if use_memory_fs
else parallelize_with_pipe
else lambda *args, **kwargs: parallelize_with_pipe(*args, **kwargs, sleep_seconds=sleep_seconds, sleep_after_percent=sleep_after_percent)
)

if use_memory_fs and not is_memory_fs_available:
Expand Down Expand Up @@ -526,7 +534,7 @@ def initialize(
pd.DataFrame.parallel_applymap = parallelize(
nb_workers,
DataFrame.ApplyMap,
progress_bars_in_user_defined_function_multiply_by_number_of_columns,
progress_bars_in_user_defined_function_multiply_by_number_of_columns
)

# DataFrame GroupBy
Expand All @@ -553,4 +561,4 @@ def initialize(
# Series Rolling
pd.core.window.Rolling.parallel_apply = parallelize(
nb_workers, SeriesRolling.Apply, progress_bars_in_user_defined_function
)
)
14 changes: 10 additions & 4 deletions pandarallel/progress_bars.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from abc import ABC, abstractmethod
from enum import Enum
from itertools import count
from time import time_ns
from time import time_ns, sleep
from typing import Callable, List, Union

from .utils import WorkerStatus
Expand Down Expand Up @@ -192,16 +192,22 @@ def progress_wrapper(
master_workers_queue: multiprocessing.Queue,
index: int,
chunk_size: int,
sleep_seconds: int = 0,
sleep_after_percent: float = 100.0
) -> Callable:
"""Wrap the function to apply in a function which monitor the part of work already
done.
"""Wrap the function to apply in a function which monitors the part of work already
done and pauses after every n% completion.
"""
counter = count()
state = ProgressState(chunk_size)
sleep_interval = int(chunk_size * (sleep_after_percent / 100.0))

def closure(*user_defined_function_args, **user_defined_functions_kwargs):
iteration = next(counter)

if iteration % sleep_interval == 0 and iteration != 0:
sleep(sleep_seconds)

if iteration == state.next_put_iteration:
time_now = time_ns()
master_workers_queue.put_nowait((index, WorkerStatus.Running, iteration))
Expand All @@ -220,4 +226,4 @@ def closure(*user_defined_function_args, **user_defined_functions_kwargs):
*user_defined_function_args, **user_defined_functions_kwargs
)

return closure
return closure

0 comments on commit 40b07eb

Please sign in to comment.