Skip to content

Commit

Permalink
fix: print logger
Browse files Browse the repository at this point in the history
  • Loading branch information
be-marc committed Feb 9, 2024
1 parent 68cc248 commit a483f52
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 23 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ Suggests:
withr
Encoding: UTF-8
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.2.3.9000
RoxygenNote: 7.3.1
Config/testthat/edition: 3
Config/testthat/parallel: false
40 changes: 20 additions & 20 deletions R/Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -524,16 +524,12 @@ Rush = R6::R6Class("Rush",
#' @param worker_ids (`character(1)`)\cr
#' Worker ids.
#' If `NULL` all worker ids are used.
#' @param first_event (`integer(1)`)\cr
#' First event to read.
#' @param last_event (`integer(1)`)\cr
#' Last event to read.
read_log = function(worker_ids = NULL, first_event = 0, last_event = -1) {
read_log = function(worker_ids = NULL) {
assert_int(first_event)
assert_int(last_event)
worker_ids = worker_ids %??% self$worker_ids
r = self$connector
cmds = map(worker_ids, function(worker_id) c("LRANGE", private$.get_worker_key("events", worker_id), first_event, last_event))
cmds = map(worker_ids, function(worker_id) c("LRANGE", private$.get_worker_key("events", worker_id), 0, -1))
worker_logs = set_names(r$pipeline(.commands = cmds), worker_ids)
tab = rbindlist(set_names(map(worker_logs, function(logs) {
rbindlist(map(logs, fromJSON))
Expand All @@ -544,19 +540,22 @@ Rush = R6::R6Class("Rush",

#' @description
#' Print log messages written with the `lgr` package from a worker.
#'
#' @param worker_ids (`character(1)`)\cr
#' Worker ids.
#' If `NULL` all worker ids are used.
print_log = function(worker_ids = NULL) {
tab = self$read_log(worker_ids, first_event = private$.log_counter, last_event = -1)
if (nrow(tab)) {
pwalk(tab, function(level, worker_id, logger, timestamp, msg, ...) {
pkg_logger = lgr::get_logger(logger)
pkg_logger$log(level, "[%s] [%s] %s", worker_id, timestamp, msg)
})
private$.log_counter = private$.log_counter + nrow(tab)
}
print_log = function() {
r = self$connector

cmds = walk(self$worker_ids, function(worker_id) {
first_event = private$.log_counter[[worker_id]] %??% 0L
log = r$command(c("LRANGE", private$.get_worker_key("events", worker_id), first_event, -1L))
if (length(log)) {
tab = rbindlist(map(log, fromJSON))
set(tab, j = "worker_id", value = worker_id)
pwalk(tab, function(level, logger, timestamp, msg, ...) {
pkg_logger = lgr::get_logger(logger)
pkg_logger$log(level, "[%s] [%s] %s", worker_id, timestamp, msg)
})
private$.log_counter[[worker_id]] = nrow(tab) + first_event
}
})
return(invisible(self))
},

Expand Down Expand Up @@ -1331,7 +1330,8 @@ Rush = R6::R6Class("Rush",

.seed = NULL,

.log_counter = 0,
# zero based
.log_counter = list(),

# prefix key with instance id
.get_key = function(key) {
Expand Down
11 changes: 11 additions & 0 deletions man/Rush.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions man/RushWorker.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions tests/testthat/test-Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -1015,14 +1015,19 @@ test_that("printing logs with redis appender works", {
lg$error("test-1-error")
list(y = x1 + x2)
}
worker_ids = rush$start_workers(fun = fun, n_workers = 1, wait_for_workers = TRUE, lgr_thresholds = c(rush = "info"))
xss = list(list(x1 = 1, x2 = 2))
worker_ids = rush$start_workers(fun = fun, n_workers = 2, wait_for_workers = TRUE, lgr_thresholds = c(rush = "info"))
xss = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2))
keys = rush$push_tasks(xss)

Sys.sleep(1)

expect_output(rush$print_log(), ".*test-1-info.*test-1-warn.*test-1-error")
expect_silent(rush$print_log())

xss = list(list(x1 = 3, x2 = 2))
keys = rush$push_tasks(xss)

expect_output(rush$print_log(), ".*test-1-info.*test-1-warn.*test-1-error")

expect_rush_reset(rush, type = "terminate")
})

0 comments on commit a483f52

Please sign in to comment.