Skip to content

Commit

Permalink
benchmark: add priority and worker info benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
be-marc committed Oct 16, 2023
1 parent 4b999ee commit 5075702
Showing 1 changed file with 132 additions and 65 deletions.
197 changes: 132 additions & 65 deletions benchmark/runner.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ runner = function(renv_project, times) {
system("redis-server --daemonize yes --save \"\" --appendonly no")

# initialize environment
res = list()
renv::load(renv_project)
project_id = renv::project()
res[["project_id"]] = renv::project()

library(rush)
library(data.table)
Expand Down Expand Up @@ -41,13 +42,13 @@ runner = function(renv_project, times) {
extra_1000 = replicate(1000, list(list(extra1 = runif(1))))
extra_10000 = replicate(10000, list(list(extra1 = runif(1))))

res = list()


# Initializing Rush Controller
config = start_flush_redis()

res[["bm_init_rush"]] = microbenchmark(
rush = Rush$new("benchmark", config),
controller = Rush$new("benchmark", config),
times = times,
unit = "ms"
)
Expand All @@ -56,7 +57,7 @@ runner = function(renv_project, times) {
config = start_flush_redis()

res[["bm_init_worker"]] = microbenchmark(
rush = RushWorker$new("benchmark", config, host = "local"),
worker = RushWorker$new("benchmark", config, host = "local"),
times = times,
unit = "ms"
)
Expand All @@ -72,7 +73,7 @@ runner = function(renv_project, times) {
}

res[["bm_init_future"]] = microbenchmark(
rush = rush$start_workers(fun, host = "local", await_workers = TRUE),
future = rush$start_workers(fun, host = "local", await_workers = TRUE),
times = times,
unit = "ms",
setup = setup()
Expand All @@ -89,7 +90,7 @@ runner = function(renv_project, times) {
}

res[["bm_init_heartbeat"]] = microbenchmark(
rush = rush$start_workers(fun, host = "local", heartbeat_period = 3, await_workers = TRUE),
heartbeat = rush$start_workers(fun, host = "local", heartbeat_period = 3, await_workers = TRUE),
times = times,
unit = "ms",
setup = setup()
Expand All @@ -100,26 +101,47 @@ runner = function(renv_project, times) {
rush = Rush$new("benchmark", config)

res[["bm_push_task"]] = microbenchmark(
push_1 = rush$push_tasks(list(xss_1)),
push_10 = rush$push_tasks(xss_10),
push_100 = rush$push_tasks(xss_100),
push_1000 = rush$push_tasks(xss_1000),
push_10000 = rush$push_tasks(xss_10000),
push_task_1 = rush$push_tasks(list(xss_1)),
push_task_10 = rush$push_tasks(xss_10),
push_task_100 = rush$push_tasks(xss_100),
push_task_1000 = rush$push_tasks(xss_1000),
push_task_10000 = rush$push_tasks(xss_10000),
times = times,
unit = "ms",
setup = rush$reset()
)

# Push Priority Task
config = start_flush_redis()
rush = Rush$new("benchmark", config)
worker_id = RushWorker$new("benchmark", config, host = "local")$worker_id

priority_queue_1 = rep(worker_id, 1)
priority_queue_10 = rep(worker_id, 10)
priority_queue_100 = rep(worker_id, 100)
priority_queue_1000 = rep(worker_id, 1000)
priority_queue_10000 = rep(worker_id, 10000)

res[["bm_push_task"]] = microbenchmark(
push_task_1 = rush$push_priority_tasks(list(xss_1), priority = priority_queue_1),
push_task_10 = rush$push_priority_tasks(xss_10, priority = priority_queue_10),
push_task_100 = rush$push_priority_tasks(xss_100, priority = priority_queue_100),
push_task_1000 = rush$push_priority_tasks(xss_1000, priority = priority_queue_1000),
push_task_10000 = rush$push_priority_tasks(xss_10000, priority = priority_queue_10000),
times = times,
unit = "ms"
)

# Push Task with Extra
config = start_flush_redis()
rush = Rush$new("benchmark", config)

res[["bm_push_extra"]] = microbenchmark(
push_1 = rush$push_tasks(list(xss_1), extra = extra_1),
push_10 = rush$push_tasks(xss_10, extra = extra_10),
push_100 = rush$push_tasks(xss_100, extra = extra_100),
push_1000 = rush$push_tasks(xss_1000, extra = extra_1000),
push_10000 = rush$push_tasks(xss_10000, extra = extra_10000),
push_extra_1 = rush$push_tasks(list(xss_1), extra = extra_1),
push_extra_10 = rush$push_tasks(xss_10, extra = extra_10),
push_extra_100 = rush$push_tasks(xss_100, extra = extra_100),
push_extra_1000 = rush$push_tasks(xss_1000, extra = extra_1000),
push_extra_10000 = rush$push_tasks(xss_10000, extra = extra_10000),
times = times,
setup = rush$reset(),
unit = "ms"
Expand Down Expand Up @@ -151,7 +173,7 @@ runner = function(renv_project, times) {

res[["bm_pop_10"]] = microbenchmark(
pop_10 = rush_10$pop_task(),
times = 10,
times = times,
unit = "ms",
setup = setup(rush_10, xss_10)
)
Expand Down Expand Up @@ -201,7 +223,7 @@ runner = function(renv_project, times) {
setup = setup(rush_10000, xss_10000)
)

# Fetch Queued Tasks
# Fetch Queued Tasks
config = start_flush_redis()
rush_1 = RushWorker$new("benchmark_1", config, host = "local")
rush_10 = RushWorker$new("benchmark_10", config, host = "local")
Expand Down Expand Up @@ -259,7 +281,6 @@ runner = function(renv_project, times) {
)

# Fetch Results

setup = function(rush, xss) {
rush$reset()
keys = rush$push_tasks(xss)
Expand Down Expand Up @@ -323,7 +344,6 @@ runner = function(renv_project, times) {
)

# Fetch Result with Cache

setup = function(rush, xss) {
rush$reset()
keys = rush$push_tasks(xss)
Expand All @@ -339,8 +359,7 @@ runner = function(renv_project, times) {
rush_1 = RushWorker$new("benchmark_1", config, host = "local")

res[["bm_cache_result_1"]] = microbenchmark(
latest_results_1 = rush_1$fetch_latest_results(),
fetch_results_1 = rush_1$fetch_results(),
fetch_cache_1 = rush_1$fetch_results(),
times = times,
unit = "ms",
setup = setup(rush_1, list(xss_1))
Expand All @@ -350,8 +369,7 @@ runner = function(renv_project, times) {
rush_10 = RushWorker$new("benchmark_10", config, host = "local")

res[["bm_cache_result_10"]] = microbenchmark(
latest_results_10 = rush_10$fetch_latest_results(),
fetch_results_10 = rush_10$fetch_results(),
fetch_cache_10 = rush_10$fetch_results(),
times = times,
unit = "ms",
setup = setup(rush_10, xss_10)
Expand All @@ -362,8 +380,7 @@ runner = function(renv_project, times) {
rush_100 = RushWorker$new("benchmark_100", config, host = "local")

res[["bm_cache_result_100"]] = microbenchmark(
latest_results_100 = rush_100$fetch_latest_results(),
fetch_results_100 = rush_100$fetch_results(),
fetch_cache_100 = rush_100$fetch_results(),
times = times,
unit = "ms",
setup = setup(rush_100, xss_100)
Expand All @@ -373,8 +390,7 @@ runner = function(renv_project, times) {
rush_1000 = RushWorker$new("benchmark_1000", config, host = "local")

res[["bm_cache_result_1000"]] = microbenchmark(
latest_results_1000 = rush_1000$fetch_latest_results(),
fetch_results_1000 = rush_1000$fetch_results(),
fetch_cache_1000 = rush_1000$fetch_results(),
times = times,
unit = "ms",
setup = setup(rush_1000, xss_1000)
Expand All @@ -384,49 +400,82 @@ runner = function(renv_project, times) {
rush_10000 = RushWorker$new("benchmark_10000", config, host = "local")

res[["bm_cache_result_10000"]] = microbenchmark(
latest_results_10000 = rush_10000$fetch_latest_results(),
fetch_results_10000 = rush_10000$fetch_results(),
fetch_cache_10000 = rush_10000$fetch_results(),
times = times,
unit = "ms",
setup = setup(rush_10000, xss_10000)
)


# Write Hashes
## Fetch Finished Tasks
setup = function(rush, xss) {
rush$reset()
keys = rush$push_tasks(xss)
rush$connector$command(c("SADD", get_private(rush)$.get_key("running_tasks"), keys))
walk(keys, function(key) rush$push_results(key, list(list(y = 10))))
}

config = start_flush_redis()
rush= Rush$new("benchmark", config)
rush$reset()
rush_1 = RushWorker$new("benchmark_1", config, host = "local")

xdt_1 = data.table(x1 = runif(1), x2 = runif(1))
xss_1 = transpose_list(xdt_1)
res[["bm_finished_1"]] = microbenchmark(
fetch_finished_tasks_1 = rush_1$fetch_finished_tasks(),
times = times,
unit = "ms",
setup = setup(rush_1, list(xss_1))
)

config = start_flush_redis()
rush_10 = RushWorker$new("benchmark_10", config, host = "local")

res[["bm_write_task"]] = microbenchmark(
write_hash = rush$write_hashes(xs = xss_1),
res[["bm_finished_10"]] = microbenchmark(
fetch_finished_tasks_10 = rush_10$fetch_finished_tasks(),
times = times,
unit = "ms"
unit = "ms",
setup = setup(rush_10, xss_10)
)

config = start_flush_redis()
rush= Rush$new("benchmark", config)
rush$reset()
rush_100 = RushWorker$new("benchmark_100", config, host = "local")

res[["bm_finished_100"]] = microbenchmark(
fetch_finished_tasks_100 = rush_100$fetch_finished_tasks(),
times = times,
unit = "ms",
setup = setup(rush_100, xss_100)
)

res[["bm_write_extra"]] = microbenchmark(
write_hash = rush$write_hashes(xs = xss_1, xs_extra = extra_1),
config = start_flush_redis()
rush_1000 = RushWorker$new("benchmark_1000", config, host = "local")

res[["bm_finished_1000"]] = microbenchmark(
fetch_finished_tasks_1000 = rush_1000$fetch_finished_tasks(),
times = times,
unit = "ms"
unit = "ms",
setup = setup(rush_1000, xss_1000)
)

config = start_flush_redis()
rush_10000 = RushWorker$new("benchmark_10000", config, host = "local")

res[["bm_finished_10000"]] = microbenchmark(
fetch_finished_tasks_10000 = rush_10000$fetch_finished_tasks(),
times = times,
unit = "ms",
setup = setup(rush_10000, xss_10000)
)

# Write Hashes
config = start_flush_redis()
rush= Rush$new("benchmark", config)
rush$reset()

res[["bm_cache_tasks"]] = microbenchmark(
write_hashes_10 = rush$write_hashes(xs = xss_10),
write_hashes_100 = rush$write_hashes(xs = xss_100),
write_hashes_1000 = rush$write_hashes(xs = xss_1000),
write_hashes_10000 = rush$write_hashes(xs = xss_10000),
res[["bm_write"]] = microbenchmark(
write_1 = rush$write_hashes(xs = xss_1),
write_10 = rush$write_hashes(xs = xss_10),
write_100 = rush$write_hashes(xs = xss_100),
write_1000 = rush$write_hashes(xs = xss_1000),
write_10000 = rush$write_hashes(xs = xss_10000),
times = times,
unit = "ms"
)
Expand All @@ -435,29 +484,29 @@ runner = function(renv_project, times) {
rush= Rush$new("benchmark", config)
rush$reset()

res[["bm_cache_extras"]] = microbenchmark(
write_hashes_10 = rush$write_hashes(xs = xss_10, xs_extra = extra_10),
write_hashes_100 = rush$write_hashes(xs = xss_100, xs_extra = extra_100),
write_hashes_1000 = rush$write_hashes(xs = xss_1000, xs_extra = extra_1000),
write_hashes_10000 = rush$write_hashes(xs = xss_10000, xs_extra = extra_10000),
res[["bm_write_extra"]] = microbenchmark(
write_extra_1 = rush$write_hashes(xs = xss_1, xs_extra = extra_1),
write_extra_10 = rush$write_hashes(xs = xss_10, xs_extra = extra_10),
write_extra_100 = rush$write_hashes(xs = xss_100, xs_extra = extra_100),
write_extra_1000 = rush$write_hashes(xs = xss_1000, xs_extra = extra_1000),
write_extra_10000 = rush$write_hashes(xs = xss_10000, xs_extra = extra_10000),
times = times,
unit = "ms"
)

# Read Hashes

config = start_flush_redis()
rush= Rush$new("benchmark", config)
rush$reset()

keys = rush$write_hashes(xs = xss_10000)

res[["bm_read"]] = microbenchmark(
read_hashes_1 = rush$read_hashes(keys[[1]], "xs"),
read_hashes_10 = rush$read_hashes(keys[seq(10)], "xs"),
read_hashes_100 = rush$read_hashes(keys[seq(100)], "xs"),
read_hashes_1000 = rush$read_hashes(keys[seq(1000)], "xs"),
read_hashes_10000 = rush$read_hashes(keys, "xs"),
read_1 = rush$read_hashes(keys[[1]], "xs"),
read_10 = rush$read_hashes(keys[seq(10)], "xs"),
read_100 = rush$read_hashes(keys[seq(100)], "xs"),
read_1000 = rush$read_hashes(keys[seq(1000)], "xs"),
read_10000 = rush$read_hashes(keys, "xs"),
times = times,
unit = "ms"
)
Expand All @@ -469,17 +518,35 @@ runner = function(renv_project, times) {
keys = rush$write_hashes(xs = xss_10000, xs_extra = extra_10000)

res[["bm_read_extra"]] = microbenchmark(
read_hashes_1 = rush$read_hashes(keys[[1]], c("xs", "xs_extra")),
read_hashes_10 = rush$read_hashes(keys[seq(10)], c("xs", "xs_extra")),
read_hashes_100 = rush$read_hashes(keys[seq(100)], c("xs", "xs_extra")),
read_hashes_1000 = rush$read_hashes(keys[seq(1000)], c("xs", "xs_extra")),
read_hashes_10000 = rush$read_hashes(keys, c("xs", "xs_extra")),
read_extra_1 = rush$read_hashes(keys[[1]], c("xs", "xs_extra")),
read_extra_10 = rush$read_hashes(keys[seq(10)], c("xs", "xs_extra")),
read_extra_100 = rush$read_hashes(keys[seq(100)], c("xs", "xs_extra")),
read_extra_1000 = rush$read_hashes(keys[seq(1000)], c("xs", "xs_extra")),
read_extra_10000 = rush$read_hashes(keys, c("xs", "xs_extra")),
times = times,
unit = "ms"
)

# Detect Lost Workers with ps_exists Function
# Worker Info
config = start_flush_redis()
rush = Rush$new("benchmark", config)

future::plan("multisession", workers = 10)
fun = function(x1, x2, ...) list(y = x1 + x2)
rush$start_workers(fun, n_workers = 10, host = "local", await_workers = TRUE)

setup = function(rush) {
rush$worker_info
}

microbenchmark(
worker_info = rush$worker_info,
times = 100,
unit = "ms",
setup = setup(rush)
)

# Detect Lost Workers with ps_exists Function
config = start_flush_redis()
rush = Rush$new("benchmark", config)

Expand Down

0 comments on commit 5075702

Please sign in to comment.