Recycle task tokens for invalid matching tasks (#6599)
## What changed?
Add a `RecycleToken` function to our custom rate limiters. Most of them just
wrap `ClockedRateLimiter`, so let `ClockedRateLimiter`'s `WaitN` function
unblock the waiter when `RecycleToken` is called, allowing us to recycle
tokens and hand them to waiters whenever any exist.
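
As a rough illustration, here is a caller-side sketch of the intended flow. The `isTaskValid` and `dispatch` helpers are hypothetical stand-ins, not part of this PR:

```go
// Sketch only: a rate-limited dispatch path that recycles a token
// when the action it paid for turns out not to happen.
func dispatchWithRecycle(ctx context.Context, rl quotas.RateLimiter, task *internalTask) error {
	if err := rl.Wait(ctx); err != nil { // consume one token
		return err
	}
	if !isTaskValid(task) { // hypothetical validity check
		rl.RecycleToken() // hand the unused token to another waiter, if any
		return nil
	}
	return dispatch(task) // hypothetical dispatch
}
```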

## Why?
Lots of invalid tasks combined with Activity task dispatch rate limiting
can cause the actual dispatch rate to fall noticeably below the configured
maximum. This change makes the actual rate match the max RPS, as long as
`time_to_recycle` is reasonably short and `recycle_rate` is reasonable
(see the discussion in the code comments).

## How did you test it?
ClockedRateLimiter unit test.

## Potential risks
The max Activity RPS limit could be exceeded.


---------

Co-authored-by: David Reiss <[email protected]>
carlydf and dnr authored Oct 12, 2024
1 parent c20f6a1 commit 51c562a
Showing 16 changed files with 238 additions and 55 deletions.
74 changes: 68 additions & 6 deletions common/quotas/clocked_rate_limiter.go
@@ -39,6 +39,7 @@ import (
type ClockedRateLimiter struct {
rateLimiter *rate.Limiter
timeSource clock.TimeSource
recycleCh chan struct{}
}

var (
@@ -51,6 +52,7 @@ func NewClockedRateLimiter(rateLimiter *rate.Limiter, timeSource clock.TimeSourc
return ClockedRateLimiter{
rateLimiter: rateLimiter,
timeSource: timeSource,
recycleCh: make(chan struct{}),
}
}

@@ -131,12 +133,35 @@ func (l ClockedRateLimiter) WaitN(ctx context.Context, token int) error {
close(waitExpired)
})
defer timer.Stop()
select {
case <-ctx.Done():
reservation.Cancel()
return fmt.Errorf("%w: %v", ErrRateLimiterWaitInterrupted, ctx.Err())
case <-waitExpired:
return nil

for {
select {
case <-ctx.Done():
reservation.Cancel()
return fmt.Errorf("%w: %v", ErrRateLimiterWaitInterrupted, ctx.Err())
case <-waitExpired:
return nil
case <-l.recycleCh:
if token > 1 {
break // recycling 1 token to a process requesting >1 tokens is a no-op
}

// Cancel() reverses the effects of this Reservation on the rate limit as much as possible,
// considering that other reservations may have already been made. Normally, Cancel() indicates
// that the reservation holder will not perform the reserved action, so it would make the most
// sense to cancel the reservation whose token was just recycled. However, we don't have access
// to the recycled reservation anymore, and even if we did, Cancel on a reservation that
// has fully waited is a no-op, so instead we cancel the current reservation as a proxy.
//
// Since Cancel() just restores tokens to the rate limiter, cancelling the current 1-token
// reservation should have approximately the same effect on the actual rate as cancelling the
// recycled reservation.
//
// If the recycled reservation was for >1 token, cancelling the current 1-token reservation will
// lead to a slower actual rate than cancelling the original, so the approximation is conservative.
reservation.Cancel()
return nil
}
}
}

@@ -151,3 +176,40 @@ func (l ClockedRateLimiter) SetBurstAt(t time.Time, newBurst int) {
func (l ClockedRateLimiter) TokensAt(t time.Time) int {
return int(l.rateLimiter.TokensAt(t))
}

// RecycleToken should be called when the action being rate limited was not completed
// for some reason (e.g. a task is not dispatched because it was invalid).
// In this case, we want to immediately unblock another process that is waiting for one token
// so that the actual rate of completed actions is as close to the intended rate limit as possible.
// If no process is waiting for a token when RecycleToken is called, this is a no-op.
//
// Since we don't know how many tokens were reserved by the process calling recycle, we will only unblock
// new reservations that are for one token (otherwise we could recycle a 1-token-reservation and unblock
// a 100-token-reservation). If all waiting processes are waiting for >1 tokens, this is a no-op.
//
// Because recycleCh is an unbuffered channel, the token will be reused for the next waiter as long
// as there exists a waiter at the time RecycleToken is called. Usually the attempted rate is consistently
// above or below the limit for a period of time, so if rate limiting is in effect and recycling matters,
// most likely there will be a waiter. If the actual rate is erratically bouncing to either side of the
// rate limit AND we perform many recycles, this will drop some recycled tokens.
// If that situation turns out to be common, we may want to make it a buffered channel instead.
//
// Our goal is to ensure that each token in our bucket is used every second, meaning the time between
// taking and successfully using a token must be <= 1s. For this to be true, we must have:
//
// time_to_recycle * number_of_recycles_per_second <= 1s
// time_to_recycle * probability_of_recycle * number_of_attempts_per_second <= 1s
//
// Therefore, it is also possible for this strategy to be inaccurate if the delay between taking and
// successfully using a token is greater than one second.
//
// Currently, RecycleToken is called when we take a token to attempt a matching task dispatch and
// then later find out (usually via RPC to History) that the task should not be dispatched.
// If the History RPC takes 10ms --> 100 opportunities for the token to be used that second --> 99% recycle probability is ok.
// If the recycle probability is 50% --> need at least 2 opportunities for the token to be used --> 500ms History RPC time is ok.
func (l ClockedRateLimiter) RecycleToken() {
select {
case l.recycleCh <- struct{}{}:
default:
}
}
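
The no-op-when-no-waiter behavior described above falls out of the unbuffered channel combined with a `select`/`default` send. A self-contained sketch of the same pattern, separate from the actual implementation:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	recycleCh := make(chan struct{}) // unbuffered, as in ClockedRateLimiter

	// No goroutine is receiving, so the non-blocking send takes the default branch:
	select {
	case recycleCh <- struct{}{}:
		fmt.Println("token handed to a waiter")
	default:
		fmt.Println("no waiter; recycle is a no-op")
	}

	// With a goroutine blocked on the channel (like WaitN in its select),
	// the same send succeeds and unblocks it:
	done := make(chan struct{})
	go func() {
		<-recycleCh
		close(done)
	}()
	time.Sleep(10 * time.Millisecond) // crude, test-style wait for the receiver to block
	select {
	case recycleCh <- struct{}{}:
		fmt.Println("token handed to a waiter")
	default:
		fmt.Println("no waiter; recycle is a no-op")
	}
	<-done
}
```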
63 changes: 63 additions & 0 deletions common/quotas/clocked_rate_limiter_test.go
@@ -26,6 +26,7 @@ package quotas_test

import (
"context"
"sync/atomic"
"testing"
"time"

@@ -151,3 +152,65 @@ func TestClockedRateLimiter_Wait_DeadlineWouldExceed(t *testing.T) {
t.Cleanup(cancel)
assert.ErrorIs(t, rl.Wait(ctx), quotas.ErrRateLimiterReservationWouldExceedContextDeadline)
}

// test that reservations for 1 token ARE unblocked by RecycleToken
func TestClockedRateLimiter_Wait_Recycle(t *testing.T) {
t.Parallel()
ts := clock.NewEventTimeSource()
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 1), ts)
ctx := context.Background()

// take first token
assert.NoError(t, rl.Wait(ctx))

// wait for the next token and report success once it is acquired
var asserted atomic.Bool
asserted.Store(false)
go func() {
assert.NoError(t, rl.Wait(ctx))
asserted.Store(true)
}()
// wait for rl.Wait() to start and get to the select statement
time.Sleep(10 * time.Millisecond) // nolint

// once a waiter exists, recycle the token instead of advancing time
rl.RecycleToken()

// wait until done so we know assert.NoError was called
assert.Eventually(t, func() bool { return asserted.Load() }, time.Second, time.Millisecond)
}

// test that reservations for >1 token are NOT unblocked by RecycleToken
func TestClockedRateLimiter_WaitN_NoRecycle(t *testing.T) {
t.Parallel()
ts := clock.NewEventTimeSource()

// set burst to 2 so that the reservation succeeds and WaitN gets to the select statement
rl := quotas.NewClockedRateLimiter(rate.NewLimiter(1, 2), ts)
ctx, cancel := context.WithCancel(context.Background())

// take first token
assert.NoError(t, rl.Wait(ctx))

// wait for 2 tokens, which will never get a recycle;
// expect a context cancel error instead once we cancel the context below
var asserted atomic.Bool
asserted.Store(false)
go func() {
err := rl.WaitN(ctx, 2)
assert.ErrorContains(t, err, quotas.ErrRateLimiterWaitInterrupted.Error())
asserted.Store(true)
}()
// wait for rl.WaitN() to start and get to the select statement
time.Sleep(10 * time.Millisecond) // nolint

// once a waiter exists, recycle a token; the 2-token reservation should NOT be unblocked
rl.RecycleToken()

// cancel the context so that we return an error
cancel()

// wait until done so we know assert.ErrorContains was called
assert.Eventually(t, func() bool { return asserted.Load() }, time.Second, time.Millisecond)
}
5 changes: 5 additions & 0 deletions common/quotas/dynamic_rate_limiter_impl.go
@@ -165,3 +165,8 @@ func (d *DynamicRateLimiterImpl) maybeRefresh() {
func (d *DynamicRateLimiterImpl) TokensAt(t time.Time) int {
return d.rateLimiter.TokensAt(t)
}

// RecycleToken returns a token to the underlying rate limiter, unblocking a waiter if one exists
func (d *DynamicRateLimiterImpl) RecycleToken() {
d.rateLimiter.RecycleToken()
}
20 changes: 14 additions & 6 deletions common/quotas/multi_rate_limiter_impl.go
@@ -95,8 +95,8 @@ func (rl *MultiRateLimiterImpl) Reserve() Reservation {
return rl.ReserveN(time.Now(), 1)
}

// ReserveN returns a Reservation that indicates how long the caller
// must wait before event happen.
// ReserveN calls ReserveN on its list of rate limiters and returns a MultiReservation that is a list of the
// individual reservation objects indicating how long the caller must wait before the event can happen.
func (rl *MultiRateLimiterImpl) ReserveN(now time.Time, numToken int) Reservation {
length := len(rl.rateLimiters)
reservations := make([]Reservation, 0, length)
@@ -116,12 +116,12 @@ func (rl *MultiRateLimiterImpl) ReserveN(now time.Time, numToken int) Reservatio
return NewMultiReservation(true, reservations)
}

// Wait waits up till deadline for a rate limit token
// Wait waits up till maximum deadline for a rate limit token
func (rl *MultiRateLimiterImpl) Wait(ctx context.Context) error {
return rl.WaitN(ctx, 1)
}

// WaitN waits up till deadline for n rate limit token
// WaitN waits up till maximum deadline for n rate limit tokens
func (rl *MultiRateLimiterImpl) WaitN(ctx context.Context, numToken int) error {
select {
case <-ctx.Done():
@@ -160,7 +160,7 @@ func (rl *MultiRateLimiterImpl) WaitN(ctx context.Context, numToken int) error {
}
}

// Rate returns the rate per second for this rate limiter
// Rate returns the minimum rate per second for this rate limiter
func (rl *MultiRateLimiterImpl) Rate() float64 {
result := rl.rateLimiters[0].Rate()
for _, rateLimiter := range rl.rateLimiters {
@@ -172,7 +172,7 @@
return result
}

// Burst returns the burst for this rate limiter
// Burst returns the minimum burst for this rate limiter
func (rl *MultiRateLimiterImpl) Burst() int {
result := rl.rateLimiters[0].Burst()
for _, rateLimiter := range rl.rateLimiters {
@@ -191,3 +191,11 @@ func (rl *MultiRateLimiterImpl) TokensAt(t time.Time) int {
}
return tokens
}

// RecycleToken returns a token to each sub-rate-limiter, unblocking each
// sub-rate-limiter's WaitN callers.
func (rl *MultiRateLimiterImpl) RecycleToken() {
for _, rateLimiter := range rl.rateLimiters {
rateLimiter.RecycleToken()
}
}
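
A hedged usage sketch of that fan-out behavior. The `quotas.NewMultiRateLimiter` constructor name and the `taskValid` flag are assumptions for illustration; `NewRateLimiter` is the constructor shown in `rate_limiter_impl.go`:

```go
// Sketch: compose a per-caller limit with a global limit. A Wait charges
// one token in each sub-limiter, so a recycle must return one to each.
func waitAndMaybeRecycle(ctx context.Context, taskValid bool) error {
	perCaller := quotas.NewRateLimiter(100, 100) // 100 RPS, burst 100
	global := quotas.NewRateLimiter(1000, 1000)  // 1000 RPS, burst 1000
	multi := quotas.NewMultiRateLimiter([]quotas.RateLimiter{perCaller, global})

	if err := multi.Wait(ctx); err != nil {
		return err
	}
	if !taskValid {
		multi.RecycleToken() // unblocks a 1-token waiter in each sub-limiter, if present
	}
	return nil
}
```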
1 change: 1 addition & 0 deletions common/quotas/multi_reservation_impl.go
@@ -81,6 +81,7 @@ func (r *MultiReservationImpl) Delay() time.Duration {

// DelayFrom returns the duration for which the reservation holder must wait
// before taking the reserved action. Zero duration means act immediately.
// MultiReservation DelayFrom returns the maximum delay of all its sub-reservations.
func (r *MultiReservationImpl) DelayFrom(now time.Time) time.Duration {
if !r.ok {
return InfDuration
5 changes: 5 additions & 0 deletions common/quotas/rate_limiter.go
@@ -68,5 +68,10 @@ type (

// TokensAt returns the number of tokens that will be available at time t
TokensAt(t time.Time) int

// RecycleToken immediately unblocks another process that is waiting for a token, if
// a waiter exists. A token should be recycled when the action being rate limited was
// not completed for some reason (e.g. a task is not dispatched because it was invalid).
RecycleToken()
}
)
11 changes: 8 additions & 3 deletions common/quotas/rate_limiter_impl.go
@@ -60,12 +60,12 @@ func NewRateLimiter(newRPS float64, newBurst int) *RateLimiterImpl {
return rl
}

// SetRate set the rate of the rate limiter
// SetRPS sets the rate of the rate limiter
func (rl *RateLimiterImpl) SetRPS(rps float64) {
rl.refreshInternalRateLimiterImpl(&rps, nil)
}

// SetBurst set the burst of the rate limiter
// SetBurst sets the burst of the rate limiter
func (rl *RateLimiterImpl) SetBurst(burst int) {
rl.refreshInternalRateLimiterImpl(nil, &burst)
}
@@ -78,7 +78,7 @@ func (rl *RateLimiterImpl) ReserveN(now time.Time, n int) Reservation {
return rl.ClockedRateLimiter.ReserveN(now, n)
}

// SetRateBurst set the rps & burst of the rate limiter
// SetRateBurst sets the rps & burst of the rate limiter
func (rl *RateLimiterImpl) SetRateBurst(rps float64, burst int) {
rl.refreshInternalRateLimiterImpl(&rps, &burst)
}
@@ -132,3 +132,8 @@ func (rl *RateLimiterImpl) refreshInternalRateLimiterImpl(
rl.SetBurstAt(now, rl.burst)
}
}

// RecycleToken returns a token to the wrapped ClockedRateLimiter, unblocking a waiter if one exists
func (rl *RateLimiterImpl) RecycleToken() {
rl.ClockedRateLimiter.RecycleToken()
}
12 changes: 12 additions & 0 deletions common/quotas/rate_limiter_mock.go

Some generated files are not rendered by default.

17 changes: 14 additions & 3 deletions service/matching/matcher.go
@@ -170,6 +170,11 @@ func (tm *TaskMatcher) Offer(ctx context.Context, task *internalTask) (bool, err
metrics.SyncThrottlePerTaskQueueCounter.With(tm.metricsHandler).Record(1)
return false, err
}
// because we waited on the rate limiter to offer this task,
// attach the rate limiter's RecycleToken func to the task
// so that if the task is later determined to be invalid,
// we can recycle the token it used.
task.recycleToken = tm.rateLimiter.RecycleToken
}

select {
@@ -317,6 +322,12 @@ func (tm *TaskMatcher) MustOffer(ctx context.Context, task *internalTask, interr
return err
}

// because we waited on the rate limiter to offer this task,
// attach the rate limiter's RecycleToken func to the task
// so that if the task is later determined to be invalid,
// we can recycle the token it used.
task.recycleToken = tm.rateLimiter.RecycleToken

// attempt a match with local poller first. When that
// doesn't succeed, try both local match and remote match
select {
@@ -393,9 +404,9 @@
}
cancel()
// at this point, we forwarded the task to a parent partition which
// in turn dispatched the task to a poller. Make sure we delete the
// task from the database
task.finish(nil)
// in turn dispatched the task to a poller, because there was no error.
// Make sure we delete the task from the database.
task.finish(nil, true)
tm.emitDispatchLatency(task, true)
return nil
case <-ctx.Done():
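
The diff does not include `task.finish` itself, so the following is purely an assumed shape for how the new boolean argument and the attached `recycleToken` func could interact; the real implementation in `service/matching` may differ:

```go
// Hypothetical sketch (not from this diff): finish recycles the token
// when the task did not result in a valid dispatch.
func (t *internalTask) finish(err error, wasValid bool) {
	if !wasValid && t.recycleToken != nil {
		t.recycleToken() // RecycleToken func attached by the TaskMatcher
	}
	// ... existing completion / ack logic would follow here ...
}
```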
