From 1451787f0229d1a5fd9ac3bef52037081378f60e Mon Sep 17 00:00:00 2001 From: be-marc Date: Mon, 4 Dec 2023 21:47:21 +0100 Subject: [PATCH] docs: restart tasks --- R/Rush.R | 19 +++++++++++-------- R/RushWorker.R | 12 ++++++++++-- man-roxygen/param_max_tries.R | 2 ++ man/Rush.Rd | 8 +++++++- man/RushWorker.Rd | 17 ++++++++++++++++- tests/testthat/test-Rush.R | 4 +++- 6 files changed, 49 insertions(+), 13 deletions(-) create mode 100644 man-roxygen/param_max_tries.R diff --git a/R/Rush.R b/R/Rush.R index 1dec87e..d45b36a 100644 --- a/R/Rush.R +++ b/R/Rush.R @@ -99,6 +99,7 @@ #' @template param_lgr_buffer_size #' @template param_seed #' @template param_data_format +#' @template param_max_tries #' #' #' @export @@ -182,7 +183,7 @@ Rush = R6::R6Class("Rush", heartbeat_expire = NULL, lgr_thresholds = NULL, lgr_buffer_size = 0, - max_retries = 0, + max_tries = 0, seed = NULL, supervise = TRUE, worker_loop = worker_loop_default, @@ -194,7 +195,7 @@ Rush = R6::R6Class("Rush", r = self$connector # set global maximum retries of tasks - private$.max_retries = assert_count(max_retries) + private$.max_tries = assert_count(max_tries) # push worker config to redis private$.push_worker_config( @@ -204,7 +205,7 @@ Rush = R6::R6Class("Rush", heartbeat_expire = heartbeat_expire, lgr_thresholds = lgr_thresholds, lgr_buffer_size = lgr_buffer_size, - max_retries = max_retries, + max_tries = max_tries, seed = seed, worker_loop = worker_loop, ... @@ -365,6 +366,8 @@ Rush = R6::R6Class("Rush", #' #' @param restart_workers (`logical(1)`)\cr #' Whether to restart lost workers. + #' @param restart_tasks (`logical(1)`)\cr + #' Whether to restart lost tasks. detect_lost_workers = function(restart_workers = FALSE, restart_tasks = FALSE) { assert_flag(restart_workers) assert_flag(restart_tasks) @@ -431,7 +434,7 @@ Rush = R6::R6Class("Rush", if (restart_tasks) { # check whether the tasks should be retried - retry = self$n_tries(keys) < private$.max_retries + retry = self$n_tries(keys) < private$.max_tries keys = keys[retry] if (length(keys)) { @@ -1155,7 +1158,7 @@ Rush = R6::R6Class("Rush", # .hostname = NULL, - .max_retries = NULL, + .max_tries = NULL, # prefix key with instance id .get_key = function(key) { @@ -1176,7 +1179,7 @@ Rush = R6::R6Class("Rush", heartbeat_expire = NULL, lgr_thresholds = NULL, lgr_buffer_size = 0, - max_retries = 0, + max_tries = 0, seed = NULL, worker_loop = worker_loop_default, ... @@ -1188,7 +1191,7 @@ Rush = R6::R6Class("Rush", if (!is.null(heartbeat_period)) require_namespaces("callr") assert_vector(lgr_thresholds, names = "named", null.ok = TRUE) assert_count(lgr_buffer_size) - assert_count(max_retries) + assert_count(max_tries) assert_int(seed, null.ok = TRUE) assert_function(worker_loop) dots = list(...) @@ -1210,7 +1213,7 @@ Rush = R6::R6Class("Rush", lgr_thresholds = lgr_thresholds, lgr_buffer_size = lgr_buffer_size, seed = seed, - max_retries = max_retries) + max_tries = max_tries) # arguments needed for initializing the worker start_args = list( diff --git a/R/RushWorker.R b/R/RushWorker.R index 6dcdd05..3b02ca8 100644 --- a/R/RushWorker.R +++ b/R/RushWorker.R @@ -16,6 +16,7 @@ #' @template param_lgr_thresholds #' @template param_lgr_buffer_size #' @template param_seed +#' @template param_max_tries #' #' @export RushWorker = R6::R6Class("RushWorker", @@ -46,13 +47,13 @@ RushWorker = R6::R6Class("RushWorker", lgr_thresholds = NULL, lgr_buffer_size = 0, seed = NULL, - max_retries = 0 + max_tries = 0 ) { super$initialize(network_id = network_id, config = config) self$host = assert_choice(host, c("local", "remote")) self$worker_id = assert_string(worker_id %??% uuid::UUIDgenerate()) - private$.max_retries = assert_count(max_retries) + private$.max_tries = assert_count(max_tries) r = self$connector # setup heartbeat @@ -216,6 +217,13 @@ RushWorker = R6::R6Class("RushWorker", return(invisible(self)) }, + #' @description + #' Pushes failed tasks to the data base. + #' + #' @param keys (`character(1)`)\cr + #' Keys of the associated tasks. + #' @param conditions (named `list()`)\cr + #' List of lists of conditions. push_failed = function(keys, conditions) { assert_string(keys) assert_list(conditions, types = "list") diff --git a/man-roxygen/param_max_tries.R b/man-roxygen/param_max_tries.R new file mode 100644 index 0000000..38dc76a --- /dev/null +++ b/man-roxygen/param_max_tries.R @@ -0,0 +1,2 @@ +#' @param max_tries (`integer(1)`)\cr +#' Maximum number of tries for a task before it is considered failed. diff --git a/man/Rush.Rd b/man/Rush.Rd index f562711..ae6cdcd 100644 --- a/man/Rush.Rd +++ b/man/Rush.Rd @@ -315,7 +315,7 @@ This function takes the arguments \code{fun} and optionally \code{constants} whi heartbeat_expire = NULL, lgr_thresholds = NULL, lgr_buffer_size = 0, - max_retries = 0, + max_tries = 0, seed = NULL, supervise = TRUE, worker_loop = worker_loop_default, @@ -352,6 +352,9 @@ By default (\code{lgr_buffer_size = 0}), the log messages are directly saved in If \code{lgr_buffer_size > 0}, the log messages are buffered and saved in the Redis data store when the buffer is full. This improves the performance of the logging.} +\item{\code{max_tries}}{(\code{integer(1)})\cr +Maximum number of tries for a task before it is considered failed.} + \item{\code{seed}}{(\code{integer(1)})\cr Seed for the random number generator.} @@ -503,6 +506,9 @@ Lost tasks are marked as \code{"lost"}. \describe{ \item{\code{restart_workers}}{(\code{logical(1)})\cr Whether to restart lost workers.} + +\item{\code{restart_tasks}}{(\code{logical(1)})\cr +Whether to restart lost tasks.} } \if{html}{\out{}} } diff --git a/man/RushWorker.Rd b/man/RushWorker.Rd index f13c4a4..3313255 100644 --- a/man/RushWorker.Rd +++ b/man/RushWorker.Rd @@ -101,7 +101,7 @@ Creates a new instance of this \link[R6:R6Class]{R6} class. lgr_thresholds = NULL, lgr_buffer_size = 0, seed = NULL, - max_retries = 0 + max_tries = 0 )}\if{html}{\out{}} } @@ -144,6 +144,9 @@ This improves the performance of the logging.} \item{\code{seed}}{(\code{integer(1)})\cr Seed for the random number generator.} + +\item{\code{max_tries}}{(\code{integer(1)})\cr +Maximum number of tries for a task before it is considered failed.} } \if{html}{\out{}} } @@ -233,10 +236,22 @@ If \code{"error"} the tasks are moved to the failed tasks.} \if{html}{\out{}} \if{latex}{\out{\hypertarget{method-RushWorker-push_failed}{}}} \subsection{Method \code{push_failed()}}{ +Pushes failed tasks to the data base. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{RushWorker$push_failed(keys, conditions)}\if{html}{\out{
}} } +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{keys}}{(\code{character(1)})\cr +Keys of the associated tasks.} + +\item{\code{conditions}}{(named \code{list()})\cr +List of lists of conditions.} +} +\if{html}{\out{
}} +} } \if{html}{\out{
}} \if{html}{\out{}} diff --git a/tests/testthat/test-Rush.R b/tests/testthat/test-Rush.R index e0bb92a..55389e2 100644 --- a/tests/testthat/test-Rush.R +++ b/tests/testthat/test-Rush.R @@ -616,7 +616,7 @@ test_that("a task is restarted when a worker is lost", { tools::pskill(Sys.getpid()) } - rush$start_workers(fun = fun, n_workers = 1, max_retries = 1, wait_for_workers = TRUE) + rush$start_workers(fun = fun, n_workers = 1, max_tries = 1, wait_for_workers = TRUE) xss = list(list(x1 = 1, x2 = 2)) keys = rush$push_tasks(xss) @@ -633,6 +633,7 @@ test_that("a task is restarted when a worker is lost", { test_that("blocking on new results works", { skip_on_cran() skip_on_ci() + skip_if(TRUE) # does not work in testthat on environment config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -677,6 +678,7 @@ test_that("wait for tasks works when a task gets lost", { test_that("saving lgr logs works", { skip_on_cran() skip_on_ci() + skip_if(TRUE) # does not work in testthat on environment config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config)