diff --git a/tests/unit/fair_queue_test.cc b/tests/unit/fair_queue_test.cc index bbcf5f7942c..12ca5aec858 100644 --- a/tests/unit/fair_queue_test.cc +++ b/tests/unit/fair_queue_test.cc @@ -75,7 +75,16 @@ class test_env { } void drain() { - do {} while (tick() != 0); + while (true) { + if (tick()) { + continue; + } + auto ts = _fq.next_pending_aio(); + if (ts == std::chrono::steady_clock::time_point::max()) { + break; + } + sleep(ts - std::chrono::steady_clock::now()).get(); + } } public: test_env(unsigned capacity) @@ -90,27 +99,27 @@ class test_env { // Because of this property, one useful use of tick() is to implement a drain() // method (see above) in which all requests currently sent to the queue are drained // before the queue is destroyed. - unsigned tick(unsigned n = 1) { + unsigned tick(unsigned n = 0) { unsigned processed = 0; - _fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1)); - _fq.dispatch_requests([] (fair_queue_entry& ent) { - boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit(); - }); + while (true) { + _fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1)); + _fq.dispatch_requests([] (fair_queue_entry& ent) { + boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit(); + }); - for (unsigned i = 0; i < n; ++i) { std::vector curr; curr.swap(_inflight); for (auto& req : curr) { - processed++; - _results[req.index]++; + if (processed < n) { + _results[req.index]++; + } _fq.notify_request_finished(req.fqent.capacity()); + processed++; + } + if (processed >= n) { + break; } - - _fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1)); - _fq.dispatch_requests([] (fair_queue_entry& ent) { - boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit(); - }); } return processed; }