From be8c1b24a22d3b624072d51369e42f80493cb9f7 Mon Sep 17 00:00:00 2001
From: be-marc
Date: Tue, 2 Apr 2024 18:24:28 +0200
Subject: [PATCH] docs: fix parameter description

---
 R/Rush.R                               | 20 ++++++++++++--------
 R/rush_plan.R                          |  1 +
 man-roxygen/param_large_objects_path.R |  2 ++
 man/Rush.Rd                            | 10 ++++------
 man/rush_plan.Rd                       | 10 +++++++++-
 man/store_large_object.Rd              | 18 ++++++++++++++++++
 tests/testthat/test-RushWorker.R       |  2 +-
 7 files changed, 47 insertions(+), 16 deletions(-)
 create mode 100644 man-roxygen/param_large_objects_path.R
 create mode 100644 man/store_large_object.Rd

diff --git a/R/Rush.R b/R/Rush.R
index 8bd6011..6bb9669 100644
--- a/R/Rush.R
+++ b/R/Rush.R
@@ -253,6 +253,8 @@ Rush = R6::R6Class("Rush",
       assert_subset(unlist(worker_ids), self$worker_ids)
       r = self$connector
 
+      # FIXME: Kill workers?
+
       lg$error("Restarting %i worker(s): %s", length(worker_ids), str_collapse(worker_ids))
       processes = set_names(map(worker_ids, function(worker_id) {
         # restart worker
@@ -386,11 +388,8 @@ Rush = R6::R6Class("Rush",
     #'
     #' @param restart_workers (`logical(1)`)\cr
     #'   Whether to restart lost workers.
-    #' @param restart_tasks (`logical(1)`)\cr
-    #'   Whether to restart lost tasks.
-    detect_lost_workers = function(restart_workers = FALSE, restart_tasks = FALSE) {
+    detect_lost_workers = function(restart_workers = FALSE) {
       assert_flag(restart_workers)
-      assert_flag(restart_tasks)
       r = self$connector
 
       # check workers with a heartbeat
@@ -690,13 +689,13 @@ Rush = R6::R6Class("Rush",
     #'
     #' @param keys (`character()`)\cr
     #'   Keys of the tasks to be retried.
-    #' @param ignore_max_retires (`logical(1)`)\cr
+    #' @param ignore_max_retries (`logical(1)`)\cr
     #'   Whether to ignore the maximum number of retries.
     #' @param next_seed (`logical(1)`)\cr
     #'   Whether to change the seed of the task.
-    retry_tasks = function(keys, ignore_max_retires = FALSE, next_seed = FALSE) {
+    retry_tasks = function(keys, ignore_max_retries = FALSE, next_seed = FALSE) {
       assert_character(keys)
-      assert_flag(ignore_max_retires)
+      assert_flag(ignore_max_retries)
       assert_flag(next_seed)
       tasks = self$read_hashes(keys, fields = c("seed", "max_retries", "n_retries"), flatten = FALSE)
       seeds = map(tasks, "seed")
@@ -707,7 +706,7 @@
 
       if (!all(failed)) lg$error("Not all task(s) failed: %s", str_collapse(keys[!failed]))
 
-      if (ignore_max_retires) {
+      if (ignore_max_retries) {
         keys = keys[failed]
       } else {
         if (!all(retrieable)) lg$error("Task(s) reached the maximum number of retries: %s", str_collapse(keys[!retrieable]))
@@ -932,6 +931,7 @@
     #' Fetch tasks with different states from the data base.
     #' If tasks with different states are to be queried at the same time, this function prevents tasks from appearing twice.
     #' This could be the case if a worker changes the state of a task while the tasks are being fetched.
+    #' Finished tasks are cached.
     #'
     #' @param fields (`character()`)\cr
     #'   Fields to be read from the hashes.
@@ -1445,7 +1445,11 @@ Rush = R6::R6Class("Rush",
     # finished tasks keys can be restricted to uncached tasks
     .tasks_with_state = function(states, only_new_keys = FALSE) {
       r = self$connector
+
+      # optionally limit finished tasks to uncached tasks
       start_finished_tasks = if (only_new_keys) length(private$.cached_tasks) else 0
+
+      # get keys of tasks with different states in one transaction
       r$MULTI()
       if ("queued" %in% states) r$LRANGE(private$.get_key("queued_tasks"), 0, -1)
       if ("running" %in% states) r$SMEMBERS(private$.get_key("running_tasks"))
diff --git a/R/rush_plan.R b/R/rush_plan.R
index 79ce40c..20cdb2f 100644
--- a/R/rush_plan.R
+++ b/R/rush_plan.R
@@ -12,6 +12,7 @@
 #'
 #' @template param_n_workers
 #' @template param_lgr_thresholds
+#' @template param_large_objects_path
 #'
 #' @export
 rush_plan = function(n_workers, config = NULL, lgr_thresholds = NULL, large_objects_path = NULL) {
diff --git a/man-roxygen/param_large_objects_path.R b/man-roxygen/param_large_objects_path.R
new file mode 100644
index 0000000..a7d3abb
--- /dev/null
+++ b/man-roxygen/param_large_objects_path.R
@@ -0,0 +1,2 @@
+#' @param large_objects_path (`character(1)`)\cr
+#'   The path to the directory where large objects are stored.
diff --git a/man/Rush.Rd b/man/Rush.Rd
index aaf6fab..36ecef3 100644
--- a/man/Rush.Rd
+++ b/man/Rush.Rd
@@ -513,7 +513,7 @@ But checking local workers on windows might be very slow.
 Workers with a heartbeat process are checked with the heartbeat.
 Lost tasks are marked as \code{"lost"}.
 \subsection{Usage}{
-\if{html}{\out{<div class="r">}}\preformatted{Rush$detect_lost_workers(restart_workers = FALSE, restart_tasks = FALSE)}\if{html}{\out{</div>}}
+\if{html}{\out{<div class="r">}}\preformatted{Rush$detect_lost_workers(restart_workers = FALSE)}\if{html}{\out{</div>}}
 }
 
 \subsection{Arguments}{
@@ -521,9 +521,6 @@ Lost tasks are marked as \code{"lost"}.
 \describe{
 \item{\code{restart_workers}}{(\code{logical(1)})\cr
 Whether to restart lost workers.}
-
-\item{\code{restart_tasks}}{(\code{logical(1)})\cr
-Whether to restart lost tasks.}
 }
 \if{html}{\out{</div>}}
 }
@@ -688,7 +685,7 @@ List of lists of conditions.}
 \subsection{Method \code{retry_tasks()}}{
 Retry failed tasks.
 \subsection{Usage}{
-\if{html}{\out{<div class="r">}}\preformatted{Rush$retry_tasks(keys, ignore_max_retires = FALSE, next_seed = FALSE)}\if{html}{\out{</div>}}
+\if{html}{\out{<div class="r">}}\preformatted{Rush$retry_tasks(keys, ignore_max_retries = FALSE, next_seed = FALSE)}\if{html}{\out{</div>}}
 }
 
 \subsection{Arguments}{
@@ -697,7 +694,7 @@ Retry failed tasks.
 \describe{
 \item{\code{keys}}{(\code{character()})\cr
 Keys of the tasks to be retried.}
-\item{\code{ignore_max_retires}}{(\code{logical(1)})\cr
+\item{\code{ignore_max_retries}}{(\code{logical(1)})\cr
 Whether to ignore the maximum number of retries.}
 
 \item{\code{next_seed}}{(\code{logical(1)})\cr
@@ -1006,6 +1003,7 @@ Table of all tasks.
 Fetch tasks with different states from the data base.
 If tasks with different states are to be queried at the same time, this function prevents tasks from appearing twice.
 This could be the case if a worker changes the state of a task while the tasks are being fetched.
+Finished tasks are cached.
 \subsection{Usage}{
 \if{html}{\out{<div class="r">}}\preformatted{Rush$fetch_tasks_with_state(
   fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"),
diff --git a/man/rush_plan.Rd b/man/rush_plan.Rd
index 52ee78a..e114809 100644
--- a/man/rush_plan.Rd
+++ b/man/rush_plan.Rd
@@ -4,7 +4,12 @@
 \alias{rush_plan}
 \title{Create Rush Plan}
 \usage{
-rush_plan(n_workers, config = NULL, lgr_thresholds = NULL)
+rush_plan(
+  n_workers,
+  config = NULL,
+  lgr_thresholds = NULL,
+  large_objects_path = NULL
+)
 }
 \arguments{
 \item{n_workers}{(\code{integer(1)})\cr
@@ -18,6 +23,9 @@ See \link[redux:redis_config]{redux::redis_config} for details.}
 \item{lgr_thresholds}{(named \code{character()} | named \code{numeric()})\cr
 Logger threshold on the workers e.g. \code{c(rush = "debug")}.}
+
+\item{large_objects_path}{(\code{character(1)})\cr
+The path to the directory where large objects are stored.}
 }
 \description{
 Stores the number of workers and Redis configuration options (\link[redux:redis_config]{redux::redis_config}) for \link{Rush}.
 }
diff --git a/man/store_large_object.Rd b/man/store_large_object.Rd
new file mode 100644
index 0000000..172deaf
--- /dev/null
+++ b/man/store_large_object.Rd
@@ -0,0 +1,18 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/store_large_object.R
+\name{store_large_object}
+\alias{store_large_object}
+\title{Store Large Objects}
+\usage{
+store_large_object(obj, path)
+}
+\arguments{
+\item{obj}{(\code{any})\cr
+Object to store.}
+
+\item{path}{(\code{character(1)})\cr
+Path to store the object.}
+}
+\description{
+Store large objects to disk and return a reference to the object.
+}
diff --git a/tests/testthat/test-RushWorker.R b/tests/testthat/test-RushWorker.R
index 7b35d1a..926d03f 100644
--- a/tests/testthat/test-RushWorker.R
+++ b/tests/testthat/test-RushWorker.R
@@ -527,7 +527,7 @@ test_that("retry a failed task works with a maximum of retries", {
   rush$push_failed(task$key, condition = list(list(message = "error")))
 
   expect_output(rush$retry_tasks(keys), "reached the maximum number of retries")
-  rush$retry_tasks(keys, ignore_max_retires = TRUE)
+  rush$retry_tasks(keys, ignore_max_retries = TRUE)
   task_info = rush$read_hash(keys, fields = c("max_retries", "n_retries"))
   expect_equal(task_info$max_retries, 1)
   expect_equal(task_info$n_retries, 2)
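
-- 
Usage sketch (reviewer note, not part of the commit): how the renamed
`ignore_max_retries` argument and the new `large_objects_path` option fit
together. A running Redis server is assumed; the network id, the `keys`
variable, and the paths are illustrative, not part of the patched API.

  library(rush)

  # plan two workers and spill large objects to disk instead of Redis
  rush_plan(n_workers = 2, large_objects_path = tempdir())
  rush = Rush$new(network_id = "demo")

  # store a large object on disk, keeping only a reference to it
  ref = store_large_object(runif(1e7), path = tempdir())

  # `keys` is assumed to hold the keys of failed tasks; retry them even
  # past max_retries and draw a fresh seed for each retry
  rush$retry_tasks(keys, ignore_max_retries = TRUE, next_seed = TRUE)

  # restart lost workers instead of only marking their tasks as lost
  rush$detect_lost_workers(restart_workers = TRUE)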