diff --git a/include/seastar/util/shared_token_bucket.hh b/include/seastar/util/shared_token_bucket.hh
index 1792441cb13..d4042f67463 100644
--- a/include/seastar/util/shared_token_bucket.hh
+++ b/include/seastar/util/shared_token_bucket.hh
@@ -134,6 +134,7 @@ class shared_token_bucket {
     static constexpr rate_resolution max_delta = std::chrono::duration_cast<rate_resolution>(std::chrono::hours(1));
 public:
     static constexpr T max_rate = std::numeric_limits<T>::max() / 2 / max_delta.count();
+    static constexpr capped_release is_capped = Capped;
 
 private:
     static constexpr T accumulated(T rate, rate_resolution delta) noexcept {
diff --git a/tests/perf/CMakeLists.txt b/tests/perf/CMakeLists.txt
index 4b77c4bc0e2..08366c3224b 100644
--- a/tests/perf/CMakeLists.txt
+++ b/tests/perf/CMakeLists.txt
@@ -67,6 +67,9 @@ seastar_add_test (fstream
 seastar_add_test (fair_queue
   SOURCES fair_queue_perf.cc)
 
+seastar_add_test (shared_token_bucket
+  SOURCES shared_token_bucket.cc)
+
 seastar_add_test (future_util
   SOURCES future_util_perf.cc)
 
diff --git a/tests/perf/shared_token_bucket.cc b/tests/perf/shared_token_bucket.cc
new file mode 100644
index 00000000000..d1dd1b3d84c
--- /dev/null
+++ b/tests/perf/shared_token_bucket.cc
@@ -0,0 +1,286 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership.  You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2023 ScyllaDB Ltd.
+ */
+
+
+#include <seastar/testing/perf_tests.hh>
+#include <seastar/testing/random.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/timer.hh>
+#include <seastar/util/later.hh>
+#include <seastar/util/shared_token_bucket.hh>
+
+using capped_token_bucket_t = internal::shared_token_bucket<uint64_t, std::ratio<1>, internal::capped_release::yes>;
+using pure_token_bucket_t = internal::shared_token_bucket<uint64_t, std::ratio<1>, internal::capped_release::no>;
+using clock_type = std::chrono::steady_clock;
+
+struct work_result {
+    uint64_t tokens;
+};
+
+struct statistics {
+    uint64_t total = 0;
+    uint64_t min = std::numeric_limits<uint64_t>::max();
+    uint64_t max = std::numeric_limits<uint64_t>::min();
+};
+
+statistics accumulate(statistics acc, const work_result& val) {
+    return statistics {
+        .total = acc.total + val.tokens,
+        .min = std::min(acc.min, val.tokens),
+        .max = std::max(acc.max, val.tokens),
+    };
+}
+
+template <typename TokenBucket>
+struct worker : public seastar::peering_sharded_service<worker<TokenBucket>> {
+    TokenBucket& tb;
+    uint64_t tokens = 0;
+    uint64_t tokens_per_tick_min = std::numeric_limits<uint64_t>::max();
+    uint64_t tokens_per_tick_max = std::numeric_limits<uint64_t>::min();
+    struct tick_data {
+        std::chrono::microseconds delay;
+        std::chrono::microseconds sleep;
+        uint64_t defic;
+        uint64_t tokens;
+        uint64_t total;
+
+        template <typename D1, typename D2>
+        tick_data(D1 dur, D2 slp, uint64_t def, uint64_t lt, uint64_t tt) noexcept
+                : delay(std::chrono::duration_cast<std::chrono::microseconds>(dur))
+                , sleep(std::chrono::duration_cast<std::chrono::microseconds>(slp))
+                , defic(def)
+                , tokens(lt)
+                , total(tt)
+        {}
+    };
+    std::deque<tick_data> ticks;
+    const uint64_t threshold; // this is to mimic the main user of shared-token-bucket -- a fair_queue::dispatch_requests
+    std::optional<std::pair<int, uint64_t>> head;
+    std::uniform_int_distribution<int> size;
+    uint64_t available = 0;
+    uint64_t release_per_tick = 0;
+    timer<> release_tokens;
+
+    worker(TokenBucket& tb_) noexcept
+            : tb(tb_)
+            , threshold(tb.limit() / smp::count)
+            , size(1, 128)
+            , release_tokens([this] { do_release(); })
+    {
+        if constexpr (TokenBucket::is_capped == internal::capped_release::yes) {
+            release_tokens.arm_periodic(std::chrono::microseconds(500));
+            release_per_tick = double(tb.rate()) / smp::count / 2000;
+        }
+        fmt::print("{} worker, threshold {}, release-per-tick {}\n", this_shard_id(), threshold, release_per_tick);
+    }
+
+    void do_release() {
+        auto release = std::min(release_per_tick, available);
+        available -= release;
+        tb.release(release);
+    }
+
+    future<work_result> work(std::function<future<>(std::chrono::duration<double> d)> do_sleep) {
+        assert(tokens == 0);
+        auto start = clock_type::now();
+        return do_until([end = start + std::chrono::seconds(1)] { return clock_type::now() >= end; },
+            [this, start, do_sleep = std::move(do_sleep)] {
+                uint64_t d = 0;
+                uint64_t l_tokens = 0;
+                int sz;
+
+                while (l_tokens < threshold) {
+                    if (head) {
+                        tb.replenish(clock_type::now());
+                        d = tb.deficiency(head->second);
+                        if (d > 0) {
+                            break;
+                        }
+                        sz = head->first;
+                        head.reset();
+                    } else {
+                        sz = size(testing::local_random_engine);
+                        auto h = tb.grab(sz);
+                        d = tb.deficiency(h);
+                        if (d > 0) {
+                            head = std::make_pair(sz, h);
+                            break;
+                        }
+                    }
+                    tokens += sz;
+                    l_tokens += sz;
+                    available += sz;
+                }
+
+                tokens_per_tick_min = std::min(tokens_per_tick_min, l_tokens);
+                tokens_per_tick_max = std::max(tokens_per_tick_max, l_tokens);
+                auto p = tb.duration_for(d);
+                ticks.emplace_back(clock_type::now() - start, p, d, l_tokens, tokens);
+                if (ticks.size() > 2048) {
+                    ticks.pop_front();
+                }
+                return do_sleep(p);
+            }
+        ).then([this, start] {
+            auto delay = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() - start).count();
+            fmt::print("{} {}t/{:.3f}s, speed is {:.1f}t/s goal {:.1f}t/s, {} ticks, per-tick [{}..{}]\n", this_shard_id(), tokens, delay,
+                    double(tokens) / delay, double(tb.rate()) / smp::count,
+                    ticks.size(), tokens_per_tick_min, tokens_per_tick_max);
+            tokens_per_tick_min = std::numeric_limits<uint64_t>::max();
+            tokens_per_tick_max = std::numeric_limits<uint64_t>::min();
+            work_result r {
+                .tokens = std::exchange(this->tokens, 0),
+            };
+            return make_ready_future<work_result>(std::move(r));
+        });
+    }
+
+    future<work_result> work_sleeping() {
+        return work([this] (std::chrono::duration<double> d) {
+            return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(d));
+        });
+    }
+
+    future<work_result> work_yielding() {
+        return work([] (std::chrono::duration<double>) {
+            return seastar::yield();
+        });
+    }
+
+    future<> print_and_clear_ticks() {
+        fmt::print("{} {} ticks\n", this_shard_id(), ticks.size());
+        std::chrono::microseconds p(0);
+        for (auto& td : ticks) {
+            fmt::print("  {:8} +{:5} us {:5}/{:5} def {:3} sleep {:5} us\n", td.delay.count(), (td.delay - p).count(), td.tokens, td.total, td.defic, td.sleep.count());
+            p = td.delay;
+        }
+        ticks.clear();
+        if (this_shard_id() == smp::count - 1) {
+            return make_ready_future<>();
+        }
+
+        return this->container().invoke_on(this_shard_id() + 1, &worker::print_and_clear_ticks);
+    }
+};
+
+struct hog {
+    std::exponential_distribution<double> busy;
+    std::exponential_distribution<double> rest;
+    std::optional<future<>> stopped;
+    bool keep_going = false;
+    uint64_t _iterations = 0;
+
+    template <typename T1, typename T2>
+    hog(T1 b, T2 r) noexcept
+            : busy(1.0 / std::chrono::duration_cast<std::chrono::duration<double>>(b).count())
+            , rest(1.0 / std::chrono::duration_cast<std::chrono::duration<double>>(r).count())
+    {}
+
+    void work() {
+        assert(!stopped.has_value());
+        keep_going = true;
+        stopped = do_until([this] { return !keep_going; },
+            [this] {
+                auto p = std::chrono::duration<double>(rest(testing::local_random_engine));
+                return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(p)).then([this] {
+                    _iterations++;
+                    auto until = clock_type::now() + std::chrono::duration<double>(busy(testing::local_random_engine));
+                    do {
+                    } while (clock_type::now() < until && keep_going);
+                });
+            }
+        );
+    }
+
+    future<> terminate() {
+        assert(stopped.has_value());
+        keep_going = false;
+        auto f = std::move(*stopped);
+        stopped.reset();
+        return f.finally([this] {
+            fmt::print("{}-hog {} iters\n", this_shard_id(), std::exchange(_iterations, 0));
+        });
+    }
+};
+
+template <typename TokenBucket>
+struct context {
+    using worker_t = worker<TokenBucket>;
+    TokenBucket tb;
+    seastar::sharded<worker_t> w;
+    seastar::sharded<hog> h;
+
+    static constexpr uint64_t rate = 1000000;
+    static constexpr uint64_t limit = rate / 2000;
+    static constexpr uint64_t threshold = 1;
+
+    context() : tb(rate, limit, threshold)
+    {
+        w.start(std::ref(tb)).get();
+        h.start(std::chrono::microseconds(500), std::chrono::microseconds(10)).get();
+        fmt::print("Created tb {}t/s (limit {} threshold {})\n", tb.rate(), tb.limit(), tb.threshold());
+    }
+
+    ~context() {
+        h.stop().get();
+        w.stop().get();
+    }
+
+    template <typename Fn>
+    future<> run_workers(Fn&& fn) {
+        auto start = clock_type::now();
+        return w.map_reduce0(std::forward<Fn>(fn), statistics{}, accumulate).then([this, start] (statistics st) {
+            auto delay = std::chrono::duration_cast<std::chrono::duration<double>>(clock_type::now() - start).count();
+            fmt::print("effective rate is {:.1f}t/s, [{} ... {}]\n", st.total / delay, st.min, st.max);
+        });
+    }
+
+    future<> test_sleeping() {
+        fmt::print("---8<---\n");
+        return run_workers(&worker_t::work_sleeping);
+    }
+
+    future<> test_yielding() {
+        fmt::print("---8<---\n");
+        return run_workers(&worker_t::work_yielding);
+    }
+
+    future<> test_sleeping_with_hog() {
+        fmt::print("---8<---\n");
+        return h.invoke_on_all(&hog::work).then([this] {
+            return run_workers(&worker_t::work_sleeping).then([this] {
+                return h.invoke_on_all(&hog::terminate);
+            });
+        });
+    }
+};
+
+struct perf_capped_context : public context<capped_token_bucket_t> {};
+struct perf_pure_context : public context<pure_token_bucket_t> {};
+
+PERF_TEST_F(perf_capped_context, sleeping_throughput) { return test_sleeping(); }
+PERF_TEST_F(perf_capped_context, yielding_throughput) { return test_yielding(); }
+PERF_TEST_F(perf_capped_context, sleeping_throughput_with_hog) { return test_sleeping_with_hog(); }
+
+PERF_TEST_F(perf_pure_context, sleeping_throughput) { return test_sleeping(); }
+PERF_TEST_F(perf_pure_context, yielding_throughput) { return test_yielding(); }
+PERF_TEST_F(perf_pure_context, sleeping_throughput_with_hog) { return test_sleeping_with_hog(); }