Skip to content

Commit

Permalink
Revert "refactor: set worker arguments only once (#31)" (#32)
Browse files Browse the repository at this point in the history
This reverts commit 85c2ffc.
  • Loading branch information
be-marc authored Feb 6, 2024
1 parent 85c2ffc commit c12a539
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 154 deletions.
65 changes: 7 additions & 58 deletions R/Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,10 @@ Rush = R6::R6Class("Rush",
#'
#' @param n_workers (`integer(1)`)\cr
#' Number of workers to be started.
#' @param wait_for_workers (`logical(1)`)\cr
#' Whether to wait until all workers are available.
#' @param supervise (`logical(1)`)\cr
#' Whether to kill the workers when the main R process is shut down.
#' @param wait_for_workers (`logical(1)`)\cr
#' Whether to wait until all workers are available.
#' @param ... (`any`)\cr
#' Arguments passed to `worker_loop`.
start_workers = function(
Expand All @@ -218,11 +218,6 @@ Rush = R6::R6Class("Rush",
assert_flag(supervise)
r = self$connector

if (self$has_start_arguments) {
# too reduce the complexity, we don't allow to start workers with different arguments for now.
stop("Worker configuration is already set. Use `$add_workers()` to add additional workers or `$stop_workers()` to stop the workers and start with new arguments.")
}

# push worker config to redis
private$.push_worker_config(
globals = globals,
Expand All @@ -235,54 +230,21 @@ Rush = R6::R6Class("Rush",
...
)

lg$debug("Starting %i worker(s)", n_workers)

worker_ids = uuid::UUIDgenerate(n = n_workers)
self$processes = set_names(map(worker_ids, function(worker_id) {
self$processes = c(self$processes, set_names(map(worker_ids, function(worker_id) {
processx::process$new("Rscript",
args = c("-e", sprintf("rush::start_worker(network_id = '%s', worker_id = '%s', hostname = '%s', url = '%s')",
self$network_id, worker_id, private$.hostname, self$config$url)),
supervise = supervise, stdout = "|", stderr = "|") # , stdout = "|", stderr = "|"
}), worker_ids)

if (wait_for_workers) self$wait_for_workers(n_workers)

return(invisible(worker_ids))
},

#' @description
#' Add local workers to the network.
#'
#' @param n_workers (`integer(1)`)\cr
#' Number of workers to be started.
#' @param wait_for_workers (`logical(1)`)\cr
#' Whether to wait until all workers are available.
#' @param supervise (`logical(1)`)\cr
#' Whether to kill the workers when the main R process is shut down.
add_workers = function(n_workers, wait_for_workers = TRUE, supervise = TRUE) {
assert_count(n_workers)
assert_flag(wait_for_workers)
assert_flag(supervise)
r = self$connector
n_running_workers = self$n_running_workers

lg$debug("Starting %i worker(s)", n_workers)

worker_ids = uuid::UUIDgenerate(n = n_workers)
self$processes = c(self$processes, set_names(map(worker_ids, function(worker_id) {
processx::process$new("Rscript",
args = c("-e", sprintf("rush::start_worker(network_id = '%s', worker_id = '%s', hostname = '%s', url = '%s')",
self$network_id, worker_id, private$.hostname, self$config$url)),
supervise = TRUE, stdout = "|", stderr = "|")
}), worker_ids))

if (wait_for_workers) self$wait_for_workers(n_workers + n_running_workers)
if (wait_for_workers) self$wait_for_workers(n_workers)

return(invisible(worker_ids))
},

#' @description
#' Restart local workers.
#' Restart workers.
#'
#' @param worker_ids (`character()`)\cr
#' Worker ids to be restarted.
Expand Down Expand Up @@ -319,11 +281,6 @@ Rush = R6::R6Class("Rush",
...
) {

if (self$has_start_arguments) {
# too reduce the complexity, we don't allow to start workers with different arguments for now.
lg$warn("Worker configuration is already set. Use `$stop_workers()` to stop the workers and start with new arguments.")
}

# push worker config to redis
private$.push_worker_config(
globals = globals,
Expand Down Expand Up @@ -1086,8 +1043,8 @@ Rush = R6::R6Class("Rush",
#' @description
#' Reads a single Redis hash and returns the values as a list named by the fields.
#'
#' @param key (`character(1)`)\cr
#' Key of the hash.
#' @param keys (`character()`)\cr
#' Keys of the hashes.
#' @param fields (`character()`)\cr
#' Fields to be read from the hashes.
#'
Expand Down Expand Up @@ -1319,14 +1276,6 @@ Rush = R6::R6Class("Rush",
r = self$connector
r$command(c("CONFIG", "SET", "save", str_collapse(rhs, sep = " ")))
private$.snapshot_schedule = rhs
},

#' @field has_start_arguments (`logical(1)`)\cr
#' Whether the start arguments for the workers are set.
has_start_arguments = function(rhs) {
assert_ro_binding(rhs)
r = self$connector
as.logical(r$EXISTS(private$.get_key("start_args")))
}
),

Expand Down
89 changes: 22 additions & 67 deletions man/Rush.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 37 additions & 3 deletions man/RushWorker.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pkgdown/_pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,3 @@ reference:
contents:
- get_hostname
- rush-package
- with_rng_state
Loading

0 comments on commit c12a539

Please sign in to comment.