Skip to content

Commit

Permalink
feat: print log messages from workers
Browse files Browse the repository at this point in the history
  • Loading branch information
be-marc committed Feb 8, 2024
1 parent ee4d786 commit 836a89c
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 6 deletions.
31 changes: 28 additions & 3 deletions R/Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -520,10 +520,16 @@ Rush = R6::R6Class("Rush",
#' @param worker_ids (`character(1)`)\cr
#' Worker ids.
#' If `NULL` all worker ids are used.
read_log = function(worker_ids = NULL) {
#' @param first_event (`integer(1)`)\cr
#' First event to read.
#' @param last_event (`integer(1)`)\cr
#' Last event to read.
read_log = function(worker_ids = NULL, first_event = 0, last_event = -1) {
assert_int(first_event)
assert_int(last_event)
worker_ids = worker_ids %??% self$worker_ids
r = self$connector
cmds = map(worker_ids, function(worker_id) c("LRANGE", private$.get_worker_key("events", worker_id), 0, -1))
cmds = map(worker_ids, function(worker_id) c("LRANGE", private$.get_worker_key("events", worker_id), first_event, last_event))
worker_logs = set_names(r$pipeline(.commands = cmds), worker_ids)
tab = rbindlist(set_names(map(worker_logs, function(logs) {
rbindlist(map(logs, fromJSON))
Expand All @@ -532,6 +538,23 @@ Rush = R6::R6Class("Rush",
tab[]
},

#' @description
#' Print log messages written with the `lgr` package from a worker.
#'
#' @param worker_ids (`character(1)`)\cr
#' Worker ids.
#' If `NULL` all worker ids are used.
print_log = function(worker_ids = NULL) {
tab = self$read_log(worker_ids, first_event = private$.log_counter, last_event = -1)
if (nrow(tab)) {
pwalk(tab, function(level, worker_id, logger, timestamp, msg, ...) {
catf("%s [%s] [%s] %s", level, worker_id, timestamp, msg)
})
private$.log_counter = private$.log_counter + nrow(tab)
}
return(invisible(self))
},

#' @description
#' Pushes a task to the queue.
#' Task is added to queued tasks.
Expand Down Expand Up @@ -1303,6 +1326,8 @@ Rush = R6::R6Class("Rush",

.seed = NULL,

.log_counter = 0,

# prefix key with instance id
.get_key = function(key) {
sprintf("%s:%s", self$network_id, key)
Expand Down Expand Up @@ -1358,7 +1383,7 @@ Rush = R6::R6Class("Rush",
worker_loop = worker_loop,
worker_loop_args = dots,
globals = globals,
packages = packages,
packages = c("rush", packages),
worker_args = worker_args)

# serialize and push arguments to redis
Expand Down
10 changes: 7 additions & 3 deletions R/RushWorker.R
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ RushWorker = R6::R6Class("RushWorker",

lg$debug("Pushing %i running task(s).", length(xss))

keys = self$write_hashes(xs = xss, xs_extra = extra)
keys = self$write_hashes(xs = xss, xs_extra = extra, worker_extra = list(list(pid = Sys.getpid(), worker_id = self$worker_id)))
r$command(c("SADD", private$.get_key("running_tasks"), keys))
r$command(c("RPUSH", private$.get_key("all_tasks"), keys))

Expand Down Expand Up @@ -180,7 +180,7 @@ RushWorker = R6::R6Class("RushWorker",
#' @param extra (named `list()`)\cr
#' List of lists of additional information stored along with the results.
push_results = function(keys, yss, extra = NULL) {
assert_string(keys)
assert_character(keys)
assert_list(yss, types = "list")
assert_list(extra, types = "list", null.ok = TRUE)
r = self$connector
Expand Down Expand Up @@ -211,7 +211,11 @@ RushWorker = R6::R6Class("RushWorker",
set_terminated = function() {
r = self$connector
lg$debug("Worker %s terminated", self$worker_id)
r$command(c("SMOVE", private$.get_key("running_worker_ids"), private$.get_key("terminated_worker_ids"), self$worker_id))
r$pipeline(.commands = list(
c("SMOVE", private$.get_key("running_worker_ids"), private$.get_key("terminated_worker_ids"), self$worker_id),
c("SREM", private$.get_key("local_workers"), self$worker_id),
c("SREM", private$.get_key("heartbeat_keys"), private$.get_worker_key("heartbeat")
)))
return(invisible(self))
}
),
Expand Down
31 changes: 31 additions & 0 deletions tests/testthat/test-RushWorker.R
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,37 @@ test_that("saving logs with redis appender works", {
expect_rush_reset(rush, type = "terminate")
})

test_that("printing logs with redis appender works", {
skip_on_cran()
skip_on_ci()
appenders = lgr::get_logger("root")$appenders

on.exit({
lgr::get_logger("root")$set_appenders(appenders)
})

config = start_flush_redis()
rush = RushWorker$new(
network_id = "test-rush",
config = config,
host = "local",
lgr_thresholds = c(rush = "info"),
lgr_buffer_size = 0)
lg = lgr::get_logger("rush")

lg$info("test-1")

expect_output(rush$print_log(), ".*test-1")
expect_silent(rush$print_log())

lg$info("test-2")
lg$info("test-3")

expect_output(rush$print_log(), ".*test-2.*test-3")

expect_rush_reset(rush, type = "terminate")
})

test_that("settings the buffer size in redis appender works", {
skip_on_cran()
skip_on_ci()
Expand Down

0 comments on commit 836a89c

Please sign in to comment.