diff --git a/apps/io_tester/ioinfo.cc b/apps/io_tester/ioinfo.cc
index d23f265e05b..d2393e6ec77 100644
--- a/apps/io_tester/ioinfo.cc
+++ b/apps/io_tester/ioinfo.cc
@@ -75,7 +75,6 @@ int main(int ac, char** av) {
     out << YAML::EndMap;
 
     const auto& fg = internal::get_fair_group(ioq, internal::io_direction_and_length::write_idx);
-    out << YAML::Key << "per_tick_grab_threshold" << YAML::Value << fg.per_tick_grab_threshold();
 
     const auto& tb = fg.token_bucket();
     out << YAML::Key << "token_bucket" << YAML::BeginMap;
diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index 546cb00b601..ccd66c7f6dc 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -26,6 +26,7 @@
 #include
 #include
 #include
+#include <seastar/core/timer.hh>
 #include
 #include
@@ -108,6 +109,7 @@ public:
     // a 'normalized' form -- converted from floating-point to fixed-point number
     // and scaled according to fair-group's token-bucket duration
     using capacity_t = uint64_t;
+    using signed_capacity_t = std::make_signed_t<capacity_t>;
     friend class fair_queue;
 
 private:
@@ -138,6 +140,7 @@ public:
 class fair_group {
 public:
     using capacity_t = fair_queue_entry::capacity_t;
+    using signed_capacity_t = fair_queue_entry::signed_capacity_t;
     using clock_type = std::chrono::steady_clock;
 
     /*
@@ -189,7 +192,7 @@ public:
      * time period for which the speeds from F (in above formula) are taken.
      */
 
-    static constexpr float fixed_point_factor = float(1 << 24);
+    static constexpr float fixed_point_factor = float(1 << 21);
     using rate_resolution = std::milli;
     using token_bucket_t = internal::shared_token_bucket<capacity_t, rate_resolution, internal::capped_release::no>;
 
@@ -215,10 +218,21 @@ private:
      */
     token_bucket_t _token_bucket;
-    const capacity_t _per_tick_threshold;
+
+    // Capacities accumulated by queues in this group. Each queue tries not
+    // to run too far ahead of the others; if it does, it skips the dispatch
+    // loop until the next tick in the hope that other shards will grab the
+    // unused disk capacity and move their counters forward.
+    std::vector<capacity_t> _balance;
 
 public:
+    // Maximum value a _balance entry can get. It is also published when a
+    // queue goes idle and thus stops participating in the balancing, so
+    // that it doesn't hold the group-wide minimum back. The value is still
+    // suitable for comparisons with "active" queues.
+    static constexpr capacity_t max_balance = std::numeric_limits<signed_capacity_t>::max();
+
     // Convert internal capacity value back into the real token
     static double capacity_tokens(capacity_t cap) noexcept {
         return (double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count();
@@ -251,7 +265,6 @@ public:
     fair_group(fair_group&&) = delete;
 
     capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); }
-    capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
     capacity_t grab_capacity(capacity_t cap) noexcept;
     clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); }
     void replenish_capacity(clock_type::time_point now) noexcept;
@@ -265,6 +278,10 @@ public:
     }
 
     const token_bucket_t& token_bucket() const noexcept { return _token_bucket; }
+
+    capacity_t current_balance() const noexcept;
+    void update_balance(capacity_t) noexcept;
+    void reset_balance() noexcept;
 };
 
 /// \brief Fair queuing class
@@ -300,7 +317,7 @@ public:
     using class_id = unsigned int;
     class priority_class_data;
     using capacity_t = fair_group::capacity_t;
-    using signed_capacity_t = std::make_signed_t<capacity_t>;
+    using signed_capacity_t = fair_queue_entry::signed_capacity_t;
 
 private:
     using clock_type = std::chrono::steady_clock;
@@ -330,6 +347,23 @@ private:
     std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
     size_t _nr_classes = 0;
     capacity_t _last_accumulated = 0;
+    capacity_t _total_accumulated = 0;
+
+    // Amortize balance checking by assuming that once balance is achieved,
+    // it will remain so for the "quiescent" duration. Increase this duration
+    // every time the assumption holds, but not beyond tau. When the balance
+    // is lost, reset back to frequent checks.
+    static constexpr auto minimal_quiescent_duration = std::chrono::microseconds(100);
+    std::chrono::microseconds _balance_quiescent_duration = minimal_quiescent_duration;
+    timer<> _balance_timer;
+
+    // Maximum capacity by which a queue may stay behind the other shards.
+    //
+    // This is similar to the priority classes' fall-back deviation and is
+    // calculated as the number of capacity points a group with 1 share
+    // accumulates over tau.
+    //
+    // (Check the max_deviation math in push_priority_class_from_idle().)
+    const capacity_t _max_imbalance;
 
     /*
      * When the shared capacity is over, the local queue delays
@@ -360,6 +394,8 @@ private:
     enum class grab_result { grabbed, cant_preempt, pending };
     grab_result grab_capacity(const fair_queue_entry& ent) noexcept;
    grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept;
+
+    bool balanced() noexcept;
 public:
     /// Constructs a fair queue with configuration parameters \c cfg.
    ///
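Taken together, the new fair_group members form a small cross-shard protocol: each shard publishes its accumulated capacity into its `_balance` slot, the group's "balance" is the minimum over all slots, and a shard keeps dispatching only while it is at most `_max_imbalance` ahead of that minimum. The sketch below is a minimal, single-threaded model of this protocol; `group_model` and `may_dispatch` are invented names for illustration, and the timer-based amortization is left out:

```cpp
#include <algorithm>
#include <cstdint>
#include <limits>
#include <vector>

using capacity_t = std::uint64_t;

// Idle queues publish the signed maximum: large enough never to drag the
// group-wide minimum down, yet small enough that "balance + imbalance"
// still fits into the unsigned range (the fair_queue ctor checks this).
constexpr capacity_t max_balance = std::numeric_limits<std::int64_t>::max();

struct group_model {
    // One slot per shard, like fair_group::_balance.
    std::vector<capacity_t> balance;

    explicit group_model(unsigned shards) : balance(shards, max_balance) {}

    // fair_group::current_balance(): progress of the least-advanced shard.
    capacity_t current() const {
        return *std::min_element(balance.begin(), balance.end());
    }
};

// The core decision behind fair_queue::balanced(): a shard may keep
// dispatching only while it is at most max_imbalance capacity points
// ahead of the slowest active shard in the group.
bool may_dispatch(const group_model& g, capacity_t my_accumulated, capacity_t max_imbalance) {
    return my_accumulated <= g.current() + max_imbalance;
}
```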
diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
index 1ff282d42b0..1f18a21987a 100644
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -48,6 +48,8 @@ module seastar;
 
 namespace seastar {
 
+logger fq_log("fair_queue");
+
 static_assert(sizeof(fair_queue_ticket) == sizeof(uint64_t), "unexpected fair_queue_ticket size");
 static_assert(sizeof(fair_queue_entry) <= 3 * sizeof(void*), "unexpected fair_queue_entry::_hook size");
 static_assert(sizeof(fair_queue_entry::container_list_t) == 2 * sizeof(void*), "unexpected priority_class::_queue size");
@@ -107,7 +109,7 @@ fair_group::fair_group(config cfg, unsigned nr_queues)
             std::max<capacity_t>(fixed_point_factor * token_bucket_t::rate_cast(cfg.rate_limit_duration).count(), tokens_capacity(cfg.limit_min_tokens)),
             tokens_capacity(cfg.min_tokens)
       )
-    , _per_tick_threshold(_token_bucket.limit() / nr_queues)
+    , _balance(smp::count, max_balance)
 {
     if (tokens_capacity(cfg.min_tokens) > _token_bucket.threshold()) {
         throw std::runtime_error("Fair-group replenisher limit is lower than threshold");
@@ -137,6 +139,21 @@ auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity_t {
     return _token_bucket.deficiency(from);
 }
 
+auto fair_group::current_balance() const noexcept -> capacity_t {
+    return *std::min_element(_balance.begin(), _balance.end());
+}
+
+void fair_group::update_balance(capacity_t acc) noexcept {
+    _balance[this_shard_id()] = acc;
+}
+
+void fair_group::reset_balance() noexcept {
+    // Request cost can be up to half a million. Given a 100K IOPS disk and
+    // a class with 1 share, the 64-bit accumulating counter would overflow
+    // once in a few years.
+    on_internal_error_noexcept(fq_log, "cannot reset group balance");
+}
+
 // Priority class, to be used with a given fair_queue
 class fair_queue::priority_class_data {
     friend class fair_queue;
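The overflow estimate in the `reset_balance()` comment checks out. A quick sketch of the arithmetic, using the figures from the comment (half-a-million-point requests against a 100K IOPS disk) and the signed-maximum value of `max_balance`:

```cpp
#include <cstdio>

int main() {
    // Figures from the comment above: worst-case request cost in capacity
    // points and a fast disk's request rate.
    const double max_req_cost = 500'000;   // "up to half a million"
    const double iops = 100'000;           // "100K IOPS disk"
    const double counter_max = 9.22e18;    // ~max_balance (int64_t max)

    const double secs = counter_max / (max_req_cost * iops);
    std::printf("~%.1f years to overflow\n", secs / (365.25 * 24 * 3600));
    // Prints ~5.8 years of continuous worst-case load.
}
```

At that horizon, overflowing `_total_accumulated` is effectively unreachable, which is why reaching it is reported as an internal error rather than handled by renormalizing every shard's counter.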
@@ -165,7 +182,15 @@ fair_queue::fair_queue(fair_group& group, config cfg)
     : _config(std::move(cfg))
     , _group(group)
     , _group_replenish(clock_type::now())
+    , _balance_timer([this] {
+        auto new_qd = _balance_quiescent_duration * 2;
+        _balance_quiescent_duration = std::min(new_qd, _config.tau);
+    })
+    , _max_imbalance(fair_group::fixed_point_factor * fair_group::token_bucket_t::rate_cast(_config.tau).count())
 {
+    if (fair_group::max_balance > std::numeric_limits<capacity_t>::max() - _max_imbalance) {
+        throw std::runtime_error("Too large tau parameter");
+    }
 }
 
 fair_queue::~fair_queue() {
@@ -194,6 +219,12 @@ void fair_queue::push_priority_class_from_idle(priority_class_data& pc) noexcept
     // over signed maximum (see overflow check below)
     pc._accumulated = std::max<signed_capacity_t>(_last_accumulated - max_deviation, pc._accumulated);
     _handles.assert_enough_capacity();
+    if (_handles.empty()) {
+        capacity_t balance = _group.current_balance();
+        if (balance != fair_group::max_balance) {
+            _total_accumulated = std::max<signed_capacity_t>(balance - _max_imbalance, _total_accumulated);
+        }
+    }
     _handles.push(&pc);
     pc._queued = true;
 }
@@ -330,11 +361,29 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept
     return std::chrono::steady_clock::time_point::max();
 }
 
+bool fair_queue::balanced() noexcept {
+    if (_balance_timer.armed()) {
+        return true;
+    }
+
+    capacity_t balance = _group.current_balance();
+    if (_total_accumulated > balance + _max_imbalance) {
+        _balance_quiescent_duration = minimal_quiescent_duration;
+        return false;
+    }
+
+    _balance_timer.arm(_balance_quiescent_duration);
+    return true;
+}
+
 void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
-    capacity_t dispatched = 0;
     boost::container::small_vector<priority_class_data*, 2> preempt;
 
-    while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) {
+    if (!balanced()) {
+        return;
+    }
+
+    while (!_handles.empty()) {
         priority_class_data& h = *_handles.top();
         if (h._queue.empty() || !h._plugged) {
             pop_priority_class(h);
@@ -362,7 +411,7 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
         // has chances to be translated into zero cost which, in turn, will make the
         // class show no progress and monopolize the queue.
         auto req_cap = req._capacity;
-        auto req_cost = std::max(req_cap / h._shares, (capacity_t)1);
+        auto req_cost = std::max((req_cap + h._shares - 1) / h._shares, (capacity_t)1);
         // signed overflow check to make push_priority_class_from_idle math work
         if (h._accumulated >= std::numeric_limits<signed_capacity_t>::max() - req_cost) {
             for (auto& pc : _priority_classes) {
@@ -376,9 +425,9 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
             }
             _last_accumulated = 0;
         }
+        _total_accumulated += req_cost;
         h._accumulated += req_cost;
         h._pure_accumulated += req_cap;
-        dispatched += req_cap;
 
         cb(req);
 
@@ -390,6 +439,13 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
     for (auto&& h : preempt) {
         push_priority_class(*h);
     }
+
+    if (_total_accumulated >= fair_group::max_balance) [[unlikely]] {
+        _group.reset_balance();
+        _total_accumulated = 0;
+        _balance_quiescent_duration = minimal_quiescent_duration;
+    }
+    _group.update_balance(_handles.empty() ? fair_group::max_balance : _total_accumulated);
 }
 
 std::vector<seastar::metrics::impl::metric_definition_impl> fair_queue::metrics(class_id c) {
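The `balanced()` path is the heart of the change, and its interplay with the timer is easy to misread in diff form. Below is a compact model of the check schedule; `balance_checker` is an invented name, the 5 ms tau is an arbitrary stand-in for `_config.tau`, and for brevity the doubling is folded into the arming step (in the patch it lives in the `_balance_timer` callback, i.e. it runs when the quiet period actually expires):

```cpp
#include <algorithm>
#include <chrono>

using namespace std::chrono;

struct balance_checker {
    static constexpr auto minimal_quiescent_duration = microseconds(100);
    microseconds quiescent = minimal_quiescent_duration;
    microseconds tau = milliseconds(5);      // stand-in for _config.tau
    steady_clock::time_point armed_until{};  // models timer<>::armed()

    // Mirrors fair_queue::balanced(): while the timer is armed, trust the
    // previous verdict for free; on imbalance, fall back to frequent
    // checks; on balance, start a quiet period and grow the next one.
    bool balanced(bool imbalanced, steady_clock::time_point now) {
        if (now < armed_until) {
            return true;                             // amortized: skip the check
        }
        if (imbalanced) {
            quiescent = minimal_quiescent_duration;  // back to frequent checks
            return false;
        }
        armed_until = now + quiescent;               // quiet period begins
        quiescent = std::min(quiescent * 2, tau);    // 100us, 200us, ... tau
        return true;
    }
};
```

One more change rides along in `dispatch_requests()`: the per-class cost now divides rounding up. For example, a request of capacity 1000 against 300 shares is charged 4 points where the old floor division charged 3, so the remainder is no longer dropped; this matters most for high-share classes, whose per-request cost is small.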
diff --git a/tests/manual/iosched-smp.py b/tests/manual/iosched-smp.py
new file mode 100755
index 00000000000..a9b58ee4409
--- /dev/null
+++ b/tests/manual/iosched-smp.py
@@ -0,0 +1,167 @@
+#!/usr/bin/env python3
+
+import os.path
+import yaml
+import shutil
+import subprocess
+import argparse
+from functools import reduce
+
+parser = argparse.ArgumentParser(description='IO scheduler tester')
+parser.add_argument('--directory', help='Directory to run on', default='/mnt')
+parser.add_argument('--seastar-build-dir', help='Path to seastar build directory', default='./build/dev/', dest='bdir')
+parser.add_argument('--duration', help='One run duration', default=60)
+parser.add_argument('--test', help='Test name to run')
+args = parser.parse_args()
+
+
+class iotune:
+    def __init__(self, args):
+        self._iotune = args.bdir + '/apps/iotune/iotune'
+        self._dir = args.directory
+
+    def ensure_io_properties(self):
+        if os.path.exists('io_properties.yaml'):
+            print('Using existing io_properties file')
+        else:
+            print('Running iotune')
+            subprocess.check_call([self._iotune, '--evaluation-directory', self._dir, '-c1', '--properties-file', 'io_properties.yaml'])
+
+
+class ioinfo:
+    def __init__(self, args):
+        self._ioinfo = args.bdir + '/apps/io_tester/ioinfo'
+        self._dir = args.directory
+        res = subprocess.check_output([self._ioinfo, '--directory', self._dir, '--io-properties-file', 'io_properties.yaml'])
+        self._info = yaml.safe_load(res)
+
+    def min_read_length(self):
+        return 4
+
+    def max_read_length(self):
+        return min(self._info['disk_read_max_length'] / 1024, 128)
+
+    def min_write_length(self):
+        return 4
+
+    def max_write_length(self):
+        return min(self._info['disk_write_max_length'] / 1024, 64)
+
+
+class job:
+    def __init__(self, typ, req_size_kb, prl, shards, shares, rps=None):
+        self._typ = typ
+        self._req_size = req_size_kb
+        self._prl = prl
+        self._shares = shares
+        self._shards = shards
+        self._rps = rps
+
+    def to_conf_entry(self, name):
+        ret = {
+            'name': name,
+            'shards': self._shards,
+            'type': self._typ,
+            'shard_info': {
+                'parallelism': self._prl,
+                'reqsize': f'{self._req_size}kB',
+                'shares': self._shares
+            }
+        }
+        if self._rps is not None:
+            ret['shard_info']['rps'] = self._rps
+        return ret
+
+    def shares(self):
+        return self._shares
+
+
+class io_tester:
+    def __init__(self, args, smp):
+        self._jobs = []
+        self._duration = args.duration
+        self._io_tester = args.bdir + '/apps/io_tester/io_tester'
+        self._dir = args.directory
+        self._use_fraction = 0.1
+        self._smp = smp
+
+    def add_job(self, name, job):
+        self._jobs.append(job.to_conf_entry(name))
+
+    def _setup_data_sizes(self):
+        du = shutil.disk_usage(self._dir)
+        one_job_space_mb = int(du.free * self._use_fraction / len(self._jobs) / (100*1024*1024)) * 100  # round down to 100MB
+        for j in self._jobs:
+            j['data_size'] = f'{one_job_space_mb}MB'
+
+    def run(self):
+        if not self._jobs:
+            raise RuntimeError('Empty jobs')
+
+        self._setup_data_sizes()
+        yaml.dump(self._jobs, open('conf.yaml', 'w'))
+        self._proc = subprocess.Popen([self._io_tester, '--storage', self._dir, f'-c{self._smp}', '--num-io-groups', '1', '--conf', 'conf.yaml', '--duration', f'{self._duration}', '--io-properties-file', 'io_properties.yaml'], stdout=subprocess.PIPE)
+        res = self._proc.communicate()
+        res = res[0].split(b'---\n')[1]
+        return yaml.safe_load(res)
+
+
+def run_jobs(jobs, args, smp):
+    iot = io_tester(args, smp)
+    results = {}
+    for j in jobs:
+        iot.add_job(j, jobs[j])
+        results[j] = { 'iops': 0, 'shares': jobs[j].shares() }
+
+    out = iot.run()
+
+    for j in results:
+        for shard in out:
+            if j in shard:
+                results[j]['iops'] += shard[j]['IOPS']
+
+    return results
+
+
+iotune(args).ensure_io_properties()
+ioinf = ioinfo(args)
+
+
+def run_shares_balance_test():
+    for s in [ 100, 200, 400, 800 ]:
+        def ratios(res, idx):
+            shares_ratio = float(res[f'shard_{idx}']['shares']) / float(res['shard_0']['shares'])
+            iops_ratio = float(res[f'shard_{idx}']['iops']) / float(res['shard_0']['iops'])
+            return (shares_ratio, iops_ratio)
+
+        res = run_jobs({
+            'shard_0': job('randread', ioinf.min_read_length(), 16, ['0'], 100),
+            'shard_1': job('randread', ioinf.min_read_length(), 24, ['1'], s),
+        }, args, 2)
+        print(f'{res}')
+        shares_ratio, iops_ratio = ratios(res, 1)
+        print(f'IOPS ratio {iops_ratio:.3}, expected {shares_ratio:.3}, deviation {int(abs(shares_ratio - iops_ratio)*100.0/shares_ratio)}%')
+
+        for s2 in [ 200, 400, 800 ]:
+            if s2 <= s:
+                continue
+
+            res = run_jobs({
+                'shard_0': job('randread', ioinf.min_read_length(), 16, ['0'], 100),
+                'shard_1': job('randread', ioinf.min_read_length(), 24, ['1'], s),
+                'shard_2': job('randread', ioinf.min_read_length(), 32, ['2'], s2),
+            }, args, 3)
+            print(f'{res}')
+            shares_ratio, iops_ratio = ratios(res, 1)
+            print(f'shard-1 IOPS ratio {iops_ratio:.3}, expected {shares_ratio:.3}, deviation {int(abs(shares_ratio - iops_ratio)*100.0/shares_ratio)}%')
+            shares_ratio, iops_ratio = ratios(res, 2)
+            print(f'shard-2 IOPS ratio {iops_ratio:.3}, expected {shares_ratio:.3}, deviation {int(abs(shares_ratio - iops_ratio)*100.0/shares_ratio)}%')
+
+
+def run_rps_balance_test():
+    for rps in [ 1000, 2000, 4000 ]:
+        res = run_jobs({
+            'shard_0': job('randread', ioinf.min_read_length(), 1, ['0'], 100, rps=1000),
+            'shard_1': job('randread', ioinf.min_read_length(), 1, ['1'], 100, rps=rps),
+        }, args, 2)
+        print(f'iops = {res["shard_0"]["iops"]}:{res["shard_1"]["iops"]} want 1000:{rps}')
+
+
+run_shares_balance_test()
+run_rps_balance_test()
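A typical invocation, assuming io_properties.yaml can be (re)generated in the target directory, is `./tests/manual/iosched-smp.py --directory /mnt --seastar-build-dir build/release`. Each job is pinned to its own shard (the `'shards': ['0']` / `['1']` lists) while `--num-io-groups 1` makes all shards contend for a single fair_group, which is exactly the setup the balancing code targets. The shares test then expects per-shard IOPS to track shares: with shares 100 vs 400, for instance, the printed IOPS ratio should come out near 4.0 with a small deviation percentage. The rps test expects the rate-limited shards to hit their configured request rates, 1000 vs the chosen rps, regardless of balancing.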
diff --git a/tests/unit/fair_queue_test.cc b/tests/unit/fair_queue_test.cc
index bbcf5f7942c..12ca5aec858 100644
--- a/tests/unit/fair_queue_test.cc
+++ b/tests/unit/fair_queue_test.cc
@@ -75,7 +75,16 @@ class test_env {
     }
 
     void drain() {
-        do {} while (tick() != 0);
+        while (true) {
+            if (tick()) {
+                continue;
+            }
+            auto ts = _fq.next_pending_aio();
+            if (ts == std::chrono::steady_clock::time_point::max()) {
+                break;
+            }
+            sleep(ts - std::chrono::steady_clock::now()).get();
+        }
     }
 public:
     test_env(unsigned capacity)
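The drain() rewrite accounts for dispatch_requests() now being able to return without submitting anything: a zero-yield tick no longer proves the queue is empty, since requests may still be waiting for the token bucket to replenish. The helper therefore asks next_pending_aio() when progress next becomes possible, sleeps until then, and only stops once nothing is pending at all.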
@@ -90,27 +99,27 @@ class test_env {
     // Because of this property, one useful use of tick() is to implement a drain()
     // method (see above) in which all requests currently sent to the queue are drained
     // before the queue is destroyed.
-    unsigned tick(unsigned n = 1) {
+    unsigned tick(unsigned n = 0) {
         unsigned processed = 0;
-        _fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1));
-        _fq.dispatch_requests([] (fair_queue_entry& ent) {
-            boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit();
-        });
+        while (true) {
+            _fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1));
+            _fq.dispatch_requests([] (fair_queue_entry& ent) {
+                boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit();
+            });
 
-        for (unsigned i = 0; i < n; ++i) {
             std::vector<request> curr;
             curr.swap(_inflight);
 
             for (auto& req : curr) {
-                processed++;
-                _results[req.index]++;
+                if (processed < n) {
+                    _results[req.index]++;
+                }
                 _fq.notify_request_finished(req.fqent.capacity());
+                processed++;
             }
 
-            _fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1));
-            _fq.dispatch_requests([] (fair_queue_entry& ent) {
-                boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit();
-            });
+            if (processed >= n) {
+                break;
+            }
         }
 
         return processed;
     }
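tick() is reworked in the same spirit: instead of performing a fixed number of replenish/complete rounds, it now loops, replenishing, dispatching, and completing, until at least n requests have finished, recording only the first n of them in _results. The new default of n = 0 turns a bare tick() into a single replenish-dispatch-complete pass that reports how many requests it retired, which is exactly the emptiness probe that drain() relies on.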