Skip to content

Commit

Permalink
io_queue: Oversubscribe to drain imbalance faster
Browse files Browse the repository at this point in the history
The natural lack of cross-shard fairness may lead to a nasty imbalance
problem. When a shard gets lots of requests queued (a spike) it will
try to drain its queue by dispatching requests on every tick. However,
if all other shards have something to do so that the disk capacity is
close to be exhausted, this overloaded shard will have little chance to
drain itself because every tick it will only get its "fair" amount of
capacity tokens, which is capacity/smp::count and that's it.

In order to drain the overloaded queue a shard should get more capacity
tokens than other shards. This will increase the pressure on other
shards, of course, "spreading" one shard queue among others thus
reducing the average latency of requests. When increasing the amount of
grabbed tokens there are two pitfals to avoid.

Both come from the fact that under described curcumstances shared
capacity is likely all exhausted and shards are "fighting" for tokens in
the "pending" state -- i.e. when they line up in the shared token bucket
for _future_ tokens, that will get there eventually as requests
complete. So...

1. If the capacity is all claimed by shards and shards continue to claim
   more, they will end-up in the "pending" state, which is -- they grab
   extra tokens from the shared capacity and "remember" their position
   in the shared queue when they are to get it. Thus, if an urgent
   request arrives at random shard in the worst case it will have to
   wait for this whole over-claimed line before it can get dispatched.
   Currently, the maximum length of the over-claimed queue is limited by
   one request per shard, which eventually equals to the
   io-latency-goal. If claiming _more_ than that, this would violate
   this time by the amount of over-claimed tokens, so it shouldn't be
   too large.

2. When increasing the pressure on the shared capacity, a shard has no
   idea if any other shard does the same. This means, that shard should
   try to avoid increasing the pressure "just because", there should be
   some yes-no reason for doing it, so that only "overloaded" shards try
   to grab more. If all shards suddenly get into this aggressive state,
   they will compensate each other, but according to p.1 the worst-case
   preemption latency would grow too high.

With the above two assumptions at hands, the proposed solution is to
introduce per-class capacity-claim measure which grows monotonically
with the class queue length and is proportional to class shares.

a. Over-claim at most one (1) request from the local queue

b. Start over-claim once the capacity claim goes above some threshold,
   and apply hysteresis on exisiting this state to avoid resonance

The capacity claim is deliberately selected to grow faster for high-prio
queues with short requests (scylla query class) and grow much slower for
low-prio queues with fat requests (scylla compaction/flush classes). So
it doesn't care about requests lengths, but depends on shares value.

Also, since several classes may fluctuate around claim thresholds, the
oversubscribing happens when there's at least one of that kind.

The thresholds are pretty-much random in this patch -- 12000 and 8000 --
and that's the biggest problem of it.

The issue can be reproduced with the help of recent io-tester over a
/dev/null storage :)

The io-properties.yaml:
```
disks:
  - mountpoint: /dev/null
    read_iops: 1200
    read_bandwidth: 1GB
    write_iops: 1200
    write_bandwidth: 1GB
```

The jobs conf.yaml:
```
- name: latency_reads_1
  shards: all
  type: randread
  data_size: 1GB
  shard_info:
    parallelism: 80
    rps: 1
    reqsize: 512
    shares: 1000

- name: latency_reads_1a
  shards: [0]
  type: randread
  data_size: 1GB
  shard_info:
    parallelism: 10
    limit: 100
    reqsize: 512
    class: latency_reads_1
```

Running it with 1 io group and 12 shards would result in shard 0
suffering from not-draining-ever queue and huge final latencies:

    shard p99 latency (usec)
       0: 1208561
       1: 14520
       2: 17456
       3: 15777
       4: 15488
       5: 14576
       6: 19251
       7: 20222
       8: 18338
       9: 21267
      10: 17083
      11: 16188

With this patch applied shard-0 would scatter its queue among other
shards within several ticks lowering its latency at the cost of other
shards's latencies:

    shard p99 latency (usec)
       0: 108345
       1: 102907
       2: 106900
       3: 105244
       4: 109214
       5: 107881
       6: 114278
       7: 114289
       8: 113560
       9: 105411
      10: 113898
      11: 112615

