diff --git a/collect.go b/collect.go index 66ae471..f23dcb0 100644 --- a/collect.go +++ b/collect.go @@ -324,10 +324,10 @@ func (pg *pipeGroup[T, R]) doWait() { }() // Runs first: Wait for inputs. Wait "quietly", not canceling the // context yet so if there is an error later we can still see it - pg.g.quietWait() + pg.g.waitWithoutCanceling() }() // Runs third: Wait for outputs to be done - pg.pipeWorkers.quietWait() + pg.pipeWorkers.waitWithoutCanceling() } var _ CollectingExecutor[int] = collectingGroup[int]{} diff --git a/group.go b/group.go index 15e42bc..ec9ddfb 100644 --- a/group.go +++ b/group.go @@ -82,7 +82,7 @@ type Executor interface { getContext() (context.Context, context.CancelCauseFunc) // Waits without canceling the context with errGroupDone. The caller of this // function promises that they will be responsible for canceling the context - quietWait() + waitWithoutCanceling() } // Creates a basic executor which runs all the functions given in one goroutine @@ -159,11 +159,11 @@ func (n *runner) Go(op func(context.Context)) { } func (n *runner) Wait() { - n.quietWait() + n.waitWithoutCanceling() n.cancel(errGroupDone) } -func (n *runner) quietWait() { +func (n *runner) waitWithoutCanceling() { n.awaited.Store(true) runtime.SetFinalizer(n, nil) } @@ -243,10 +243,10 @@ func (g *group) Go(op func(context.Context)) { func (g *group) Wait() { defer g.cancel(errGroupDone) - g.quietWait() + g.waitWithoutCanceling() } -func (g *group) quietWait() { +func (g *group) waitWithoutCanceling() { g.awaited.Store(true) runtime.SetFinalizer(g, nil) g.wg.Wait() @@ -327,12 +327,12 @@ func (lg *limitedGroup) Wait() { lg.g.Wait() } -func (lg *limitedGroup) quietWait() { +func (lg *limitedGroup) waitWithoutCanceling() { if !lg.awaited.Swap(true) { close(lg.ops) runtime.SetFinalizer(lg, nil) // Don't try to close this chan again :) } - lg.g.quietWait() + lg.g.waitWithoutCanceling() } func (lg *limitedGroup) getContext() (context.Context, context.CancelCauseFunc) {