Skip to content

Commit

Permalink
opt: speed up ReleaseTimeout() for multi-pool (#332)
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 authored Jun 17, 2024
1 parent 95dad45 commit da22980
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 16 deletions.
39 changes: 31 additions & 8 deletions multipool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"strings"
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"
)

// LoadBalancingStrategy represents the type of load-balancing algorithm.
Expand Down Expand Up @@ -182,22 +184,43 @@ func (mp *MultiPool) ReleaseTimeout(timeout time.Duration) error {
return ErrPoolClosed
}

var errStr strings.Builder
errCh := make(chan error, len(mp.pools))
var wg errgroup.Group
for i, pool := range mp.pools {
if err := pool.ReleaseTimeout(timeout); err != nil {
errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err))
if i < len(mp.pools)-1 {
errStr.WriteString(" | ")
}
return err
func(p *Pool, idx int) {
wg.Go(func() error {
err := p.ReleaseTimeout(timeout)
if err != nil {
err = fmt.Errorf("pool %d: %v", idx, err)
}
errCh <- err
return err
})
}(pool, i)
}

_ = wg.Wait()

var (
i int
errStr strings.Builder
)
for err := range errCh {
i++
if i == len(mp.pools) {
break
}
if err != nil {
errStr.WriteString(err.Error())
errStr.WriteString(" | ")
}
}

if errStr.Len() == 0 {
return nil
}

return errors.New(errStr.String())
return errors.New(strings.TrimSuffix(errStr.String(), " | "))
}

// Reboot reboots a released multi-pool.
Expand Down
39 changes: 31 additions & 8 deletions multipool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"strings"
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"
)

// MultiPoolWithFunc consists of multiple pools, from which you will benefit the
Expand Down Expand Up @@ -172,22 +174,43 @@ func (mp *MultiPoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
return ErrPoolClosed
}

var errStr strings.Builder
errCh := make(chan error, len(mp.pools))
var wg errgroup.Group
for i, pool := range mp.pools {
if err := pool.ReleaseTimeout(timeout); err != nil {
errStr.WriteString(fmt.Sprintf("pool %d: %v\n", i, err))
if i < len(mp.pools)-1 {
errStr.WriteString(" | ")
}
return err
func(p *PoolWithFunc, idx int) {
wg.Go(func() error {
err := p.ReleaseTimeout(timeout)
if err != nil {
err = fmt.Errorf("pool %d: %v", idx, err)
}
errCh <- err
return err
})
}(pool, i)
}

_ = wg.Wait()

var (
i int
errStr strings.Builder
)
for err := range errCh {
i++
if i == len(mp.pools) {
break
}
if err != nil {
errStr.WriteString(err.Error())
errStr.WriteString(" | ")
}
}

if errStr.Len() == 0 {
return nil
}

return errors.New(errStr.String())
return errors.New(strings.TrimSuffix(errStr.String(), " | "))
}

// Reboot reboots a released multi-pool.
Expand Down

0 comments on commit da22980

Please sign in to comment.