However, the larger the testing time, the smaller latencies become for
the 2nd test (and for the 1st too, but not for shard-0)

refs: scylladb#1083

Signed-off-by: Pavel Emelyanov <[email protected]>
  • Loading branch information
xemul committed Mar 1, 2023
1 parent 9cbc1fe commit 59b0dc9
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
3 changes: 3 additions & 0 deletions include/seastar/core/fair_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ private:
};

std::optional<pending> _pending;
unsigned _oversubscribing = 0;
std::optional<pending> _oversubscribed;

void push_priority_class(priority_class_data& pc) noexcept;
void push_priority_class_from_idle(priority_class_data& pc) noexcept;
Expand All @@ -355,6 +357,7 @@ private:
enum class grab_result { grabbed, cant_preempt, pending };
grab_result grab_capacity(const fair_queue_entry& ent) noexcept;
grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept;
void oversubscribe_capacity(capacity_t cap) noexcept;
public:
/// Constructs a fair queue with configuration parameters \c cfg.
///
Expand Down
29 changes: 29 additions & 0 deletions src/core/fair_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class fair_queue::priority_class_data {
capacity_t _accumulated = 0;
capacity_t _pure_accumulated = 0;
fair_queue_entry::container_list_t _queue;
unsigned _capacity_claim = 0;
bool _queued = false;
bool _plugged = true;

Expand Down Expand Up @@ -266,6 +267,11 @@ auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept ->
}

_pending.reset();
if (_oversubscribed) {
_pending = *_oversubscribed;
_oversubscribed.reset();
}

return grab_result::grabbed;
}

Expand All @@ -284,6 +290,12 @@ auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_res
return grab_result::grabbed;
}

void fair_queue::oversubscribe_capacity(capacity_t cap) noexcept {
assert(_pending);
capacity_t want_head = _group.grab_capacity(cap);
_oversubscribed.emplace(want_head, cap);
}

void fair_queue::register_priority_class(class_id id, uint32_t shares) {
if (id >= _priority_classes.size()) {
_priority_classes.resize(id + 1);
Expand Down Expand Up @@ -326,6 +338,9 @@ fair_queue_ticket fair_queue::resources_currently_executing() const {
return _resources_executing;
}

static constexpr uint64_t oversubscribe_start = 12000;
static constexpr uint64_t oversubscribe_stop = 8000;

void fair_queue::queue(class_id id, fair_queue_entry& ent) noexcept {
priority_class_data& pc = *_priority_classes[id];
// We need to return a future in this function on which the caller can wait.
Expand All @@ -335,8 +350,12 @@ void fair_queue::queue(class_id id, fair_queue_entry& ent) noexcept {
push_priority_class_from_idle(pc);
}
pc._queue.push_back(ent);
pc._capacity_claim += pc._shares;
_resources_queued += ent._ticket;
_requests_queued++;
if (pc._capacity_claim >= oversubscribe_start) {
_oversubscribing++;
}
}

void fair_queue::notify_request_finished(fair_queue_ticket desc) noexcept {
Expand Down Expand Up @@ -384,6 +403,12 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
auto& req = h._queue.front();
auto gr = grab_capacity(req);
if (gr == grab_result::pending) {
if (_oversubscribing > 0 && !_oversubscribed) {
auto cap = _group.ticket_capacity(req._ticket);
if (cap > 0) {
oversubscribe_capacity(cap);
}
}
break;
}

Expand All @@ -396,6 +421,10 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
_last_accumulated = std::max(h._accumulated, _last_accumulated);
pop_priority_class(h);
h._queue.pop_front();
h._capacity_claim -= h._shares;
if (h._capacity_claim < oversubscribe_stop) {
_oversubscribing--;
}

_resources_executing += req._ticket;
_resources_queued -= req._ticket;
Expand Down

0 comments on commit 59b0dc9

Please sign in to comment.