tests: Add perf test for shared_token_bucket

The test checks whether the token-bucket "rate" holds up under various
circumstances:

- when shards sleep between grabbing tokens
- when shards poll the token bucket frequently
- when shards are disturbed by CPU hogs

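For orientation, one tick of a "good" (sleeping) user looks roughly like
the sketch below (illustrative only, not code from this patch; the real
loop is in the test source further down):

    auto h = tb.grab(need);           // reserve 'need' tokens
    auto d = tb.deficiency(h);        // how many are still missing
    if (d > 0) {
        // a "good" user sleeps the deficiency out; a "bad" one polls
        co_await seastar::sleep(
            std::chrono::duration_cast<std::chrono::microseconds>(tb.duration_for(d)));
    }
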
So far the test shows four problems:

- With few shards, token deficiency produces zero sleep time, so a
  "good" user that sleeps between grabs effectively turns into a
  polling ("bad") one (fixed by scylladb#1722)

- Sometimes rounding errors in replenishment accumulate and yield a
  lower resulting rate than configured (fixed by scylladb#1723)

- When run with CPU hogs, individual shards' rates may differ too much
  (see scylladb#1083). E.g. with the bucket configured for a rate of
  100k tokens/sec, 48 shards, and a 4-second run, the "slowest" vs the
  "fastest" shard get this many tokens:

    no hog:   6931 ... 9631
    with hog: 2135 ... 29412

  (the aggregate rate is 100k with the aforementioned fixes)

- With "capped-release" token bucket and token releasing by-timer with
  the configured rate and hogs the resulting throughput can be as low as
  50% of the configured (see scylladb#1641)

  Created token-bucket 1000000.0 t/s
  perf_pure_context.sleeping_throughput_with_hog:   999149.3 t/s
  perf_capped_context.sleeping_throughput:          859995.9 t/s
  perf_capped_context.sleeping_throughput_with_hog: 512912.0 t/s

Signed-off-by: Pavel Emelyanov <[email protected]>
xemul committed Jul 31, 2023
1 parent 0784da8 commit 252c349
Showing 3 changed files with 299 additions and 0 deletions.
1 change: 1 addition & 0 deletions include/seastar/util/shared_token_bucket.hh
@@ -134,6 +134,7 @@ class shared_token_bucket {
    static constexpr rate_resolution max_delta = std::chrono::duration_cast<rate_resolution>(std::chrono::hours(1));
public:
    static constexpr T max_rate = std::numeric_limits<T>::max() / 2 / max_delta.count();
    static constexpr capped_release is_capped = Capped;

private:
    static constexpr T accumulated(T rate, rate_resolution delta) noexcept {
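
The added is_capped constant lets generic code pick behaviour at compile
time; the test below, for instance, returns tokens to the bucket only for
the capped flavour:

    if constexpr (TokenBucket::is_capped == internal::capped_release::yes) {
        tb.release(tokens);
    }
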
3 changes: 3 additions & 0 deletions tests/perf/CMakeLists.txt
@@ -67,6 +67,9 @@ seastar_add_test (fstream
seastar_add_test (fair_queue
  SOURCES fair_queue_perf.cc)

seastar_add_test (shared_token_bucket
  SOURCES shared_token_bucket.cc)

seastar_add_test (future_util
  SOURCES future_util_perf.cc)

295 changes: 295 additions & 0 deletions tests/perf/shared_token_bucket.cc
@@ -0,0 +1,295 @@
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2023 ScyllaDB Ltd.
 */


#include <cassert>
#include <deque>
#include <functional>
#include <limits>
#include <optional>
#include <random>
#include <utility>
#include <fmt/core.h>
#include <seastar/testing/perf_tests.hh>
#include <seastar/testing/random.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/sleep.hh>
#include <seastar/util/later.hh>
#include <seastar/util/shared_token_bucket.hh>

using namespace seastar;

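// Two bucket flavours under test: with capped_release::yes the bucket may
// only replenish tokens that consumers have returned via release(); with
// capped_release::no it replenishes based on elapsed time alone.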
using capped_token_bucket_t = internal::shared_token_bucket<uint64_t, std::ratio<1>, internal::capped_release::yes>;
using pure_token_bucket_t = internal::shared_token_bucket<uint64_t, std::ratio<1>, internal::capped_release::no>;
using clock_type = std::chrono::steady_clock;

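// Per-shard outcome of one measurement run: tokens grabbed from the bucket
// and tokens handed back via do_release() (the hand-back only reaches the
// bucket in the capped-release flavour).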
struct work_result {
    uint64_t tokens;
    uint64_t released;
};

struct statistics {
    uint64_t total = 0;
    uint64_t min = std::numeric_limits<uint64_t>::max();
    uint64_t max = std::numeric_limits<uint64_t>::min();
};

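// Reduce step for sharded::map_reduce0() in context::run_workers() below:
// folds each shard's work_result into total/min/max token statistics.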
statistics accumulate(statistics acc, const work_result& val) {
    return statistics {
        .total = acc.total + val.tokens,
        .min = std::min(acc.min, val.tokens),
        .max = std::max(acc.max, val.tokens),
    };
}

template <typename TokenBucket>
struct worker : public seastar::peering_sharded_service<worker<TokenBucket>> {
    TokenBucket& tb;
    uint64_t tokens = 0;
    uint64_t available = 0;
    uint64_t released = 0;
    std::optional<std::pair<int, uint64_t>> head;
    static constexpr auto release_period = std::chrono::microseconds(500);
    const uint64_t threshold; // mimics the main user of shared-token-bucket -- fair_queue::dispatch_requests()
    std::uniform_int_distribution<int> size; // initialized from threshold, so declared after it
    const uint64_t release_per_tick;
    clock_type::time_point last_release;
    timer<> release_tokens;

    struct tick_data {
        std::chrono::microseconds delay;
        std::chrono::microseconds sleep;
        uint64_t defic;
        uint64_t tokens;
        uint64_t total;

        template <typename D1, typename D2>
        tick_data(D1 dur, D2 slp, uint64_t def, uint64_t lt, uint64_t tt) noexcept
            : delay(std::chrono::duration_cast<std::chrono::microseconds>(dur))
            , sleep(std::chrono::duration_cast<std::chrono::microseconds>(slp))
            , defic(def)
            , tokens(lt)
            , total(tt)
        {}
    };
    std::deque<tick_data> ticks;

    worker(TokenBucket& tb_) noexcept
        : tb(tb_)
        , threshold(tb.limit() / smp::count)
        , size(1, std::min<int>(threshold, 128))
        , release_per_tick(double(tb.rate()) / smp::count * std::chrono::duration_cast<std::chrono::duration<double>>(release_period).count())
        , last_release(clock_type::now())
        , release_tokens([this] { do_release(); })
    {
        release_tokens.arm_periodic(std::chrono::duration_cast<std::chrono::microseconds>(release_period));
        fmt::print("{} worker, threshold {}, release-per-tick {}\n", this_shard_id(), threshold, release_per_tick);
    }

    void do_release(uint64_t tokens) {
        available -= tokens;
        released += tokens;
        if constexpr (TokenBucket::is_capped == internal::capped_release::yes) {
            tb.release(tokens);
        }
    }

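    // Timer tick: release tokens in proportion to the wall-clock time that
    // actually passed, not to the number of ticks.  Illustrative numbers,
    // assuming the context below (1M t/s) on 2 shards: release_per_tick =
    // 500,000 t/s * 500us = 250 tokens, so a tick that fires 1ms late
    // releases ~500 tokens instead of 250.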
    void do_release() {
        auto now = clock_type::now();
        auto real_delay = std::chrono::duration_cast<std::chrono::duration<double>>(now - last_release);
        last_release = now;
        uint64_t to_release = real_delay.count() * release_per_tick / std::chrono::duration_cast<std::chrono::duration<double>>(release_period).count();
        do_release(std::min(to_release, available));
    }

    future<work_result> work(std::function<future<>(std::chrono::duration<double> d)> do_sleep) {
        assert(tokens == 0);
        auto start = clock_type::now();
        return do_until([end = start + std::chrono::seconds(1)] { return clock_type::now() >= end; },
            [this, start, do_sleep = std::move(do_sleep)] {
                uint64_t d = 0;
                uint64_t l_tokens = 0;
                int sz;

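                // Mimic fair_queue::dispatch_requests(): grab random-sized
                // batches until this tick's share (threshold) is satisfied.
                // A grab the bucket cannot cover yet is parked in 'head'
                // and retried after replenish() on a later tick.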
                while (l_tokens < threshold) {
                    if (head) {
                        tb.replenish(clock_type::now());
                        d = tb.deficiency(head->second);
                        if (d > 0) {
                            break;
                        }
                        sz = head->first;
                        head.reset();
                    } else {
                        sz = size(testing::local_random_engine);
                        auto h = tb.grab(sz);
                        d = tb.deficiency(h);
                        if (d > 0) {
                            head = std::make_pair(sz, h);
                            break;
                        }
                    }
                    tokens += sz;
                    l_tokens += sz;
                    available += sz;
                }

                auto p = tb.duration_for(d);
                ticks.emplace_back(clock_type::now() - start, p, d, l_tokens, tokens);
                if (ticks.size() > 2048) {
                    ticks.pop_front();
                }
                return do_sleep(p);
            }
        ).then([this, start] {
            auto delay = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() - start).count();
            fmt::print("{} {}t/{:.3f}s, speed is {:.1f}t/s goal {:.1f}t/s, {} ticks, released {} (accumulated {})\n", this_shard_id(), tokens, delay,
                double(tokens) / delay, double(tb.rate()) / smp::count, ticks.size(), released, available);
            do_release(available);
            work_result r {
                .tokens = std::exchange(this->tokens, 0),
                .released = std::exchange(this->released, 0),
            };
            return make_ready_future<work_result>(std::move(r));
        });
    }

    future<work_result> work_sleeping() {
        return work([this] (std::chrono::duration<double> d) {
            return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(d));
        });
    }

    future<work_result> work_yielding() {
        return work([] (std::chrono::duration<double>) {
            return seastar::yield();
        });
    }
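
    // work_sleeping() is the well-behaved user from the commit message:
    // it sleeps its deficiency out.  work_yielding() is the polling one:
    // it yields and immediately grabs again.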

    future<> print_and_clear_ticks() {
        fmt::print("{} {} ticks\n", this_shard_id(), ticks.size());
        std::chrono::microseconds p(0);
        for (auto& td : ticks) {
            fmt::print(" {:8} +{:5} us {:5}/{:5} def {:3} sleep {:5} us\n", td.delay.count(), (td.delay - p).count(), td.tokens, td.total, td.defic, td.sleep.count());
            p = td.delay;
        }
        ticks.clear();
        if (this_shard_id() == smp::count - 1) {
            return make_ready_future<>();
        }

        return this->container().invoke_on(this_shard_id() + 1, &worker::print_and_clear_ticks);
    }
};

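// A CPU hog: alternates exponentially-distributed busy-spins and sleeps
// (the constructor takes the mean busy and rest durations), stealing CPU
// time from the workers to disturb their sleep-based pacing.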
struct hog {
    std::exponential_distribution<double> busy;
    std::exponential_distribution<double> rest;
    std::optional<future<>> stopped;
    bool keep_going = false;
    uint64_t _iterations = 0;

    template <typename T1, typename T2>
    hog(T1 b, T2 r) noexcept
        : busy(1.0 / std::chrono::duration_cast<std::chrono::duration<double>>(b).count())
        , rest(1.0 / std::chrono::duration_cast<std::chrono::duration<double>>(r).count())
    {}

    void work() {
        assert(!stopped.has_value());
        keep_going = true;
        stopped = do_until([this] { return !keep_going; },
            [this] {
                auto p = std::chrono::duration<double>(rest(testing::local_random_engine));
                return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(p)).then([this] {
                    _iterations++;
                    auto until = clock_type::now() + std::chrono::duration<double>(busy(testing::local_random_engine));
                    do {
                    } while (clock_type::now() < until && keep_going);
                });
            }
        );
    }

    future<> terminate() {
        assert(stopped.has_value());
        keep_going = false;
        auto f = std::move(*stopped);
        stopped.reset();
        return f.finally([this] {
            fmt::print("{}-hog {} iters\n", this_shard_id(), std::exchange(_iterations, 0));
        });
    }
};

template <typename TokenBucket>
struct context {
    using worker_t = worker<TokenBucket>;
    TokenBucket tb;
    seastar::sharded<worker_t> w;
    seastar::sharded<hog> h;

    static constexpr uint64_t rate = 1000000;
    static constexpr uint64_t limit = rate / 2000;
    static constexpr uint64_t threshold = 1;
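    // With these constants the bucket holds at most limit = 500 tokens,
    // i.e. 500us worth of the 1M t/s rate, and each worker's grab target
    // (worker::threshold) is limit / smp::count.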

    context() : tb(rate, limit, threshold)
    {
        w.start(std::ref(tb)).get();
        h.start(std::chrono::microseconds(300), std::chrono::microseconds(100)).get();
        fmt::print("Created tb {}t/s (limit {} threshold {})\n", tb.rate(), tb.limit(), tb.threshold());
    }

    ~context() {
        h.stop().get();
        w.stop().get();
    }

    template <typename Fn>
    future<> run_workers(Fn&& fn) {
        auto start = clock_type::now();
        return w.map_reduce0(std::forward<Fn>(fn), statistics{}, accumulate).then([this, start] (statistics st) {
            auto delay = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() - start).count();
            fmt::print("effective rate is {:.1f}t/s, [{} ... {}]\n", st.total / delay, st.min, st.max);
        });
    }

    future<> test_sleeping() {
        fmt::print("---8<---\n");
        return run_workers(&worker_t::work_sleeping);
    }

    future<> test_yielding() {
        fmt::print("---8<---\n");
        return run_workers(&worker_t::work_yielding);
    }

    future<> test_sleeping_with_hog() {
        fmt::print("---8<---\n");
        return h.invoke_on_all(&hog::work).then([this] {
            return run_workers(&worker_t::work_sleeping).then([this] {
                return h.invoke_on_all(&hog::terminate);
            });
        });
    }
};

struct perf_capped_context : public context<capped_token_bucket_t> {};
struct perf_pure_context : public context<pure_token_bucket_t> {};

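// Each scenario runs against both bucket flavours, so the capped vs. pure
// figures quoted in the commit message come from directly comparable runs.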
PERF_TEST_F(perf_capped_context, sleeping_throughput) { return test_sleeping(); }
PERF_TEST_F(perf_capped_context, yielding_throughput) { return test_yielding(); }
PERF_TEST_F(perf_capped_context, sleeping_throughput_with_hog) { return test_sleeping_with_hog(); }

PERF_TEST_F(perf_pure_context, sleeping_throughput) { return test_sleeping(); }
PERF_TEST_F(perf_pure_context, yielding_throughput) { return test_yielding(); }
PERF_TEST_F(perf_pure_context, sleeping_throughput_with_hog) { return test_sleeping_with_hog(); }
