diff --git a/ants.go b/ants.go index 4ec81fad..4b61ba2b 100644 --- a/ants.go +++ b/ants.go @@ -66,6 +66,12 @@ var ( // ErrTimeout will be returned after the operations timed out. ErrTimeout = errors.New("operation timed out") + // ErrInvalidPoolIndex will be returned when trying to retrieve a pool with an invalid index. + ErrInvalidPoolIndex = errors.New("invalid pool index") + + // ErrInvalidLoadBalancingStrategy will be returned when trying to create a MultiPool with an invalid load-balancing strategy. + ErrInvalidLoadBalancingStrategy = errors.New("invalid load-balancing strategy") + // workerChanCap determines whether the channel of a worker should be a buffered channel // to get the best performance. Inspired by fasthttp at // https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139 diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go index 360443a8..8f1280bb 100644 --- a/ants_benchmark_test.go +++ b/ants_benchmark_test.go @@ -25,6 +25,7 @@ package ants import ( "runtime" "sync" + "sync/atomic" "testing" "time" @@ -47,18 +48,22 @@ func demoPoolFunc(args interface{}) { time.Sleep(time.Duration(n) * time.Millisecond) } +var stopLongRunningFunc int32 + func longRunningFunc() { - for { + for atomic.LoadInt32(&stopLongRunningFunc) == 0 { runtime.Gosched() } } +var stopLongRunningPoolFunc int32 + func longRunningPoolFunc(arg interface{}) { if ch, ok := arg.(chan struct{}); ok { <-ch return } - for { + for atomic.LoadInt32(&stopLongRunningPoolFunc) == 0 { runtime.Gosched() } } diff --git a/ants_test.go b/ants_test.go index c216066d..a78ff2f4 100644 --- a/ants_test.go +++ b/ants_test.go @@ -985,3 +985,113 @@ func TestDefaultPoolReleaseTimeout(t *testing.T) { err := ReleaseTimeout(2 * time.Second) assert.NoError(t, err) } + +func TestMultiPool(t *testing.T) { + _, err := NewMultiPool(10, -1, 8) + assert.ErrorIs(t, err, ErrInvalidLoadBalancingStrategy) + + mp, err := NewMultiPool(10, 5, RoundRobin) + testFn := func() { + for i := 0; i < 50; i++ { + err = mp.Submit(longRunningFunc) + assert.NoError(t, err) + } + assert.EqualValues(t, mp.Waiting(), 0) + _, err = mp.WaitingByIndex(-1) + assert.ErrorIs(t, err, ErrInvalidPoolIndex) + _, err = mp.WaitingByIndex(11) + assert.ErrorIs(t, err, ErrInvalidPoolIndex) + assert.EqualValues(t, 50, mp.Running()) + _, err = mp.RunningByIndex(-1) + assert.ErrorIs(t, err, ErrInvalidPoolIndex) + _, err = mp.RunningByIndex(11) + assert.ErrorIs(t, err, ErrInvalidPoolIndex) + assert.EqualValues(t, 0, mp.Free()) + _, err = mp.FreeByIndex(-1) + assert.ErrorIs(t, err, ErrInvalidPoolIndex) + _, err = mp.FreeByIndex(11) + assert.ErrorIs(t, err, ErrInvalidPoolIndex) + assert.EqualValues(t, 50, mp.Cap()) + assert.False(t, mp.IsClosed()) + for i := 0; i < 10; i++ { + n, _ := mp.WaitingByIndex(i) + assert.EqualValues(t, 0, n) + n, _ = mp.RunningByIndex(i) + assert.EqualValues(t, 5, n) + n, _ = mp.FreeByIndex(i) + assert.EqualValues(t, 0, n) + } + atomic.StoreInt32(&stopLongRunningFunc, 1) + assert.NoError(t, mp.ReleaseTimeout(3*time.Second)) + assert.Zero(t, mp.Running()) + assert.True(t, mp.IsClosed()) + atomic.StoreInt32(&stopLongRunningFunc, 0) + } + testFn() + + mp.Reboot() + testFn() + + mp, err = NewMultiPool(10, 5, LeastTasks) + testFn() + + mp.Reboot() + testFn() + + mp.Tune(10) +} + +func TestMultiPoolWithFunc(t *testing.T) { + _, err := NewMultiPoolWithFunc(10, -1, longRunningPoolFunc, 8) + assert.ErrorIs(t, err, ErrInvalidLoadBalancingStrategy) + + mp, err := NewMultiPoolWithFunc(10, 5, longRunningPoolFunc, RoundRobin) + testFn := func() { + for i := 0; i < 50; i++ { + err = mp.Invoke(i) + assert.NoError(t, err) + } + assert.EqualValues(t, mp.Waiting(), 0) + _, err = mp.WaitingByIndex(-1) + assert.ErrorIs(t, err, ErrInvalidPoolIndex) + _, err = mp.WaitingByIndex(11) + assert.ErrorIs(t, err, ErrInvalidPoolIndex) + assert.EqualValues(t, 50, mp.Running()) + _, err = mp.RunningByIndex(-1) + assert.ErrorIs(t, err, ErrInvalidPoolIndex) + _, err = mp.RunningByIndex(11) + assert.ErrorIs(t, err, ErrInvalidPoolIndex) + assert.EqualValues(t, 0, mp.Free()) + _, err = mp.FreeByIndex(-1) + assert.ErrorIs(t, err, ErrInvalidPoolIndex) + _, err = mp.FreeByIndex(11) + assert.ErrorIs(t, err, ErrInvalidPoolIndex) + assert.EqualValues(t, 50, mp.Cap()) + assert.False(t, mp.IsClosed()) + for i := 0; i < 10; i++ { + n, _ := mp.WaitingByIndex(i) + assert.EqualValues(t, 0, n) + n, _ = mp.RunningByIndex(i) + assert.EqualValues(t, 5, n) + n, _ = mp.FreeByIndex(i) + assert.EqualValues(t, 0, n) + } + atomic.StoreInt32(&stopLongRunningPoolFunc, 1) + assert.NoError(t, mp.ReleaseTimeout(3*time.Second)) + assert.Zero(t, mp.Running()) + assert.True(t, mp.IsClosed()) + atomic.StoreInt32(&stopLongRunningPoolFunc, 0) + } + testFn() + + mp.Reboot() + testFn() + + mp, err = NewMultiPoolWithFunc(10, 5, longRunningPoolFunc, LeastTasks) + testFn() + + mp.Reboot() + testFn() + + mp.Tune(10) +} diff --git a/multipool.go b/multipool.go new file mode 100644 index 00000000..4e17f27f --- /dev/null +++ b/multipool.go @@ -0,0 +1,205 @@ +// MIT License + +// Copyright (c) 2023 Andy Pan + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ants + +import ( + "errors" + "fmt" + "strings" + "sync/atomic" + "time" +) + +// LoadBalancingStrategy represents the type of load-balancing algorithm. +type LoadBalancingStrategy int + +const ( + // RoundRobin distributes task to a list of pools in rotation. + RoundRobin LoadBalancingStrategy = 1 << (iota + 1) + + // LeastTasks always selects the pool with the least number of pending tasks. + LeastTasks +) + +// MultiPool consists of multiple pools, from which you will benefit the +// performance improvement on basis of the fine-grained locking that reduces +// the lock contention. +// MultiPool is a good fit for the scenario that you have a large number of +// tasks to submit, and you don't want the single pool to be the bottleneck. +type MultiPool struct { + pools []*Pool + index uint32 + state int32 + lbs LoadBalancingStrategy +} + +// NewMultiPool instantiates a MultiPool with a size of the pool list and a size +// per pool, and the load-balancing strategy. +func NewMultiPool(size, sizePerPool int, lbs LoadBalancingStrategy, options ...Option) (*MultiPool, error) { + pools := make([]*Pool, size) + for i := 0; i < size; i++ { + pool, err := NewPool(sizePerPool, options...) + if err != nil { + return nil, err + } + pools[i] = pool + } + if lbs != RoundRobin && lbs != LeastTasks { + return nil, ErrInvalidLoadBalancingStrategy + } + return &MultiPool{pools: pools, lbs: lbs}, nil +} + +func (mp *MultiPool) next() (idx int) { + switch mp.lbs { + case RoundRobin: + if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 { + idx = 0 + } + return + case LeastTasks: + leastTasks := 1<<31 - 1 + for i, pool := range mp.pools { + if n := pool.Running(); n < leastTasks { + leastTasks = n + idx = i + } + } + return + } + return -1 +} + +// Submit submits a task to a pool selected by the load-balancing strategy. +func (mp *MultiPool) Submit(task func()) error { + if mp.IsClosed() { + return ErrPoolClosed + } + return mp.pools[mp.next()].Submit(task) +} + +// Running returns the number of the currently running workers across all pools. +func (mp *MultiPool) Running() (n int) { + for _, pool := range mp.pools { + n += pool.Running() + } + return +} + +// RunningByIndex returns the number of the currently running workers in the specific pool. +func (mp *MultiPool) RunningByIndex(idx int) (int, error) { + if idx < 0 || idx >= len(mp.pools) { + return -1, ErrInvalidPoolIndex + } + return mp.pools[idx].Running(), nil +} + +// Free returns the number of available workers across all pools. +func (mp *MultiPool) Free() (n int) { + for _, pool := range mp.pools { + n += pool.Free() + } + return +} + +// FreeByIndex returns the number of available workers in the specific pool. +func (mp *MultiPool) FreeByIndex(idx int) (int, error) { + if idx < 0 || idx >= len(mp.pools) { + return -1, ErrInvalidPoolIndex + } + return mp.pools[idx].Free(), nil +} + +// Waiting returns the number of the currently waiting tasks across all pools. +func (mp *MultiPool) Waiting() (n int) { + for _, pool := range mp.pools { + n += pool.Waiting() + } + return +} + +// WaitingByIndex returns the number of the currently waiting tasks in the specific pool. +func (mp *MultiPool) WaitingByIndex(idx int) (int, error) { + if idx < 0 || idx >= len(mp.pools) { + return -1, ErrInvalidPoolIndex + } + return mp.pools[idx].Waiting(), nil +} + +// Cap returns the capacity of this multi-pool. +func (mp *MultiPool) Cap() (n int) { + for _, pool := range mp.pools { + n += pool.Cap() + } + return +} + +// Tune resizes each pool in multi-pool. +// +// Note that this method doesn't resize the overall +// capacity of multi-pool. +func (mp *MultiPool) Tune(size int) { + for _, pool := range mp.pools { + pool.Tune(size) + } +} + +// IsClosed indicates whether the multi-pool is closed. +func (mp *MultiPool) IsClosed() bool { + return atomic.LoadInt32(&mp.state) == CLOSED +} + +// ReleaseTimeout closes the multi-pool with a timeout, +// it waits all pools to be closed before timing out. +func (mp *MultiPool) ReleaseTimeout(timeout time.Duration) error { + if !atomic.CompareAndSwapInt32(&mp.state, OPENED, CLOSED) { + return ErrPoolClosed + } + + var errStr strings.Builder + for i, pool := range mp.pools { + if err := pool.ReleaseTimeout(timeout); err != nil { + errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err)) + if i < len(mp.pools)-1 { + errStr.WriteString(" | ") + } + return err + } + } + + if errStr.Len() == 0 { + return nil + } + + return errors.New(errStr.String()) +} + +// Reboot reboots a released multi-pool. +func (mp *MultiPool) Reboot() { + if atomic.CompareAndSwapInt32(&mp.state, CLOSED, OPENED) { + atomic.StoreUint32(&mp.index, 0) + for _, pool := range mp.pools { + pool.Reboot() + } + } +} diff --git a/multipool_func.go b/multipool_func.go new file mode 100644 index 00000000..627ab78e --- /dev/null +++ b/multipool_func.go @@ -0,0 +1,194 @@ +// MIT License + +// Copyright (c) 2023 Andy Pan + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ants + +import ( + "errors" + "fmt" + "strings" + "sync/atomic" + "time" +) + +// MultiPoolWithFunc consists of multiple pools, from which you will benefit the +// performance improvement on basis of the fine-grained locking that reduces +// the lock contention. +// MultiPoolWithFunc is a good fit for the scenario that you have a large number of +// tasks to submit, and you don't want the single pool to be the bottleneck. +type MultiPoolWithFunc struct { + pools []*PoolWithFunc + index uint32 + state int32 + lbs LoadBalancingStrategy +} + +// NewMultiPoolWithFunc instantiates a MultiPoolWithFunc with a size of the pool list and a size +// per pool, and the load-balancing strategy. +func NewMultiPoolWithFunc(size, sizePerPool int, fn func(interface{}), lbs LoadBalancingStrategy, options ...Option) (*MultiPoolWithFunc, error) { + pools := make([]*PoolWithFunc, size) + for i := 0; i < size; i++ { + pool, err := NewPoolWithFunc(sizePerPool, fn, options...) + if err != nil { + return nil, err + } + pools[i] = pool + } + if lbs != RoundRobin && lbs != LeastTasks { + return nil, ErrInvalidLoadBalancingStrategy + } + return &MultiPoolWithFunc{pools: pools, lbs: lbs}, nil +} + +func (mp *MultiPoolWithFunc) next() (idx int) { + switch mp.lbs { + case RoundRobin: + if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 { + idx = 0 + } + return + case LeastTasks: + leastTasks := 1<<31 - 1 + for i, pool := range mp.pools { + if n := pool.Running(); n < leastTasks { + leastTasks = n + idx = i + } + } + return + } + return -1 +} + +// Invoke submits a task to a pool selected by the load-balancing strategy. +func (mp *MultiPoolWithFunc) Invoke(args interface{}) error { + if mp.IsClosed() { + return ErrPoolClosed + } + return mp.pools[mp.next()].Invoke(args) +} + +// Running returns the number of the currently running workers across all pools. +func (mp *MultiPoolWithFunc) Running() (n int) { + for _, pool := range mp.pools { + n += pool.Running() + } + return +} + +// RunningByIndex returns the number of the currently running workers in the specific pool. +func (mp *MultiPoolWithFunc) RunningByIndex(idx int) (int, error) { + if idx < 0 || idx >= len(mp.pools) { + return -1, ErrInvalidPoolIndex + } + return mp.pools[idx].Running(), nil +} + +// Free returns the number of available workers across all pools. +func (mp *MultiPoolWithFunc) Free() (n int) { + for _, pool := range mp.pools { + n += pool.Free() + } + return +} + +// FreeByIndex returns the number of available workers in the specific pool. +func (mp *MultiPoolWithFunc) FreeByIndex(idx int) (int, error) { + if idx < 0 || idx >= len(mp.pools) { + return -1, ErrInvalidPoolIndex + } + return mp.pools[idx].Free(), nil +} + +// Waiting returns the number of the currently waiting tasks across all pools. +func (mp *MultiPoolWithFunc) Waiting() (n int) { + for _, pool := range mp.pools { + n += pool.Waiting() + } + return +} + +// WaitingByIndex returns the number of the currently waiting tasks in the specific pool. +func (mp *MultiPoolWithFunc) WaitingByIndex(idx int) (int, error) { + if idx < 0 || idx >= len(mp.pools) { + return -1, ErrInvalidPoolIndex + } + return mp.pools[idx].Waiting(), nil +} + +// Cap returns the capacity of this multi-pool. +func (mp *MultiPoolWithFunc) Cap() (n int) { + for _, pool := range mp.pools { + n += pool.Cap() + } + return +} + +// Tune resizes each pool in multi-pool. +// +// Note that this method doesn't resize the overall +// capacity of multi-pool. +func (mp *MultiPoolWithFunc) Tune(size int) { + for _, pool := range mp.pools { + pool.Tune(size) + } +} + +// IsClosed indicates whether the multi-pool is closed. +func (mp *MultiPoolWithFunc) IsClosed() bool { + return atomic.LoadInt32(&mp.state) == CLOSED +} + +// ReleaseTimeout closes the multi-pool with a timeout, +// it waits all pools to be closed before timing out. +func (mp *MultiPoolWithFunc) ReleaseTimeout(timeout time.Duration) error { + if !atomic.CompareAndSwapInt32(&mp.state, OPENED, CLOSED) { + return ErrPoolClosed + } + + var errStr strings.Builder + for i, pool := range mp.pools { + if err := pool.ReleaseTimeout(timeout); err != nil { + errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err)) + if i < len(mp.pools)-1 { + errStr.WriteString(" | ") + } + return err + } + } + + if errStr.Len() == 0 { + return nil + } + + return errors.New(errStr.String()) +} + +// Reboot reboots a released multi-pool. +func (mp *MultiPoolWithFunc) Reboot() { + if atomic.CompareAndSwapInt32(&mp.state, CLOSED, OPENED) { + atomic.StoreUint32(&mp.index, 0) + for _, pool := range mp.pools { + pool.Reboot() + } + } +} diff --git a/pool.go b/pool.go index 497d4cfa..04754306 100644 --- a/pool.go +++ b/pool.go @@ -159,7 +159,7 @@ func (p *Pool) nowTime() time.Time { return p.now.Load().(time.Time) } -// NewPool generates an instance of ants pool. +// NewPool instantiates a Pool with customized options. func NewPool(size int, options ...Option) (*Pool, error) { if size <= 0 { size = -1 diff --git a/pool_func.go b/pool_func.go index 413f1873..08fd31cb 100644 --- a/pool_func.go +++ b/pool_func.go @@ -160,7 +160,7 @@ func (p *PoolWithFunc) nowTime() time.Time { return p.now.Load().(time.Time) } -// NewPoolWithFunc generates an instance of ants pool with a specific function. +// NewPoolWithFunc instantiates a PoolWithFunc with customized options. func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) { if size <= 0 { size = -1