Skip to content

Commit

Permalink
feat: store pre-started workers
Browse files Browse the repository at this point in the history
  • Loading branch information
be-marc committed Jun 12, 2024
1 parent fb03341 commit 11105c8
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 4 deletions.
15 changes: 15 additions & 0 deletions R/Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -1247,6 +1247,14 @@ Rush = R6::R6Class("Rush",
as.integer(r$SCARD(private$.get_key("lost_worker_ids"))) %??% 0
},

#' @field n_pre_workers (`integer(1)`)\cr
#' Number of workers that are not yet completely started.
n_pre_workers = function(rhs) {
assert_ro_binding(rhs)
r = self$connector
as.integer(r$SCARD(private$.get_key("pre_worker_ids"))) %??% 0
},

#' @field worker_ids (`character()`)\cr
#' Ids of workers.
worker_ids = function() {
Expand Down Expand Up @@ -1282,6 +1290,13 @@ Rush = R6::R6Class("Rush",
unlist(r$SMEMBERS(private$.get_key("lost_worker_ids")))
},

#' @field pre_worker_ids (`character()`)\cr
#' Ids of workers that are not yet completely started.
pre_worker_ids = function() {
r = self$connector
unlist(r$SMEMBERS(private$.get_key("pre_worker_ids")))
},

#' @field tasks (`character()`)\cr
#' Keys of all tasks.
tasks = function() {
Expand Down
3 changes: 3 additions & 0 deletions R/RushWorker.R
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ RushWorker = R6::R6Class("RushWorker",
"remote", self$remote,
"hostname", rush::get_hostname(),
"heartbeat", if (is.null(self$heartbeat)) NA_character_ else private$.get_worker_key("heartbeat")))

# remove from pre-started workers
r$SREM(sprintf("%s:pre_worker_ids", self$network_id), self$worker_id)
},

#' @description
Expand Down
4 changes: 2 additions & 2 deletions R/rush_plan.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
#' rush
#' }
rush_plan = function(
n_workers,
n_workers = NULL,
config = NULL,
lgr_thresholds = NULL,
large_objects_path = NULL,
start_worker_timeout = Inf
) {
assert_count(n_workers)
assert_count(n_workers, null.ok = TRUE)
assert_class(config, "redis_config", null.ok = TRUE)
assert_vector(lgr_thresholds, names = "named", null.ok = TRUE)
assert_string(large_objects_path, null.ok = TRUE)
Expand Down
5 changes: 4 additions & 1 deletion R/start_worker.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ start_worker = function(
...
) {
checkmate::assert_string(network_id)
checkmate::assert_string(worker_id, null.ok = TRUE)
worker_id = checkmate::assert_string(worker_id, null.ok = TRUE) %??% uuid::UUIDgenerate()
checkmate::assert_flag(remote)

# connect to redis
Expand All @@ -36,6 +36,9 @@ start_worker = function(
config = redux::redis_config(config = config)
r = redux::hiredis(config)

# register to pre-started workers
r$SADD(sprintf("%s:pre_worker_ids", network_id), worker_id)

# wait for start arguments
while (!r$EXISTS(sprintf("%s:start_args", network_id))) {
lg$debug("Wait for start arguments for network '%s'.", network_id)
Expand Down
6 changes: 6 additions & 0 deletions man/Rush.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/rush_plan.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions tests/testthat/test-Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ test_that("worker can be started with script", {
supervise = TRUE,
stderr = "|", stdout = "|")

Sys.sleep(5)

expect_string(rush$pre_worker_ids)

on.exit({
px$kill()
}, add = TRUE)
Expand All @@ -212,6 +216,7 @@ test_that("worker can be started with script", {
expect_true(px$is_alive())
expect_equal(rush$n_running_workers, 1)
expect_true(all(rush$worker_info$remote))
expect_null(rush$pre_worker_ids)

expect_rush_reset(rush, type = "terminate")
px$kill()
Expand Down

0 comments on commit 11105c8

Please sign in to comment.