diff --git a/.github/workflows/r-cmd-check.yml b/.github/workflows/r-cmd-check.yml index 14883cc..0ac9afa 100644 --- a/.github/workflows/r-cmd-check.yml +++ b/.github/workflows/r-cmd-check.yml @@ -57,3 +57,5 @@ jobs: limit-access-to-actor: true - uses: r-lib/actions/check-r-package@v2 + with: + args: 'c("--no-manual")' # "--as-cran" prevents to start external processes diff --git a/NAMESPACE b/NAMESPACE index 7708194..479e082 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -9,6 +9,7 @@ export(assert_rush_workers) export(assert_rushs) export(get_hostname) export(heartbeat) +export(remove_rush_plan) export(rsh) export(rush_available) export(rush_config) diff --git a/R/Rush.R b/R/Rush.R index 008fdea..f7d38f4 100644 --- a/R/Rush.R +++ b/R/Rush.R @@ -331,7 +331,7 @@ Rush = R6::R6Class("Rush", wait_for_workers = function(n, timeout = Inf) { assert_count(n) assert_number(timeout) - timeout = if (is.finite(timeout)) timeout else rush_config()$start_worker_timeout + timeout = if (is.finite(timeout)) timeout else rush_config()$start_worker_timeout %??% Inf start_time = Sys.time() while(self$n_workers < n) { diff --git a/R/rush_plan.R b/R/rush_plan.R index 35c2a4b..f2d32f5 100644 --- a/R/rush_plan.R +++ b/R/rush_plan.R @@ -9,6 +9,8 @@ #' If `NULL`, the `REDIS_URL` environment variable is parsed. #' If `REDIS_URL` is not set, a default configuration is used. #' See [redux::redis_config] for details. +#' @param start_worker_timeout (`numeric(1)`)\cr +#' The time in seconds to wait for a worker to start. #' #' @template param_n_workers #' @template param_lgr_thresholds @@ -55,6 +57,17 @@ rush_config = function() { start_worker_timeout = rush_env$start_worker_timeout) } +#' @title Remove Rush Plan +#' +#' @description +#' Removes the rush plan that was set by [rush_plan()]. +#' +#' @export +remove_rush_plan = function() { + rm(list = ls(envir = rush_env), envir = rush_env) + invisible(NULL) +} + #' @title Rush Available #' #' @description diff --git a/man/Rush.Rd b/man/Rush.Rd index 1ca0882..0daabda 100644 --- a/man/Rush.Rd +++ b/man/Rush.Rd @@ -346,6 +346,7 @@ This function takes the arguments \code{fun} and optionally \code{constants} whi \if{html}{\out{
}}\preformatted{Rush$start_workers( n_workers = NULL, wait_for_workers = TRUE, + timeout = Inf, globals = NULL, packages = NULL, heartbeat_period = NULL, @@ -367,6 +368,9 @@ Number of workers to be started.} \item{\code{wait_for_workers}}{(\code{logical(1)})\cr Whether to wait until all workers are available.} +\item{\code{timeout}}{(\code{numeric(1)})\cr +Timeout to wait for workers in seconds.} + \item{\code{globals}}{(\code{character()})\cr Global variables to be loaded to the workers global environment.} @@ -481,7 +485,7 @@ Arguments passed to \code{worker_loop}.} \subsection{Method \code{wait_for_workers()}}{ Wait until \code{n} workers are available. \subsection{Usage}{ -\if{html}{\out{
}}\preformatted{Rush$wait_for_workers(n)}\if{html}{\out{
}} +\if{html}{\out{
}}\preformatted{Rush$wait_for_workers(n, timeout = Inf)}\if{html}{\out{
}} } \subsection{Arguments}{ @@ -489,6 +493,10 @@ Wait until \code{n} workers are available. \describe{ \item{\code{n}}{(\code{integer(1)})\cr Number of workers to wait for.} + +\item{\code{timeout}}{(\code{numeric(1)})\cr +Timeout in seconds. +Default is \code{Inf}.} } \if{html}{\out{
}} } diff --git a/man/remove_rush_plan.Rd b/man/remove_rush_plan.Rd new file mode 100644 index 0000000..db94a73 --- /dev/null +++ b/man/remove_rush_plan.Rd @@ -0,0 +1,11 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/rush_plan.R +\name{remove_rush_plan} +\alias{remove_rush_plan} +\title{Remove Rush Plan} +\usage{ +remove_rush_plan() +} +\description{ +Removes the rush plan that was set by \code{\link[=rush_plan]{rush_plan()}}. +} diff --git a/man/rush_plan.Rd b/man/rush_plan.Rd index 2b94e57..e8e3fcf 100644 --- a/man/rush_plan.Rd +++ b/man/rush_plan.Rd @@ -8,7 +8,8 @@ rush_plan( n_workers, config = NULL, lgr_thresholds = NULL, - large_objects_path = NULL + large_objects_path = NULL, + start_worker_timeout = Inf ) } \arguments{ @@ -26,6 +27,9 @@ Logger threshold on the workers e.g. \code{c(rush = "debug")}.} \item{large_objects_path}{(\code{character(1)})\cr The path to the directory where large objects are stored.} + +\item{start_worker_timeout}{(\code{numeric(1)})\cr +The time in seconds to wait for a worker to start.} } \description{ Stores the number of workers and Redis configuration options (\link[redux:redis_config]{redux::redis_config}) for \link{Rush}. diff --git a/pkgdown/_pkgdown.yml b/pkgdown/_pkgdown.yml index 2e2c70e..d67f5d7 100644 --- a/pkgdown/_pkgdown.yml +++ b/pkgdown/_pkgdown.yml @@ -47,6 +47,7 @@ reference: - rush_plan - rush_available - rush_config + - remove_rush_plan - title: Worker Loop contents: - start_worker diff --git a/tests/testthat/_snaps/Rush.md b/tests/testthat/_snaps/Rush.md deleted file mode 100644 index c2214ac..0000000 --- a/tests/testthat/_snaps/Rush.md +++ /dev/null @@ -1,11 +0,0 @@ -# worker can be started with script - - Code - rush$create_worker_script(fun = fun) - Output - DEBUG (500): [rush] Pushing worker config to Redis - DEBUG (500): [rush] Serializing worker configuration to 2384528 bytes - INFO (400): [rush] Start worker with: - INFO (400): [rush] Rscript -e 'rush::start_worker(network_id = 'test-rush', hostname = 'host', url = 'redis://127.0.0.1:6379')' - INFO (400): [rush] See ?rush::start_worker for more details. - diff --git a/tests/testthat/helper.R b/tests/testthat/helper.R index 93a7e56..9c7b6cc 100644 --- a/tests/testthat/helper.R +++ b/tests/testthat/helper.R @@ -12,10 +12,9 @@ expect_rush_task = function(task) { } expect_rush_reset = function(rush, type = "kill") { + remove_rush_plan() processes = rush$processes rush$reset(type = type) expect_list(rush$connector$command(c("KEYS", "*")), len = 0) walk(processes, function(p) p$kill()) } - -lg$set_threshold("debug") diff --git a/tests/testthat/setup.R b/tests/testthat/setup.R new file mode 100644 index 0000000..af02590 --- /dev/null +++ b/tests/testthat/setup.R @@ -0,0 +1,13 @@ +old_opts = options( + warnPartialMatchArgs = TRUE, + warnPartialMatchAttr = TRUE, + warnPartialMatchDollar = TRUE +) + +# https://github.com/HenrikBengtsson/Wishlist-for-R/issues/88 +old_opts = lapply(old_opts, function(x) if (is.null(x)) FALSE else x) + +lg_rush = lgr::get_logger("rush") +old_threshold_rush = lg_rush$threshold +lg_rush$set_threshold(0) + diff --git a/tests/testthat/teardown.R b/tests/testthat/teardown.R new file mode 100644 index 0000000..de41d02 --- /dev/null +++ b/tests/testthat/teardown.R @@ -0,0 +1,2 @@ +options(old_opts) +lg_rush$set_threshold(old_threshold_rush) diff --git a/tests/testthat/test-Rush.R b/tests/testthat/test-Rush.R index 93777a7..d6f1173 100644 --- a/tests/testthat/test-Rush.R +++ b/tests/testthat/test-Rush.R @@ -2,7 +2,6 @@ test_that("constructing a rush controller works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -14,7 +13,6 @@ test_that("constructing a rush controller works", { test_that("workers are started", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -22,7 +20,7 @@ test_that("workers are started", { expect_data_table(rush$worker_info, nrows = 0) - worker_ids = rush$start_workers(fun = fun, n_workers = 2, lgr_threshold = c(rush = "debug"), wait_for_workers = TRUE) + worker_ids = rush$start_workers(fun = fun, n_workers = 2, lgr_thresholds = c(rush = "debug"), wait_for_workers = TRUE) expect_equal(rush$n_workers, 2) # check fields @@ -73,7 +71,6 @@ test_that("workers are started with Redis on unix socket", { test_that("workers are started with a heartbeat", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -90,7 +87,6 @@ test_that("workers are started with a heartbeat", { test_that("additional workers are started", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -117,7 +113,6 @@ test_that("additional workers are started", { test_that("packages are available on the worker", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -136,7 +131,6 @@ test_that("packages are available on the worker", { test_that("globals are available on the worker", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -157,7 +151,6 @@ test_that("globals are available on the worker", { test_that("named globals are available on the worker", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -180,16 +173,14 @@ test_that("named globals are available on the worker", { test_that("worker can be started with script", { skip_on_cran() - skip_on_ci() - skip_if(TRUE) set.seed(1) # make log messages reproducible root_logger = lgr::get_logger("root") - old_fmt = root_logger$appenders$cons$layout$fmt - root_logger$appenders$cons$layout$set_fmt("%L (%n): %m") + old_fmt = root_logger$appenders$console$layout$fmt + root_logger$appenders$console$layout$set_fmt("%L (%n): %m") on.exit({ - root_logger$appenders$cons$layout$set_fmt(old_fmt) + root_logger$appenders$console$layout$set_fmt(old_fmt) }) config = start_flush_redis() @@ -203,7 +194,6 @@ test_that("worker can be started with script", { test_that("a remote worker is started", { skip_on_cran() - skip_on_ci() config = start_flush_redis() fun = function(x1, x2, ...) list(y = x1 + x2) @@ -220,7 +210,6 @@ test_that("a remote worker is started", { test_that("a worker is terminated", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -251,7 +240,6 @@ test_that("a worker is terminated", { test_that("a local worker is killed", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -282,7 +270,6 @@ test_that("a local worker is killed", { test_that("a remote worker is killed via the heartbeat", { skip_on_cran() - skip_on_ci() skip_on_os("windows") config = start_flush_redis() @@ -318,7 +305,6 @@ test_that("a remote worker is killed via the heartbeat", { test_that("reading and writing a hash works with flatten", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -346,7 +332,6 @@ test_that("reading and writing a hash works with flatten", { test_that("reading and writing a hash works without flatten", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -374,7 +359,6 @@ test_that("reading and writing a hash works without flatten", { test_that("reading and writing hashes works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -414,7 +398,6 @@ test_that("reading and writing hashes works", { test_that("writing hashes to specific keys works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -437,7 +420,6 @@ test_that("writing hashes to specific keys works", { test_that("writing list columns works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -469,7 +451,6 @@ test_that("writing list columns works", { test_that("evaluating a task works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -508,7 +489,6 @@ test_that("evaluating a task works", { test_that("evaluating tasks works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -547,7 +527,6 @@ test_that("evaluating tasks works", { test_that("caching results works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -584,8 +563,7 @@ test_that("caching results works", { # segfault detection ----------------------------------------------------------- test_that("a segfault on a local worker is detected", { - skip_on_cran() - skip_on_ci() + skip_if(TRUE) # does not work in testthat on environment config = start_flush_redis() @@ -606,8 +584,7 @@ test_that("a segfault on a local worker is detected", { }) test_that("a segfault on a worker is detected via the heartbeat", { - skip_on_cran() - skip_on_ci() + skip_if(TRUE) # does not work in testthat on environment config = start_flush_redis() @@ -633,7 +610,6 @@ test_that("a segfault on a worker is detected via the heartbeat", { test_that("a simple error is catched", { skip_on_cran() - skip_on_ci() skip_if(TRUE) # does not work in testthat on environment config = start_flush_redis() @@ -681,7 +657,6 @@ test_that("a simple error is catched", { test_that("a lost task is detected", { skip_on_cran() - skip_on_ci() skip_if(TRUE) # does not work in testthat on environment config = start_flush_redis() @@ -729,7 +704,6 @@ test_that("a lost task is detected", { test_that("a lost task is detected when waiting", { skip_on_cran() - skip_on_ci() skip_if(TRUE) # does not work in testthat on environment config = start_flush_redis() @@ -776,7 +750,6 @@ test_that("a lost task is detected when waiting", { test_that("restarting a worker works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -798,8 +771,7 @@ test_that("restarting a worker works", { test_that("restarting a worker kills the worker", { skip_on_cran() - skip_on_ci() - skip_on_windows() + skip_on_os("windows") config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -811,6 +783,9 @@ test_that("restarting a worker kills the worker", { expect_true(tools::pskill(pid, signal = 0)) rush$restart_workers(worker_ids = worker_id) + + Sys.sleep(1) + expect_false(pid == rush$worker_info$pid) expect_false(tools::pskill(pid, signal = 0)) @@ -821,7 +796,6 @@ test_that("restarting a worker kills the worker", { test_that("blocking on new results works", { skip_on_cran() - skip_on_ci() skip_if(TRUE) # does not work in testthat on environment config = start_flush_redis() @@ -843,7 +817,6 @@ test_that("blocking on new results works", { test_that("wait for tasks works when a task gets lost", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -866,7 +839,6 @@ test_that("wait for tasks works when a task gets lost", { test_that("saving lgr logs works", { skip_on_cran() - skip_on_ci() skip_if(TRUE) # does not work in testthat on environment config = start_flush_redis() @@ -898,7 +870,6 @@ test_that("saving lgr logs works", { test_that("snapshot option works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -918,7 +889,6 @@ test_that("snapshot option works", { test_that("terminating workers on idle works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -937,7 +907,6 @@ test_that("terminating workers on idle works", { test_that("constants works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -957,7 +926,6 @@ test_that("constants works", { test_that("network without controller works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -993,7 +961,6 @@ test_that("network without controller works", { test_that("seeds are generated from regular rng seed", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config, seed = 123) @@ -1009,7 +976,6 @@ test_that("seeds are generated from regular rng seed", { test_that("seed are generated from L'Ecuyer-CMRG seed", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config, seed = c(10407L, 1801422725L, -2057975723L, 1156894209L, 1595475487L, 210384600L, -1655729657L)) @@ -1025,7 +991,6 @@ test_that("seed are generated from L'Ecuyer-CMRG seed", { test_that("seed is set correctly on two workers", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config, seed = 123) @@ -1051,9 +1016,13 @@ test_that("seed is set correctly on two workers", { test_that("printing logs with redis appender works", { skip_on_cran() - skip_on_ci() skip_if(TRUE) # does not work in testthat on environment + lg_rush = lgr::get_logger("rush") + old_threshold_rush = lg_rush$threshold + on.exit(lg_rush$set_threshold(old_threshold_rush)) + lg_rush$set_threshold("info") + config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config, seed = 123) fun = function(x1, x2, ...) { @@ -1081,8 +1050,7 @@ test_that("printing logs with redis appender works", { }) test_that("redis info works", { - skip_on_cran() - skip_on_ci() + config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -1093,7 +1061,6 @@ test_that("redis info works", { test_that("evaluating a task works", { skip_on_cran() - skip_on_ci() skip_if(TRUE) # takes too long config = start_flush_redis() diff --git a/tests/testthat/test-RushWorker.R b/tests/testthat/test-RushWorker.R index 926d03f..21543af 100644 --- a/tests/testthat/test-RushWorker.R +++ b/tests/testthat/test-RushWorker.R @@ -1,6 +1,5 @@ test_that("constructing a rush worker works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -24,7 +23,6 @@ test_that("constructing a rush worker works", { test_that("active bindings work after construction", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -49,7 +47,6 @@ test_that("active bindings work after construction", { test_that("a worker is registered", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -70,7 +67,6 @@ test_that("a worker is registered", { test_that("a worker is terminated", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -85,7 +81,6 @@ test_that("a worker is terminated", { test_that("a heartbeat is started", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", heartbeat_period = 3) @@ -99,7 +94,6 @@ test_that("a heartbeat is started", { test_that("pushing a task to the queue works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -139,7 +133,6 @@ test_that("pushing a task to the queue works", { test_that("pushing a task with extras to the queue works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -182,7 +175,6 @@ test_that("pushing a task with extras to the queue works", { test_that("pushing tasks to the queue works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -223,7 +215,6 @@ test_that("pushing tasks to the queue works", { test_that("pushing tasks with extras to the queue works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -267,7 +258,6 @@ test_that("pushing tasks with extras to the queue works", { test_that("popping a task from the queue works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -310,7 +300,6 @@ test_that("popping a task from the queue works", { test_that("popping a task with seed, max_retries and timeout works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -318,7 +307,7 @@ test_that("popping a task with seed, max_retries and timeout works", { seed = 123456 max_retries = 2 timeout = 1 - rush$push_tasks(xss, seed = list(seed), max_retries = max_retries, timeout = timeout) + rush$push_tasks(xss, seeds = list(seed), max_retries = max_retries, timeouts = timeout) # check task task = rush$pop_task(fields = c("xs", "seed", "max_retries", "timeout")) @@ -359,7 +348,6 @@ test_that("popping a task with seed, max_retries and timeout works", { test_that("pushing a finished task works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -401,7 +389,6 @@ test_that("pushing a finished task works", { test_that("pushing a failed tasks works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -409,7 +396,7 @@ test_that("pushing a failed tasks works", { rush$push_tasks(xss) task = rush$pop_task() - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) # check task count expect_equal(rush$n_tasks, 1) @@ -443,7 +430,11 @@ test_that("pushing a failed tasks works", { test_that("retry a failed task works", { skip_on_cran() - skip_on_ci() + + lg_rush = lgr::get_logger("rush") + old_threshold_rush = lg_rush$threshold + on.exit(lg_rush$set_threshold(old_threshold_rush)) + lg_rush$set_threshold("info") config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -453,7 +444,7 @@ test_that("retry a failed task works", { expect_output(rush$retry_tasks(keys), "Not all task") - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) expect_equal(rush$n_queued_tasks, 0) expect_equal(rush$n_failed_tasks, 1) @@ -470,17 +461,16 @@ test_that("retry a failed task works", { test_that("retry a failed task works and setting a new seed works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") xss = list(list(x1 = 1, x2 = 2)) seed = c(10407L, 1280795612L, -169270483L, -442010614L, -603558397L, -222347416L, 1489374793L) - keys = rush$push_tasks(xss, seed = list(seed)) + keys = rush$push_tasks(xss, seeds = list(seed)) task = rush$pop_task(fields = c("xs", "seed")) expect_equal(task$seed, seed) - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) expect_equal(rush$n_queued_tasks, 0) expect_equal(rush$n_failed_tasks, 1) @@ -495,8 +485,10 @@ test_that("retry a failed task works and setting a new seed works", { }) test_that("retry a failed task works with a maximum of retries", { - skip_on_cran() - skip_on_ci() + lg_rush = lgr::get_logger("rush") + old_threshold_rush = lg_rush$threshold + on.exit(lg_rush$set_threshold(old_threshold_rush)) + lg_rush$set_threshold("info") config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -508,7 +500,7 @@ test_that("retry a failed task works with a maximum of retries", { expect_null(task$n_retries) expect_output(rush$retry_tasks(keys), "Not all task") - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) expect_equal(rush$n_queued_tasks, 0) expect_equal(rush$n_failed_tasks, 1) @@ -524,7 +516,7 @@ test_that("retry a failed task works with a maximum of retries", { expect_false(rush$is_failed_task(task$key)) task = rush$pop_task() - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) expect_output(rush$retry_tasks(keys), "reached the maximum number of retries") rush$retry_tasks(keys, ignore_max_retries = TRUE) @@ -540,7 +532,11 @@ test_that("retry a failed task works with a maximum of retries", { test_that("retry failed tasks works", { skip_on_cran() - skip_on_ci() + + lg_rush = lgr::get_logger("rush") + old_threshold_rush = lg_rush$threshold + on.exit(lg_rush$set_threshold(old_threshold_rush)) + lg_rush$set_threshold("info") config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -552,7 +548,7 @@ test_that("retry failed tasks works", { expect_output(rush$retry_tasks(keys), "Not all task") - rush$push_failed(keys, condition = list(list(message = "error"))) + rush$push_failed(keys, conditions = list(list(message = "error"))) expect_equal(rush$n_queued_tasks, 0) expect_equal(rush$n_failed_tasks, 2) @@ -569,7 +565,6 @@ test_that("retry failed tasks works", { test_that("moving and fetching tasks works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -618,7 +613,7 @@ test_that("moving and fetching tasks works", { # push failed task task = rush$pop_task() - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) queued_tasks = rush$fetch_queued_tasks() expect_data_table(queued_tasks, nrows = 1) expect_character(queued_tasks$keys, unique = TRUE) @@ -645,7 +640,6 @@ test_that("moving and fetching tasks works", { test_that("fetching as list works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -692,7 +686,7 @@ test_that("fetching as list works", { # push failed task task = rush$pop_task() - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) failed_tasks = rush$fetch_failed_tasks(data_format = "list") expect_list(failed_tasks, len = 1) expect_names(names(failed_tasks), identical.to = task$key) @@ -702,7 +696,6 @@ test_that("fetching as list works", { test_that("fetch task with states works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123) @@ -747,7 +740,7 @@ test_that("fetch task with states works", { xss = list(list(x1 = 2, x2 = 2)) rush$push_tasks(xss) task_2 = rush$pop_task() - rush$push_failed(task_2$key, condition = list(list(message = "error"))) + rush$push_failed(task_2$key, conditions = list(list(message = "error"))) tab = rush$fetch_tasks_with_state() expect_data_table(tab, nrows = 2) expect_equal(tab$state, c("finished", "failed")) @@ -760,7 +753,6 @@ test_that("fetch task with states works", { test_that("latest results are fetched", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -803,7 +795,6 @@ test_that("latest results are fetched", { test_that("priority queues work", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -857,7 +848,6 @@ test_that("priority queues work", { test_that("redirecting to shared queue works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -888,7 +878,6 @@ test_that("redirecting to shared queue works", { test_that("mixing priority queue and shared queue works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -910,7 +899,7 @@ test_that("mixing priority queue and shared queue works", { test_that("saving logs with redis appender works", { skip_on_cran() - skip_on_ci() + appenders = lgr::get_logger("root")$appenders on.exit({ @@ -945,7 +934,7 @@ test_that("saving logs with redis appender works", { test_that("settings the buffer size in redis appender works", { skip_on_cran() - skip_on_ci() + appenders = lgr::get_logger("root")$appenders on.exit({ @@ -977,7 +966,6 @@ test_that("settings the buffer size in redis appender works", { test_that("pushing tasks and terminating worker works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -998,7 +986,6 @@ test_that("pushing tasks and terminating worker works", { test_that("terminate on idle works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -1018,7 +1005,6 @@ test_that("terminate on idle works", { test_that("popping a task with seed from the queue works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123) @@ -1036,7 +1022,6 @@ test_that("popping a task with seed from the queue works", { test_that("task in states works", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123) @@ -1076,7 +1061,7 @@ test_that("task in states works", { xss = list(list(x1 = 2, x2 = 2)) keys = rush$push_tasks(xss) task_2 = rush$pop_task() - rush$push_failed(task_2$key, condition = list(list(message = "error"))) + rush$push_failed(task_2$key, conditions = list(list(message = "error"))) keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed")) expect_list(keys_list, len = 4) expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed")) diff --git a/tests/testthat/test-rush_plan.R b/tests/testthat/test-rush_plan.R index a315ce5..f9ac09d 100644 --- a/tests/testthat/test-rush_plan.R +++ b/tests/testthat/test-rush_plan.R @@ -1,7 +1,4 @@ test_that("rush_plan family works", { - skip_on_cran() - skip_on_ci() - expect_false(rush_available()) config = redis_config() rush_plan(n_workers = 2, config) @@ -13,16 +10,12 @@ test_that("rush_plan family works", { }) test_that("rush_plan throws and error if redis is not available", { - skip_on_cran() - skip_on_ci() - config = redis_config(url = "redis://localhost:1234") expect_error(rush_plan(n_workers = 2, config), "Can't connect to Redis") }) test_that("start workers", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush_plan(n_workers = 2, config) @@ -40,7 +33,11 @@ test_that("start workers", { test_that("set threshold", { skip_on_cran() - skip_on_ci() + + lg_rush = lgr::get_logger("rush") + old_threshold_rush = lg_rush$threshold + on.exit(lg_rush$set_threshold(old_threshold_rush)) + lg_rush$set_threshold("debug") config = start_flush_redis() rush_plan(n_workers = 2, config, lgr_thresholds = c(rush = "debug")) @@ -57,7 +54,6 @@ test_that("set threshold", { test_that("set start worker timeout", { skip_on_cran() - skip_on_ci() config = start_flush_redis() rush_plan(n_workers = 2, config, start_worker_timeout = -Inf) @@ -67,4 +63,6 @@ test_that("set start worker timeout", { rush = rsh("test-rush") fun = function(x1, x2, ...) list(y = x1 + x2) expect_error(rush$start_workers(fun = fun), "Timeout waiting") + + expect_rush_reset(rush) }) diff --git a/tests/testthat/test-worker_loops.R b/tests/testthat/test-worker_loops.R index 954b1cd..1a0a2e3 100644 --- a/tests/testthat/test-worker_loops.R +++ b/tests/testthat/test-worker_loops.R @@ -1,6 +1,8 @@ # default ---------------------------------------------------------------------- test_that("worker_loop_default works", { + skip_on_cran() + config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") xss = list(list(x1 = 1, x2 = 2)) @@ -14,6 +16,8 @@ test_that("worker_loop_default works", { }) test_that("worker_loop_default works with failed task", { + skip_on_cran() + config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") xss = list(list(x1 = 1, x2 = 2)) @@ -28,6 +32,8 @@ test_that("worker_loop_default works with failed task", { }) test_that("worker_loop_default retries failed task", { + skip_on_cran() + config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") xss = list(list(x1 = 1, x2 = 2)) @@ -42,6 +48,8 @@ test_that("worker_loop_default retries failed task", { }) test_that("worker_loop_default sets seed is set correctly", { + skip_on_cran() + config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123456) xss = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2)) @@ -58,6 +66,8 @@ test_that("worker_loop_default sets seed is set correctly", { # callr ------------------------------------------------------------------------ test_that("worker_loop_callr works", { + skip_on_cran() + config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") xss = list(list(x1 = 1, x2 = 2)) @@ -71,6 +81,8 @@ test_that("worker_loop_callr works", { }) test_that("worker_loop_callr works with failed task", { + skip_on_cran() + config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") xss = list(list(x1 = 1, x2 = 2)) @@ -85,6 +97,8 @@ test_that("worker_loop_callr works with failed task", { }) test_that("worker_loop_callr works with lost task", { + skip_on_cran() + config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") xss = list(list(x1 = 1, x2 = 2)) @@ -99,6 +113,8 @@ test_that("worker_loop_callr works with lost task", { }) test_that("worker_loop_callr works with timeout", { + skip_on_cran() + config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") xss = list(list(x1 = 1, x2 = 2)) @@ -113,6 +129,8 @@ test_that("worker_loop_callr works with timeout", { }) test_that("worker_loop_callr sets seed correctly", { + skip_on_cran() + config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123456) xss = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2))