Skip to content

Commit

Permalink
fix: redis config to string
Browse files Browse the repository at this point in the history
  • Loading branch information
be-marc committed May 21, 2024
1 parent 9d9bb32 commit 25dd8c7
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 3 deletions.
8 changes: 6 additions & 2 deletions R/Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,15 @@ Rush = R6::R6Class("Rush",

lg$info("Starting %i worker(s)", n_workers)

# redis config to string
config = discard(self$config, is.null)
config = paste(imap(config, function(value, name) sprintf("%s = '%s'", name, value)), collapse = ", ")

worker_ids = uuid::UUIDgenerate(n = n_workers)
self$processes = c(self$processes, set_names(map(worker_ids, function(worker_id) {
processx::process$new("Rscript",
args = c("-e", sprintf("rush::start_worker(network_id = '%s', worker_id = '%s', hostname = '%s', url = '%s')",
self$network_id, worker_id, private$.hostname, self$config$url)),
args = c("-e", sprintf("rush::start_worker(network_id = '%s', worker_id = '%s', hostname = '%s', %s)",
self$network_id, worker_id, private$.hostname, config)),
supervise = supervise, stderr = "|") # , stdout = "|"
}), worker_ids))

Expand Down
5 changes: 4 additions & 1 deletion R/start_worker.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ start_worker = function(
checkmate::assert_string(hostname)

# connect to redis
config = redux::redis_config(config = list(...))
config = list(...)
if (!is.null(config$port)) config$port = as.integer(config$port)
if (!is.null(config$timeout)) config$timeout = as.integer(config$timeout)
config = redux::redis_config(config = config)
r = redux::hiredis(config)

# get start arguments
Expand Down
31 changes: 31 additions & 0 deletions tests/testthat/test-Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,37 @@ test_that("workers are started", {
expect_rush_reset(rush)
})

test_that("workers are started with Redis on unix socket", {
skip_if(TRUE)

system(sprintf("redis-server --port 0 --unixsocket /tmp/redis.sock --daemonize yes --pidfile /tmp/redis.pid --dir %s", tempdir()))
Sys.sleep(5)

config = redux::redis_config(path = "/tmp/redis.sock")
r = redux::hiredis(config)
r$FLUSHDB()

rush = Rush$new(network_id = "test-rush", config = config)
fun = function(x1, x2, ...) list(y = x1 + x2)

worker_ids = rush$start_workers(fun = fun, n_workers = 2, wait_for_workers = TRUE)
expect_equal(rush$n_workers, 2)

# check fields
walk(rush$processes, function(process) expect_class(process, "process"))

# check meta data from redis
worker_info = rush$worker_info
expect_data_table(worker_info, nrows = 2)
expect_integer(worker_info$pid, unique = TRUE)
expect_set_equal(worker_info$host, "local")
expect_set_equal(worker_ids, worker_info$worker_id)
expect_set_equal(rush$worker_ids, worker_ids)
expect_set_equal(rush$worker_states$state, "running")

expect_rush_reset(rush)
})

test_that("workers are started with a heartbeat", {
skip_on_cran()
skip_on_ci()
Expand Down

0 comments on commit 25dd8c7

Please sign in to comment.