Commit
opt: make ReleaseTimeout() more efficient in waiting for workers to exit (#…
panjf2000 authored Jun 17, 2024
1 parent 3ffd3da commit 15e8961
Showing 4 changed files with 124 additions and 89 deletions.
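For orientation before the diff: a minimal usage sketch (not part of the commit) of the API this change optimizes. It assumes only the public entry points visible in the diff — ants.NewPool, Pool.Submit, and Pool.ReleaseTimeout:

package main

import (
    "fmt"
    "time"

    "github.com/panjf2000/ants/v2"
)

func main() {
    // A pool capped at 4 concurrently running goroutines.
    p, err := ants.NewPool(4)
    if err != nil {
        panic(err)
    }

    for i := 0; i < 8; i++ {
        _ = p.Submit(func() {
            time.Sleep(100 * time.Millisecond) // simulated work
        })
    }

    // Before this commit, ReleaseTimeout polled Running() in a sleep
    // loop; after it, the call blocks on channels until every worker
    // has exited or the timeout fires.
    if err := p.ReleaseTimeout(time.Second); err != nil {
        fmt.Println("release timed out:", err)
    }
}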
90 changes: 62 additions & 28 deletions pool.go
@@ -31,9 +31,7 @@ import (
syncx "github.com/panjf2000/ants/v2/internal/sync"
)

// Pool accepts tasks and processes them concurrently;
// it limits the total number of goroutines to a given cap by recycling them.
type Pool struct {
type poolCommon struct {
// capacity of the pool. A negative value means the capacity is limitless; an infinite pool is used to
// avoid the potential issue of endless blocking caused by nested pool usage: submitting a task to a pool
// that submits a new task to the same pool.
@@ -54,25 +52,38 @@ type Pool struct {
// cond for waiting to get an idle worker.
cond *sync.Cond

// allDone is used to indicate that all workers have exited.
allDone chan struct{}
// once is used to make sure the pool is closed just once.
once *sync.Once

// workerCache speeds up obtaining a usable worker in retrieveWorker.
workerCache sync.Pool

// waiting is the number of goroutines already blocked on pool.Submit(), protected by pool.lock
waiting int32

purgeDone int32
purgeCtx context.Context
stopPurge context.CancelFunc

ticktockDone int32
ticktockCtx context.Context
stopTicktock context.CancelFunc

now atomic.Value

options *Options
}

// Pool accepts tasks and processes them concurrently;
// it limits the total number of goroutines to a given cap by recycling them.
type Pool struct {
poolCommon
}
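The refactor extracts the shared state into poolCommon and embeds it in both Pool and PoolWithFunc. A standalone sketch of the Go embedding technique this relies on (the names here are illustrative, not the library's):

package main

import "fmt"

// common holds state shared by several pool variants.
type common struct {
    capacity int32
}

func (c *common) Cap() int32 { return c.capacity }

// fixedPool embeds common, so it gains the capacity field and the
// Cap method without redeclaring either.
type fixedPool struct {
    common
}

func main() {
    p := &fixedPool{common: common{capacity: 8}}
    fmt.Println(p.Cap()) // 8 — promoted from the embedded struct
}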

// purgeStaleWorkers clears stale workers periodically; it runs in a separate goroutine, as a scavenger.
func (p *Pool) purgeStaleWorkers(ctx context.Context) {
func (p *Pool) purgeStaleWorkers() {
ticker := time.NewTicker(p.options.ExpiryDuration)

defer func() {
@@ -82,7 +93,7 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) {

for {
select {
case <-ctx.Done():
case <-p.purgeCtx.Done():
return
case <-ticker.C:
}
@@ -116,7 +127,7 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) {
}

// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *Pool) ticktock(ctx context.Context) {
func (p *Pool) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer func() {
ticker.Stop()
@@ -125,7 +136,7 @@ func (p *Pool) ticktock(ctx context.Context) {

for {
select {
case <-ctx.Done():
case <-p.ticktockCtx.Done():
return
case <-ticker.C:
}
@@ -144,16 +155,14 @@ func (p *Pool) goPurge() {
}

// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers(ctx)
p.purgeCtx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers()
}

func (p *Pool) goTicktock() {
p.now.Store(time.Now())
var ctx context.Context
ctx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock(ctx)
p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock()
}
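Note the shape of the change in goPurge and goTicktock: the cancellation context moves from a function argument onto the struct (purgeCtx, ticktockCtx), so that methods other than the background goroutine — ReleaseTimeout in particular — can also select on its Done() channel. A self-contained sketch of that pattern, under illustrative names:

package main

import (
    "context"
    "fmt"
    "time"
)

type service struct {
    ctx  context.Context
    stop context.CancelFunc
}

// start stores the context on the struct instead of passing it to
// the goroutine, so other methods can observe cancellation too.
func (s *service) start() {
    s.ctx, s.stop = context.WithCancel(context.Background())
    go func() {
        ticker := time.NewTicker(10 * time.Millisecond)
        defer ticker.Stop()
        for {
            select {
            case <-s.ctx.Done():
                fmt.Println("background loop stopped")
                return
            case <-ticker.C:
                // periodic work would go here
            }
        }
    }()
}

func main() {
    s := &service{}
    s.start()
    time.Sleep(30 * time.Millisecond)
    s.stop()
    <-s.ctx.Done() // any method can wait on the same signal
    time.Sleep(10 * time.Millisecond) // let the goroutine print
}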

func (p *Pool) nowTime() time.Time {
@@ -180,11 +189,13 @@ func NewPool(size int, options ...Option) (*Pool, error) {
opts.Logger = defaultLogger
}

p := &Pool{
p := &Pool{poolCommon: poolCommon{
capacity: int32(size),
allDone: make(chan struct{}),
lock: syncx.NewSpinLock(),
once: &sync.Once{},
options: opts,
}
}}
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
@@ -281,8 +292,10 @@ func (p *Pool) Release() {
p.stopPurge()
p.stopPurge = nil
}
p.stopTicktock()
p.stopTicktock = nil
if p.stopTicktock != nil {
p.stopTicktock()
p.stopTicktock = nil
}

p.lock.Lock()
p.workers.reset()
@@ -297,19 +310,38 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
return ErrPoolClosed
}

p.Release()

interval := timeout / releaseTimeoutCount
endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) {
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
var purgeCh <-chan struct{}
if !p.options.DisablePurge {
purgeCh = p.purgeCtx.Done()
} else {
purgeCh = p.allDone
}

if p.Running() == 0 {
p.once.Do(func() {
close(p.allDone)
})
}

timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-timer.C:
return ErrTimeout
case <-p.allDone:
<-purgeCh
<-p.ticktockCtx.Done()
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
}
}
time.Sleep(interval)
}
return ErrTimeout
}
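The rewritten ReleaseTimeout replaces the old poll-and-sleep loop with a single select: a timer bounds the wait, while allDone (closed by the last worker to exit) and the two context Done() channels report completion. A reduced sketch of the waiting primitive, with illustrative names:

package main

import (
    "errors"
    "fmt"
    "time"
)

var errTimeout = errors.New("timed out")

// waitDone blocks on a completion channel with a bounded timer —
// the shape of the new ReleaseTimeout — instead of polling state
// in a sleep loop.
func waitDone(done <-chan struct{}, timeout time.Duration) error {
    timer := time.NewTimer(timeout)
    defer timer.Stop()
    select {
    case <-timer.C:
        return errTimeout
    case <-done:
        return nil
    }
}

func main() {
    done := make(chan struct{})
    go func() {
        time.Sleep(50 * time.Millisecond) // simulated worker teardown
        close(done)
    }()
    fmt.Println(waitDone(done, time.Second)) // <nil>
}

The purgeCh indirection in the diff exists because a pool created with DisablePurge never starts the purge goroutine, so purgeCtx is never created; falling back to allDone keeps the later channel receive from blocking forever.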

// Reboot reboots a closed pool.
@@ -319,11 +351,13 @@ func (p *Pool) Reboot() {
p.goPurge()
atomic.StoreInt32(&p.ticktockDone, 0)
p.goTicktock()
p.allDone = make(chan struct{})
p.once = &sync.Once{}
}
}
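Reboot must recreate allDone and once: a closed channel stays closed and a fired sync.Once never fires again, so reusing either would make the next ReleaseTimeout return immediately. A small sketch of why the reset is needed (illustrative type):

package main

import (
    "fmt"
    "sync"
)

type resettable struct {
    done chan struct{}
    once *sync.Once
}

func (r *resettable) finish() {
    r.once.Do(func() { close(r.done) })
}

// reboot swaps in a fresh channel and Once, as the pool's Reboot does;
// otherwise the old, already-closed channel would signal completion
// for a cycle that has not finished.
func (r *resettable) reboot() {
    r.done = make(chan struct{})
    r.once = &sync.Once{}
}

func main() {
    r := &resettable{done: make(chan struct{}), once: &sync.Once{}}
    r.finish()
    <-r.done // returns at once: the channel is closed

    r.reboot()
    select {
    case <-r.done:
        fmt.Println("unexpected: fresh channel already closed")
    default:
        fmt.Println("fresh cycle: not done yet")
    }
}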

func (p *Pool) addRunning(delta int) {
atomic.AddInt32(&p.running, int32(delta))
func (p *Pool) addRunning(delta int) int {
return int(atomic.AddInt32(&p.running, int32(delta)))
}

func (p *Pool) addWaiting(delta int) {
111 changes: 52 additions & 59 deletions pool_func.go
@@ -34,46 +34,14 @@ import (
// PoolWithFunc accepts tasks and processes them concurrently;
// it limits the total number of goroutines to a given cap by recycling them.
type PoolWithFunc struct {
// capacity of the pool.
capacity int32

// running is the number of the currently running goroutines.
running int32

// lock for protecting the worker queue.
lock sync.Locker

// workers is a slice that stores the available workers.
workers workerQueue

// state is used to notify the pool to close itself.
state int32

// cond for waiting to get an idle worker.
cond *sync.Cond
poolCommon

// poolFunc is the function for processing tasks.
poolFunc func(interface{})

// workerCache speeds up obtaining a usable worker in retrieveWorker.
workerCache sync.Pool

// waiting is the number of goroutines already blocked on pool.Invoke(), protected by pool.lock
waiting int32

purgeDone int32
stopPurge context.CancelFunc

ticktockDone int32
stopTicktock context.CancelFunc

now atomic.Value

options *Options
}

// purgeStaleWorkers clears stale workers periodically; it runs in a separate goroutine, as a scavenger.
func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
func (p *PoolWithFunc) purgeStaleWorkers() {
ticker := time.NewTicker(p.options.ExpiryDuration)
defer func() {
ticker.Stop()
@@ -82,7 +50,7 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {

for {
select {
case <-ctx.Done():
case <-p.purgeCtx.Done():
return
case <-ticker.C:
}
@@ -116,7 +84,7 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
}

// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *PoolWithFunc) ticktock(ctx context.Context) {
func (p *PoolWithFunc) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer func() {
ticker.Stop()
@@ -125,7 +93,7 @@ func (p *PoolWithFunc) ticktock(ctx context.Context) {

for {
select {
case <-ctx.Done():
case <-p.ticktockCtx.Done():
return
case <-ticker.C:
}
@@ -144,16 +112,14 @@ func (p *PoolWithFunc) goPurge() {
}

// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers(ctx)
p.purgeCtx, p.stopPurge = context.WithCancel(context.Background())
go p.purgeStaleWorkers()
}

func (p *PoolWithFunc) goTicktock() {
p.now.Store(time.Now())
var ctx context.Context
ctx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock(ctx)
p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock()
}

func (p *PoolWithFunc) nowTime() time.Time {
@@ -185,10 +151,14 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
}

p := &PoolWithFunc{
capacity: int32(size),
poolCommon: poolCommon{
capacity: int32(size),
allDone: make(chan struct{}),
lock: syncx.NewSpinLock(),
once: &sync.Once{},
options: opts,
},
poolFunc: pf,
lock: syncx.NewSpinLock(),
options: opts,
}
p.workerCache.New = func() interface{} {
return &goWorkerWithFunc{
@@ -286,8 +256,10 @@ func (p *PoolWithFunc) Release() {
p.stopPurge()
p.stopPurge = nil
}
p.stopTicktock()
p.stopTicktock = nil
if p.stopTicktock != nil {
p.stopTicktock()
p.stopTicktock = nil
}

p.lock.Lock()
p.workers.reset()
@@ -302,19 +274,38 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
return ErrPoolClosed
}

p.Release()

interval := timeout / releaseTimeoutCount
endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) {
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
var purgeCh <-chan struct{}
if !p.options.DisablePurge {
purgeCh = p.purgeCtx.Done()
} else {
purgeCh = p.allDone
}

if p.Running() == 0 {
p.once.Do(func() {
close(p.allDone)
})
}

timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-timer.C:
return ErrTimeout
case <-p.allDone:
<-purgeCh
<-p.ticktockCtx.Done()
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
}
}
time.Sleep(interval)
}
return ErrTimeout
}

// Reboot reboots a closed pool.
@@ -324,11 +315,13 @@ func (p *PoolWithFunc) Reboot() {
p.goPurge()
atomic.StoreInt32(&p.ticktockDone, 0)
p.goTicktock()
p.allDone = make(chan struct{})
p.once = &sync.Once{}
}
}

func (p *PoolWithFunc) addRunning(delta int) {
atomic.AddInt32(&p.running, int32(delta))
func (p *PoolWithFunc) addRunning(delta int) int {
return int(atomic.AddInt32(&p.running, int32(delta)))
}

func (p *PoolWithFunc) addWaiting(delta int) {
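The same usage shape applies to PoolWithFunc, where every submission runs one shared function. A hedged sketch using the public NewPoolWithFunc, Invoke, and ReleaseTimeout entry points from the diff (the task body is illustrative):

package main

import (
    "fmt"
    "time"

    "github.com/panjf2000/ants/v2"
)

func main() {
    // All tasks in a PoolWithFunc execute the same function.
    p, err := ants.NewPoolWithFunc(4, func(arg interface{}) {
        fmt.Println("processing", arg)
        time.Sleep(50 * time.Millisecond)
    })
    if err != nil {
        panic(err)
    }

    for i := 0; i < 8; i++ {
        _ = p.Invoke(i) // queue a payload for the shared function
    }

    // Same channel-based wait as Pool.ReleaseTimeout after this commit.
    _ = p.ReleaseTimeout(time.Second)
}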
6 changes: 5 additions & 1 deletion worker.go
@@ -47,7 +47,11 @@ func (w *goWorker) run() {
w.pool.addRunning(1)
go func() {
defer func() {
w.pool.addRunning(-1)
if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() {
w.pool.once.Do(func() {
close(w.pool.allDone)
})
}
w.pool.workerCache.Put(w)
if p := recover(); p != nil {
if ph := w.pool.options.PanicHandler; ph != nil {
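This worker-side hook is the other half of the optimization: the last worker to exit — rather than a polling loop — closes allDone, using the count returned by addRunning so the zero transition is detected atomically. A standalone sketch of this last-one-out pattern (illustrative names):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

type counter struct {
    n    int32
    once sync.Once
    done chan struct{}
}

// dec decrements and checks the returned value in one atomic step,
// like the modified addRunning: exactly one goroutine observes the
// transition to zero, and the Once keeps the close idempotent even
// if another path (such as ReleaseTimeout) also tries to close.
func (c *counter) dec() {
    if atomic.AddInt32(&c.n, -1) == 0 {
        c.once.Do(func() { close(c.done) })
    }
}

func main() {
    c := &counter{n: 5, done: make(chan struct{})}
    for i := 0; i < 5; i++ {
        go c.dec() // each worker decrements on exit
    }
    <-c.done // closed by whichever goroutine hit zero
    fmt.Println("all workers exited")
}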