Skip to content

Commit

Permalink
feat: network without controller
Browse files Browse the repository at this point in the history
  • Loading branch information
be-marc committed Oct 17, 2023
1 parent 0042d87 commit 7b8631e
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 15 deletions.
24 changes: 24 additions & 0 deletions R/RushWorker.R
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,30 @@ RushWorker = R6::R6Class("RushWorker",
"heartbeat", as.character(!is.null(self$heartbeat))))
},

#' @description
#' Push a task to running tasks without queue.
#'
#' @param xss (list of named `list()`)\cr
#' Lists of arguments for the function e.g. `list(list(x1, x2), list(x1, x2)))`.
#' @param extra (`list`)\cr
#' List of additional information stored along with the task e.g. `list(list(timestamp), list(timestamp)))`.
#'
#' @return (`character()`)\cr
#' Keys of the tasks.
push_running_task = function(xss, extra = NULL) {
assert_list(xss, types = "list")
assert_list(extra, types = "list", null.ok = TRUE)
r = self$connector

lg$debug("Pushing %i running task(s).", length(xss))

keys = self$write_hashes(xs = xss, xs_extra = extra, status = "running")
r$command(c("SADD", private$.get_key("running_tasks"), keys))
r$command(c("SADD", private$.get_key("all_tasks"), keys))

return(invisible(keys))
},

#' @description
#' Pop a task from the queue.
#' Task is moved to the running tasks.
Expand Down
10 changes: 7 additions & 3 deletions README.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ output: github_document

# rush

Rush is a package for asynchronous parallelization in R.
Tasks are queued and distributed to workers in the background.
It heavily uses Redis as a data base.
Rush is a package for parallel and distributed computing in R.
It parallelizes the evaluation of R functions on a cluster of workers and provides a shared storage between the workers.
The shared storage is a [Redis](https://redis.io) data base.
Rush offers the option to define a single manager that distributes tasks to the workers.
Alternatively, the workers can create tasks themselves and communicate the results with each other via Rush.

![](man/figures/README-flow.png)

Single manager with multiple workers strategy.

## Install

[Install Redis](https://redis.io/docs/getting-started/installation/)
Expand Down
11 changes: 6 additions & 5 deletions README.html

Large diffs are not rendered by default.

20 changes: 13 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@

# rush

Rush is a package for asynchronous parallelization in R. Tasks are
queued and distributed to workers in the background. It heavily uses
Redis as a data base.
Rush is a package for parallel and distributed computing in R. It
parallelizes the evaluation of R functions on a cluster of workers and
provides a shared storage between the workers. The shared storage is a
[Redis](https://redis.io) data base. Rush offers the option to define a
single manager who distributes tasks to the workers. Alternatively, the
workers can create tasks themselves and communicate the results with
each other via Rush.

![](man/figures/README-flow.png)

Single manager with multiple workers strategy.

## Install

[Install Redis](https://redis.io/docs/getting-started/installation/)
Expand Down Expand Up @@ -69,11 +75,11 @@ rush$fetch_finished_tasks()
```

## x1 x2 pid worker_id y status
## 1: 4 6 545135 f79a2cef-5e37-43f0-a91c-61f444295990 10 finished
## 2: 3 5 545136 7d03c5a0-f66b-49f1-9a09-38645342df02 8 finished
## 1: 4 6 189379 8219bdc4-a7e7-485e-b747-ba9ed83b8846 10 finished
## 2: 3 5 189380 d1719b60-d066-4fd9-93be-94dffcfb08a5 8 finished
## keys
## 1: dfdc1544-e6f8-4bce-9888-81c072595fdc
## 2: e8165475-2c2c-48b5-b7db-ac6d9557cff4
## 1: acbf50d1-cafe-4628-a05d-a3ed72317aef
## 2: 452e5a1b-8019-4ec9-98fe-ebf537878762

## Task States

Expand Down
37 changes: 37 additions & 0 deletions tests/testthat/test-Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -675,3 +675,40 @@ test_that("terminating workers on idle works", {
expect_reset_rush(rush)
clean_test_env(pids)
})

# rush network without controller ----------------------------------------------

test_that("network without controller works", {

config = start_flush_redis()
rush = Rush$new(instance_id = "test-rush", config = config)

fun = function(rush) {
while (rush$n_finished_tasks < 100) {
# ask
xs = list(
x1 = sample(seq(1000), 1),
x2 = sample(seq(1000), 1)
)
keys = rush$push_running_task(list(xs))

# evaluate
ys = list(y = xs$x1 + xs$x2)

# tell
rush$push_results(keys, list(ys))
}

return(NULL)
}

future::plan("multisession", workers = 2)
rush$start_workers(worker_loop = fun, n_workers = 2, await_workers = TRUE)

Sys.sleep(10)
expect_equal(rush$n_finished_tasks, 100)

pids = rush$worker_info$pid
expect_reset_rush(rush)
clean_test_env(pids)
})

0 comments on commit 7b8631e

Please sign in to comment.