diff --git a/NAMESPACE b/NAMESPACE index 8a971a5..51a2c55 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -2,8 +2,8 @@ export(Rush) export(RushWorker) -export(fun_heartbeat) export(get_hostname) +export(heartbeat) export(rsh) export(rush_available) export(rush_config) diff --git a/R/Rush.R b/R/Rush.R index f4544a4..04e4827 100644 --- a/R/Rush.R +++ b/R/Rush.R @@ -159,6 +159,8 @@ Rush = R6::R6Class("Rush", #' Start workers locally with `processx`. #' The [processx::process] are stored in `$processes`. #' Alternatively, use `$create_worker_script()` to create a script for starting workers on remote machines. + #' By default, [worker_loop_default()] is used as worker loop. + #' This function takes the arguments `fun` and optionally `constants` which are passed in `...`. #' #' @param n_workers (`integer(1)`)\cr #' Number of workers to be started. diff --git a/R/RushWorker.R b/R/RushWorker.R index b54aad9..d0f7041 100644 --- a/R/RushWorker.R +++ b/R/RushWorker.R @@ -60,7 +60,7 @@ RushWorker = R6::R6Class("RushWorker", heartbeat_expire = heartbeat_expire, pid = Sys.getpid() ) - self$heartbeat = callr::r_bg(fun_heartbeat, args = heartbeat_args, supervise = TRUE) + self$heartbeat = callr::r_bg(heartbeat, args = heartbeat_args, supervise = TRUE) # wait until heartbeat process is able to work Sys.sleep(1) diff --git a/R/heartbeat_loops.R b/R/heartbeat_loops.R index 8768f87..198c196 100644 --- a/R/heartbeat_loops.R +++ b/R/heartbeat_loops.R @@ -15,7 +15,7 @@ #' @template param_heartbeat_expire #' #' @export -fun_heartbeat = function(network_id, config, worker_id, heartbeat_period, heartbeat_expire, pid) { +heartbeat = function(network_id, config, worker_id, heartbeat_period, heartbeat_expire, pid) { r = redux::hiredis(config) worker_id_key = sprintf("%s:%s", network_id, worker_id) heartbeat_key = sprintf("%s:%s:heartbeat", network_id, worker_id) diff --git a/R/worker_loops.R b/R/worker_loops.R index ae0b7f7..b1064e8 100644 --- a/R/worker_loops.R +++ b/R/worker_loops.R @@ -7,18 +7,21 @@ #' #' @param fun (`function`)\cr #' Function to be executed. +#' @param constants (`list`)\cr +#' List of constants passed to `fun`. #' @param rush ([RushWorker])\cr #' Rush worker instance. #' #' @export -worker_loop_default = function(fun, rush) { +worker_loop_default = function(fun, constants = NULL, rush) { assert_function(fun) + assert_list(constants, null.ok = TRUE, names = "named") while(!rush$terminated) { task = rush$pop_task() if (!is.null(task)) { tryCatch({ - ys = mlr3misc::invoke(fun, .args = task$xs) + ys = mlr3misc::invoke(fun, .args = c(task$xs, constants)) rush$push_results(task$key, yss = list(ys)) }, error = function(e) { condition = list(message = e$message) diff --git a/man/Rush.Rd b/man/Rush.Rd index 1bc86f1..f06adea 100644 --- a/man/Rush.Rd +++ b/man/Rush.Rd @@ -302,6 +302,8 @@ Print method. Start workers locally with \code{processx}. The \link[processx:process]{processx::process} are stored in \verb{$processes}. Alternatively, use \verb{$create_worker_script()} to create a script for starting workers on remote machines. +By default, \code{\link[=worker_loop_default]{worker_loop_default()}} is used as worker loop. +This function takes the arguments \code{fun} and optionally \code{constants} which are passed in \code{...}. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{Rush$start_workers( n_workers = NULL, diff --git a/man/fun_heartbeat.Rd b/man/heartbeat.Rd similarity index 94% rename from man/fun_heartbeat.Rd rename to man/heartbeat.Rd index 6554904..098d95c 100644 --- a/man/fun_heartbeat.Rd +++ b/man/heartbeat.Rd @@ -1,10 +1,10 @@ % Generated by roxygen2: do not edit by hand % Please edit documentation in R/heartbeat_loops.R -\name{fun_heartbeat} -\alias{fun_heartbeat} +\name{heartbeat} +\alias{heartbeat} \title{Heartbeat Loop} \usage{ -fun_heartbeat( +heartbeat( network_id, config, worker_id, diff --git a/man/worker_loop_default.Rd b/man/worker_loop_default.Rd index fcfb3c8..4e33ce3 100644 --- a/man/worker_loop_default.Rd +++ b/man/worker_loop_default.Rd @@ -4,12 +4,15 @@ \alias{worker_loop_default} \title{Single Task Worker Loop} \usage{ -worker_loop_default(fun, rush) +worker_loop_default(fun, constants = NULL, rush) } \arguments{ \item{fun}{(\code{function})\cr Function to be executed.} +\item{constants}{(\code{list})\cr +List of constants passed to \code{fun}.} + \item{rush}{(\link{RushWorker})\cr Rush worker instance.} } diff --git a/pkgdown/_pkgdown.yml b/pkgdown/_pkgdown.yml index 923d5e2..b6118df 100644 --- a/pkgdown/_pkgdown.yml +++ b/pkgdown/_pkgdown.yml @@ -42,9 +42,16 @@ reference: - Rush - RushWorker - rsh - - title: Misc + - title: Plan + - rush_plan + - rush_available + - rush_config + - title: Worker Loop contents: - - worker_loop_default - start_worker - - fun_heartbeat + - worker_loop_default + - heartbeat + - title: Misc + contents: + - get_hostname - rush-package diff --git a/tests/testthat/test-Rush.R b/tests/testthat/test-Rush.R index f635e8a..491cc40 100644 --- a/tests/testthat/test-Rush.R +++ b/tests/testthat/test-Rush.R @@ -186,6 +186,34 @@ test_that("a remote worker is started", { expect_set_equal(rush$worker_info$host, "remote") }) +# start workers with script ---------------------------------------------------- + +test_that("worker can be started with script", { + # skip_on_cran() + skip_on_ci() + skip_if(TRUE) + + config = start_flush_redis() + rush = Rush$new(network_id = "test-rush", config = config) + fun = function(x1, x2, ...) list(y = x1 + x2) + + rush$create_worker_script(fun = fun) +}) + +test_that("a remote worker is started", { + # skip_on_cran() + + config = start_flush_redis() + fun = function(x1, x2, ...) list(y = x1 + x2) + rush = Rush$new(network_id = "test-rush", config = config) + + withr::with_envvar(list("HOST" = "remote_host"), { + rush$start_workers(fun = fun, n_workers = 2, heartbeat_period = 1, heartbeat_expire = 2, await_workers = TRUE) + }) + + expect_set_equal(rush$worker_info$host, "remote") +}) + # stop workers ----------------------------------------------------------------- test_that("a worker is terminated", { @@ -683,6 +711,23 @@ test_that("terminating workers on idle works", { expect_rush_reset(rush) }) +test_that("constants works", { + # skip_on_cran() + + config = start_flush_redis() + rush = Rush$new(network_id = "test-rush", config = config) + fun = function(x1, x2, x3, ...) list(y = x1 + x2 + x3) + rush$start_workers(fun = fun, n_workers = 4, constants = list(x3 = 5), await_workers = TRUE) + + xss = list(list(x1 = 1, x2 = 2)) + keys = rush$push_tasks(xss) + rush$await_tasks(keys) + + expect_equal(rush$fetch_finished_tasks()$y, 8) + + expect_rush_reset(rush) +}) + # rush network without controller ---------------------------------------------- test_that("network without controller works", { diff --git a/tests/testthat/test-rush_plan.R b/tests/testthat/test-rush_plan.R index 77a5984..02adc3f 100644 --- a/tests/testthat/test-rush_plan.R +++ b/tests/testthat/test-rush_plan.R @@ -2,7 +2,6 @@ test_that("rush_plan family works", { skip_on_cran() skip_on_ci() - expect_false(rush_available()) config = redis_config() rush_plan(n_workers = 2, config)