Skip to content

Commit

Permalink
docs: restart tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
be-marc committed Dec 4, 2023
1 parent 291f8d7 commit 1451787
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 13 deletions.
19 changes: 11 additions & 8 deletions R/Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
#' @template param_lgr_buffer_size
#' @template param_seed
#' @template param_data_format
#' @template param_max_tries
#'
#'
#' @export
Expand Down Expand Up @@ -182,7 +183,7 @@ Rush = R6::R6Class("Rush",
heartbeat_expire = NULL,
lgr_thresholds = NULL,
lgr_buffer_size = 0,
max_retries = 0,
max_tries = 0,
seed = NULL,
supervise = TRUE,
worker_loop = worker_loop_default,
Expand All @@ -194,7 +195,7 @@ Rush = R6::R6Class("Rush",
r = self$connector

# set global maximum retries of tasks
private$.max_retries = assert_count(max_retries)
private$.max_tries = assert_count(max_tries)

# push worker config to redis
private$.push_worker_config(
Expand All @@ -204,7 +205,7 @@ Rush = R6::R6Class("Rush",
heartbeat_expire = heartbeat_expire,
lgr_thresholds = lgr_thresholds,
lgr_buffer_size = lgr_buffer_size,
max_retries = max_retries,
max_tries = max_tries,
seed = seed,
worker_loop = worker_loop,
...
Expand Down Expand Up @@ -365,6 +366,8 @@ Rush = R6::R6Class("Rush",
#'
#' @param restart_workers (`logical(1)`)\cr
#' Whether to restart lost workers.
#' @param restart_tasks (`logical(1)`)\cr
#' Whether to restart lost tasks.
detect_lost_workers = function(restart_workers = FALSE, restart_tasks = FALSE) {
assert_flag(restart_workers)
assert_flag(restart_tasks)
Expand Down Expand Up @@ -431,7 +434,7 @@ Rush = R6::R6Class("Rush",
if (restart_tasks) {

# check whether the tasks should be retried
retry = self$n_tries(keys) < private$.max_retries
retry = self$n_tries(keys) < private$.max_tries
keys = keys[retry]

if (length(keys)) {
Expand Down Expand Up @@ -1155,7 +1158,7 @@ Rush = R6::R6Class("Rush",
#
.hostname = NULL,

.max_retries = NULL,
.max_tries = NULL,

# prefix key with instance id
.get_key = function(key) {
Expand All @@ -1176,7 +1179,7 @@ Rush = R6::R6Class("Rush",
heartbeat_expire = NULL,
lgr_thresholds = NULL,
lgr_buffer_size = 0,
max_retries = 0,
max_tries = 0,
seed = NULL,
worker_loop = worker_loop_default,
...
Expand All @@ -1188,7 +1191,7 @@ Rush = R6::R6Class("Rush",
if (!is.null(heartbeat_period)) require_namespaces("callr")
assert_vector(lgr_thresholds, names = "named", null.ok = TRUE)
assert_count(lgr_buffer_size)
assert_count(max_retries)
assert_count(max_tries)
assert_int(seed, null.ok = TRUE)
assert_function(worker_loop)
dots = list(...)
Expand All @@ -1210,7 +1213,7 @@ Rush = R6::R6Class("Rush",
lgr_thresholds = lgr_thresholds,
lgr_buffer_size = lgr_buffer_size,
seed = seed,
max_retries = max_retries)
max_tries = max_tries)

# arguments needed for initializing the worker
start_args = list(
Expand Down
12 changes: 10 additions & 2 deletions R/RushWorker.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#' @template param_lgr_thresholds
#' @template param_lgr_buffer_size
#' @template param_seed
#' @template param_max_tries
#'
#' @export
RushWorker = R6::R6Class("RushWorker",
Expand Down Expand Up @@ -46,13 +47,13 @@ RushWorker = R6::R6Class("RushWorker",
lgr_thresholds = NULL,
lgr_buffer_size = 0,
seed = NULL,
max_retries = 0
max_tries = 0
) {
super$initialize(network_id = network_id, config = config)

self$host = assert_choice(host, c("local", "remote"))
self$worker_id = assert_string(worker_id %??% uuid::UUIDgenerate())
private$.max_retries = assert_count(max_retries)
private$.max_tries = assert_count(max_tries)
r = self$connector

# setup heartbeat
Expand Down Expand Up @@ -216,6 +217,13 @@ RushWorker = R6::R6Class("RushWorker",
return(invisible(self))
},

#' @description
#' Pushes failed tasks to the data base.
#'
#' @param keys (`character(1)`)\cr
#' Keys of the associated tasks.
#' @param conditions (named `list()`)\cr
#' List of lists of conditions.
push_failed = function(keys, conditions) {
assert_string(keys)
assert_list(conditions, types = "list")
Expand Down
2 changes: 2 additions & 0 deletions man-roxygen/param_max_tries.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#' @param max_tries (`integer(1)`)\cr
#' Maximum number of tries for a task before it is considered failed.
8 changes: 7 additions & 1 deletion man/Rush.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 16 additions & 1 deletion man/RushWorker.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion tests/testthat/test-Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ test_that("a task is restarted when a worker is lost", {
tools::pskill(Sys.getpid())
}

rush$start_workers(fun = fun, n_workers = 1, max_retries = 1, wait_for_workers = TRUE)
rush$start_workers(fun = fun, n_workers = 1, max_tries = 1, wait_for_workers = TRUE)

xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss)
Expand All @@ -633,6 +633,7 @@ test_that("a task is restarted when a worker is lost", {
test_that("blocking on new results works", {
skip_on_cran()
skip_on_ci()
skip_if(TRUE) # does not work in testthat on environment

config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
Expand Down Expand Up @@ -677,6 +678,7 @@ test_that("wait for tasks works when a task gets lost", {
test_that("saving lgr logs works", {
skip_on_cran()
skip_on_ci()
skip_if(TRUE) # does not work in testthat on environment

config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
Expand Down

0 comments on commit 1451787

Please sign in to comment.