Skip to content

Commit

Permalink
refactor: set worker arguments only once
Browse files Browse the repository at this point in the history
  • Loading branch information
be-marc committed Feb 6, 2024
1 parent 3fb462a commit 36b02cb
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 67 deletions.
65 changes: 58 additions & 7 deletions R/Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,10 @@ Rush = R6::R6Class("Rush",
#'
#' @param n_workers (`integer(1)`)\cr
#' Number of workers to be started.
#' @param supervise (`logical(1)`)\cr
#' Whether to kill the workers when the main R process is shut down.
#' @param wait_for_workers (`logical(1)`)\cr
#' Whether to wait until all workers are available.
#' @param supervise (`logical(1)`)\cr
#' Whether to kill the workers when the main R process is shut down.
#' @param ... (`any`)\cr
#' Arguments passed to `worker_loop`.
start_workers = function(
Expand All @@ -218,6 +218,11 @@ Rush = R6::R6Class("Rush",
assert_flag(supervise)
r = self$connector

if (self$has_start_arguments) {
# too reduce the complexity, we don't allow to start workers with different arguments for now.
stop("Worker configuration is already set. Use `$add_workers()` to add additional workers or `$stop_workers()` to stop the workers and start with new arguments.")
}

# push worker config to redis
private$.push_worker_config(
globals = globals,
Expand All @@ -230,21 +235,54 @@ Rush = R6::R6Class("Rush",
...
)

lg$debug("Starting %i worker(s)", n_workers)

worker_ids = uuid::UUIDgenerate(n = n_workers)
self$processes = c(self$processes, set_names(map(worker_ids, function(worker_id) {
self$processes = set_names(map(worker_ids, function(worker_id) {
processx::process$new("Rscript",
args = c("-e", sprintf("rush::start_worker(network_id = '%s', worker_id = '%s', hostname = '%s', url = '%s')",
self$network_id, worker_id, private$.hostname, self$config$url)),
supervise = supervise, stdout = "|", stderr = "|") # , stdout = "|", stderr = "|"
}), worker_ids))
}), worker_ids)

if (wait_for_workers) self$wait_for_workers(n_workers)

return(invisible(worker_ids))
},

#' @description
#' Restart workers.
#' Add local workers to the network.
#'
#' @param n_workers (`integer(1)`)\cr
#' Number of workers to be started.
#' @param wait_for_workers (`logical(1)`)\cr
#' Whether to wait until all workers are available.
#' @param supervise (`logical(1)`)\cr
#' Whether to kill the workers when the main R process is shut down.
add_workers = function(n_workers, wait_for_workers = TRUE, supervise = TRUE) {
assert_count(n_workers)
assert_flag(wait_for_workers)
assert_flag(supervise)
r = self$connector
n_running_workers = self$n_running_workers

lg$debug("Starting %i worker(s)", n_workers)

worker_ids = uuid::UUIDgenerate(n = n_workers)
self$processes = c(self$processes, set_names(map(worker_ids, function(worker_id) {
processx::process$new("Rscript",
args = c("-e", sprintf("rush::start_worker(network_id = '%s', worker_id = '%s', hostname = '%s', url = '%s')",
self$network_id, worker_id, private$.hostname, self$config$url)),
supervise = TRUE, stdout = "|", stderr = "|")
}), worker_ids))

if (wait_for_workers) self$wait_for_workers(n_workers + n_running_workers)

return(invisible(worker_ids))
},

#' @description
#' Restart local workers.
#'
#' @param worker_ids (`character()`)\cr
#' Worker ids to be restarted.
Expand Down Expand Up @@ -281,6 +319,11 @@ Rush = R6::R6Class("Rush",
...
) {

if (self$has_start_arguments) {
# too reduce the complexity, we don't allow to start workers with different arguments for now.
lg$warn("Worker configuration is already set. Use `$stop_workers()` to stop the workers and start with new arguments.")
}

# push worker config to redis
private$.push_worker_config(
globals = globals,
Expand Down Expand Up @@ -1043,8 +1086,8 @@ Rush = R6::R6Class("Rush",
#' @description
#' Reads a single Redis hash and returns the values as a list named by the fields.
#'
#' @param keys (`character()`)\cr
#' Keys of the hashes.
#' @param key (`character(1)`)\cr
#' Key of the hash.
#' @param fields (`character()`)\cr
#' Fields to be read from the hashes.
#'
Expand Down Expand Up @@ -1276,6 +1319,14 @@ Rush = R6::R6Class("Rush",
r = self$connector
r$command(c("CONFIG", "SET", "save", str_collapse(rhs, sep = " ")))
private$.snapshot_schedule = rhs
},

#' @field has_start_arguments (`logical(1)`)\cr
#' Whether the start arguments for the workers are set.
has_start_arguments = function(rhs) {
assert_ro_binding(rhs)
r = self$connector
as.logical(r$EXISTS(private$.get_key("start_args")))
}
),

Expand Down
89 changes: 67 additions & 22 deletions man/Rush.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 3 additions & 37 deletions man/RushWorker.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkgdown/_pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ reference:
contents:
- get_hostname
- rush-package
- with_rng_state
Loading

0 comments on commit 36b02cb

Please sign in to comment.