From 467247cd466c67c5d4150b42a5422b52653d40b4 Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Mon, 23 Sep 2024 16:04:32 -0700 Subject: [PATCH 01/19] always cancel executors' contexts --- group.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/group.go b/group.go index 0cf295d..b155f7b 100644 --- a/group.go +++ b/group.go @@ -61,7 +61,11 @@ func Unlimited(ctx context.Context) Executor { func Limited(ctx context.Context, maxGoroutines int) Executor { if maxGoroutines < 1 { gctx, cancel := context.WithCancelCause(ctx) - return &runner{ctx: gctx, cancel: cancel} + g := &runner{ctx: gctx, cancel: cancel} + runtime.SetFinalizer(g, func(doomed *runner) { + doomed.cancel(nil) + }) + return g } making := &limitedGroup{ g: makeGroup(context.WithCancelCause(ctx)), @@ -96,6 +100,10 @@ func (n *runner) Go(op func(context.Context)) { func (n *runner) Wait() { n.awaited.Store(true) + // We are ending the group's lifetime within this function call; defer the + // cancelation and unset our finalizer. + n.cancel(nil) + runtime.SetFinalizer(n, nil) } func (n *runner) getContext() (context.Context, context.CancelCauseFunc) { @@ -103,7 +111,11 @@ func (n *runner) getContext() (context.Context, context.CancelCauseFunc) { } func makeGroup(ctx context.Context, cancel context.CancelCauseFunc) *group { - return &group{runner: runner{ctx: ctx, cancel: cancel}} + g := &group{runner: runner{ctx: ctx, cancel: cancel}} + runtime.SetFinalizer(g, func(doomed *group) { + doomed.cancel(nil) + }) + return g } // Base concurrent executor @@ -147,6 +159,10 @@ func (g *group) Go(op func(context.Context)) { func (g *group) Wait() { g.awaited.Store(true) + // We are ending the group's lifetime within this function call; defer the + // cancelation and unset our finalizer. + defer g.cancel(nil) + runtime.SetFinalizer(g, nil) g.wg.Wait() g.checkPanic() } From fa0441ee051bbe52faa4b836452a28f65933d292 Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Mon, 23 Sep 2024 16:18:23 -0700 Subject: [PATCH 02/19] sentinel error for group done so we can still cancel --- collect.go | 22 ++++++++++++++++------ group.go | 19 +++++++++++++------ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/collect.go b/collect.go index 902341f..75a6c04 100644 --- a/collect.go +++ b/collect.go @@ -223,6 +223,16 @@ func FeedWithErrs[T any](executor Executor, receiver func(context.Context, T) er return making } +// groupError returns the error associated with a group's context; if the error +// was errGroupDone, that doesn't count as an error and nil is returned instead. +func groupError(ctx context.Context) error { + err := context.Cause(ctx) + if err == errGroupDone { + return nil + } + return err +} + var _ ErrGroupExecutor = &errGroup{} type errGroup struct { @@ -242,7 +252,7 @@ func (eg *errGroup) Go(op func(context.Context) error) { func (eg *errGroup) Wait() error { eg.g.Wait() ctx, _ := eg.g.getContext() - return context.Cause(ctx) + return groupError(ctx) } func makePipeGroup[T any, R any](executor Executor) *pipeGroup[T, R] { @@ -329,7 +339,7 @@ func (cg collectingGroup[T]) Go(op func(context.Context) (T, error)) { func (cg collectingGroup[T]) Wait() ([]T, error) { cg.doWait() ctx, _ := cg.g.getContext() - if err := context.Cause(ctx); err != nil { + if err := groupError(ctx); err != nil { // We have an error; return it return nil, err } @@ -358,7 +368,7 @@ func (fg feedingGroup[T]) Go(op func(context.Context) (T, error)) { func (fg feedingGroup[T]) Wait() error { fg.doWait() ctx, _ := fg.g.getContext() - return context.Cause(ctx) + return groupError(ctx) } var _ AllErrsExecutor = multiErrGroup{} @@ -381,7 +391,7 @@ func (meg multiErrGroup) Wait() MultiError { meg.doWait() err := CombineErrors(*meg.res...) ctx, _ := meg.g.getContext() - if cause := context.Cause(ctx); cause != nil { + if cause := groupError(ctx); cause != nil { return CombineErrors(cause, err) } return err @@ -415,7 +425,7 @@ func (ceg collectingMultiErrGroup[T]) Wait() ([]T, MultiError) { ceg.doWait() res, err := ceg.res.values, CombineErrors(ceg.res.errs...) ctx, _ := ceg.g.getContext() - if cause := context.Cause(ctx); cause != nil { + if cause := groupError(ctx); cause != nil { return res, CombineErrors(cause, err) } return res, err @@ -439,7 +449,7 @@ func (feg feedingMultiErrGroup[T]) Wait() MultiError { feg.doWait() err := CombineErrors(*feg.res...) ctx, _ := feg.g.getContext() - if cause := context.Cause(ctx); cause != nil { + if cause := groupError(ctx); cause != nil { return CombineErrors(cause, err) } return err diff --git a/group.go b/group.go index b155f7b..befcde0 100644 --- a/group.go +++ b/group.go @@ -15,7 +15,14 @@ const bufferSize = 8 const misuseMessage = "parallel executor misuse: don't reuse executors" -var errPanicked = errors.New("panicked") +var ( + errPanicked = errors.New("panicked") + errGroupDone = errors.New("executor done") + errGroupAbandoned = errors.New("executor abandoned") + + // Contexts are canceled with this error when executors are awaited. + GroupDoneError = errGroupDone +) // NOTE: If you want to really get crazy with it, it IS permissible and safe to // call Go(...) from multiple threads without additional synchronization, on @@ -63,7 +70,7 @@ func Limited(ctx context.Context, maxGoroutines int) Executor { gctx, cancel := context.WithCancelCause(ctx) g := &runner{ctx: gctx, cancel: cancel} runtime.SetFinalizer(g, func(doomed *runner) { - doomed.cancel(nil) + doomed.cancel(errGroupAbandoned) }) return g } @@ -74,7 +81,7 @@ func Limited(ctx context.Context, maxGoroutines int) Executor { } runtime.SetFinalizer(making, func(doomed *limitedGroup) { close(doomed.ops) - doomed.g.cancel(nil) + doomed.g.cancel(errGroupAbandoned) }) return making } @@ -102,7 +109,7 @@ func (n *runner) Wait() { n.awaited.Store(true) // We are ending the group's lifetime within this function call; defer the // cancelation and unset our finalizer. - n.cancel(nil) + n.cancel(errGroupDone) runtime.SetFinalizer(n, nil) } @@ -113,7 +120,7 @@ func (n *runner) getContext() (context.Context, context.CancelCauseFunc) { func makeGroup(ctx context.Context, cancel context.CancelCauseFunc) *group { g := &group{runner: runner{ctx: ctx, cancel: cancel}} runtime.SetFinalizer(g, func(doomed *group) { - doomed.cancel(nil) + doomed.cancel(errGroupAbandoned) }) return g } @@ -161,7 +168,7 @@ func (g *group) Wait() { g.awaited.Store(true) // We are ending the group's lifetime within this function call; defer the // cancelation and unset our finalizer. - defer g.cancel(nil) + defer g.cancel(errGroupDone) runtime.SetFinalizer(g, nil) g.wg.Wait() g.checkPanic() From d896d58364d8f053b08be756f32242c91d254edd Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Mon, 23 Sep 2024 18:16:13 -0700 Subject: [PATCH 03/19] add testing that contexts are always canceled --- safety_test.go | 177 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 148 insertions(+), 29 deletions(-) diff --git a/safety_test.go b/safety_test.go index 487ed96..0ef1a3e 100644 --- a/safety_test.go +++ b/safety_test.go @@ -12,8 +12,45 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) +type contextLeak struct { + lock sync.Mutex + ctxs []context.Context +} + +func (c *contextLeak) leak(ctx context.Context) { + c.lock.Lock() + defer c.lock.Unlock() + c.ctxs = append(c.ctxs, ctx) +} + +func (c *contextLeak) assertAllCanceled(t *testing.T, expected ...error) { + t.Helper() + if len(expected) > 1 { + panic("please just provide 1 expected error for all the contexts") + } + c.lock.Lock() + defer c.lock.Unlock() + for _, ctx := range c.ctxs { + cause := context.Cause(ctx) + if cause == nil { + t.Fatal("context was not canceled") + } + if len(expected) == 1 { + require.ErrorIs(t, cause, expected[0]) + } + } +} + +// Wait for all contexts to be done +func (c *contextLeak) join() { + for _, ctx := range c.ctxs { + <-ctx.Done() + } +} + // The tests in this file can be detected as racy by the race condition checker // because we are reaching under the hood to look at the group's channel, so we // can see when the group's functions have started running. There's no good @@ -24,11 +61,20 @@ import ( func TestLimitedGroupCleanup(t *testing.T) { t.Parallel() var counter int64 + var leak contextLeak + opsQueue := func() chan func(context.Context) { g := Limited(context.Background(), 10) for i := 0; i < 100; i++ { - g.Go(func(context.Context) { + g.Go(func(ctx context.Context) { + defer func() { + p := recover() + if p != nil { + println(p) + } + }() atomic.AddInt64(&counter, 1) + leak.leak(ctx) }) } return g.(*limitedGroup).ops @@ -41,90 +87,128 @@ func TestLimitedGroupCleanup(t *testing.T) { } // The channel should get closed! assert.Equal(t, int64(100), counter) + leak.assertAllCanceled(t, errGroupAbandoned) } func TestCollectorCleanup(t *testing.T) { t.Parallel() + var leak contextLeak valuePipe := func() chan int { g := Collect[int](Unlimited(context.Background())) - g.Go(func(context.Context) (int, error) { return 1, nil }) + g.Go(func(ctx context.Context) (int, error) { + leak.leak(ctx) + return 1, nil + }) return g.(collectingGroup[int]).pipe // leak the un-awaited group }() assert.NotNil(t, valuePipe) - runtime.GC() // Trigger cleanups for leaked resources + runtime.GC() // Trigger cleanup of the collector + runtime.GC() // Trigger cleanup of the executor it owned + runtime.GC() // One more for good measure for range valuePipe { + // The channel should get closed! } - // The channel should get closed! + leak.assertAllCanceled(t) // the cancelation error is inconsistent here } func TestFeederCleanup(t *testing.T) { t.Parallel() + var leak contextLeak valuePipe := func() chan int { g := Feed[int](Unlimited(context.Background()), func(context.Context, int) error { return nil }) - g.Go(func(context.Context) (int, error) { return 1, nil }) + g.Go(func(ctx context.Context) (int, error) { + leak.leak(ctx) + return 1, nil + }) return g.(feedingGroup[int]).pipe // leak the un-awaited group }() assert.NotNil(t, valuePipe) - runtime.GC() // Trigger cleanups for leaked resources + runtime.GC() // Trigger cleanup of the feeder + runtime.GC() // Trigger cleanup of the executor it owned + runtime.GC() // One more for good measure for range valuePipe { + // The channel should get closed! } - // The channel should get closed! + leak.assertAllCanceled(t) // the cancelation error is inconsistent here } func TestGatherErrCleanup(t *testing.T) { t.Parallel() + var leak contextLeak valuePipe := func() chan error { g := GatherErrs(Unlimited(context.Background())) - g.Go(func(context.Context) error { return nil }) + g.Go(func(ctx context.Context) error { + leak.leak(ctx) + return nil + }) return g.(multiErrGroup).pipe // leak the un-awaited group }() assert.NotNil(t, valuePipe) - runtime.GC() // Trigger cleanups for leaked resources + runtime.GC() // Trigger cleanup of the gatherer + runtime.GC() // Trigger cleanup of the executor it owned + runtime.GC() // One more for good measure for range valuePipe { + // The channel should get closed! } - // The channel should get closed! + leak.assertAllCanceled(t) // the cancelation error is inconsistent here } func TestCollectWithErrsCleanup(t *testing.T) { t.Parallel() + var leak contextLeak valuePipe := func() chan withErr[int] { g := CollectWithErrs[int](Unlimited(context.Background())) - g.Go(func(context.Context) (int, error) { return 1, nil }) + g.Go(func(ctx context.Context) (int, error) { + leak.leak(ctx) + return 1, nil + }) return g.(collectingMultiErrGroup[int]).pipe // leak the un-awaited group }() assert.NotNil(t, valuePipe) - runtime.GC() // Trigger cleanups for leaked resources + runtime.GC() // Trigger cleanup of the collector + runtime.GC() // Trigger cleanup of the executor it owned + runtime.GC() // One more for good measure for range valuePipe { + // The channel should get closed! } - // The channel should get closed! + leak.assertAllCanceled(t) // the cancelation error is inconsistent here } func TestFeedWithErrsCleanup(t *testing.T) { t.Parallel() + var leak contextLeak valuePipe := func() chan withErr[int] { g := FeedWithErrs(Unlimited(context.Background()), func(context.Context, int) error { return nil }) - g.Go(func(context.Context) (int, error) { return 1, nil }) + g.Go(func(ctx context.Context) (int, error) { + leak.leak(ctx) + return 1, nil + }) return g.(feedingMultiErrGroup[int]).pipe // leak the un-awaited group }() assert.NotNil(t, valuePipe) - runtime.GC() // Trigger cleanups for leaked resources + runtime.GC() // Trigger cleanup of the collector + runtime.GC() // Trigger cleanup of the executor it owned + runtime.GC() // One more for good measure for range valuePipe { + // The channel should get closed! } - // The channel should get closed! + leak.assertAllCanceled(t) // the cancelation error is inconsistent here } func TestPanicGroup(t *testing.T) { t.Parallel() + var leak contextLeak g := Unlimited(context.Background()) var blocker sync.WaitGroup blocker.Add(1) - g.Go(func(context.Context) { + g.Go(func(ctx context.Context) { + leak.leak(ctx) blocker.Wait() panic("wow") }) @@ -137,14 +221,17 @@ func TestPanicGroup(t *testing.T) { assert.PanicsWithValue(t, "wow", func() { g.Wait() }) + leak.assertAllCanceled(t, errPanicked) } func TestPanicGroupSecondPath(t *testing.T) { t.Parallel() + var leak contextLeak g := Unlimited(context.Background()) var blocker sync.WaitGroup blocker.Add(1) - g.Go(func(context.Context) { + g.Go(func(ctx context.Context) { + leak.leak(ctx) blocker.Wait() panic("wow") }) @@ -159,16 +246,19 @@ func TestPanicGroupSecondPath(t *testing.T) { t.Fatal("this op should never run") }) }) + leak.assertAllCanceled(t, errPanicked) } func TestPanicLimitedGroup(t *testing.T) { t.Parallel() + var leak contextLeak var waitForNonPanic, unblockInnocent, block sync.WaitGroup waitForNonPanic.Add(1) unblockInnocent.Add(1) block.Add(1) g := Limited(context.Background(), 10) - g.Go(func(context.Context) { // Innocent function + g.Go(func(ctx context.Context) { // Innocent function + leak.leak(ctx) waitForNonPanic.Done() unblockInnocent.Wait() }) @@ -182,16 +272,19 @@ func TestPanicLimitedGroup(t *testing.T) { assert.PanicsWithValue(t, "lol", func() { g.Wait() }) + leak.assertAllCanceled(t, errPanicked) } func TestPanicLimitedGroupSecondPath(t *testing.T) { t.Parallel() + var leak contextLeak var waitForNonPanic, unblockInnocent, block sync.WaitGroup waitForNonPanic.Add(1) unblockInnocent.Add(1) block.Add(1) g := Limited(context.Background(), 10) - g.Go(func(context.Context) { // Innocent function + g.Go(func(ctx context.Context) { // Innocent function + leak.leak(ctx) waitForNonPanic.Done() unblockInnocent.Wait() }) @@ -208,38 +301,47 @@ func TestPanicLimitedGroupSecondPath(t *testing.T) { g.Go(func(context.Context) {}) } }) + leak.assertAllCanceled(t, errPanicked) } func TestPanicFeedFunction(t *testing.T) { t.Parallel() - g := Feed(Unlimited(context.Background()), func(context.Context, int) error { + var leak contextLeak + g := Feed(Unlimited(context.Background()), func(ctx context.Context, _ int) error { + leak.leak(ctx) panic("oh no!") }) g.Go(func(context.Context) (int, error) { return 1, nil }) assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) + leak.assertAllCanceled(t) // the cancelation error is inconsistent here } func TestPanicFeedWork(t *testing.T) { t.Parallel() + var leak contextLeak g := Feed(Unlimited(context.Background()), func(context.Context, int) error { t.Fatal("should not get called") return nil }) - g.Go(func(context.Context) (int, error) { + g.Go(func(ctx context.Context) (int, error) { + leak.leak(ctx) panic("oh no!") }) assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) + leak.assertAllCanceled(t, errPanicked) } func TestPanicFeedWorkSecondPath(t *testing.T) { t.Parallel() + var leak contextLeak g := Feed(Unlimited(context.Background()), func(context.Context, int) error { t.Fatal("should not get called") return nil }) - g.Go(func(context.Context) (int, error) { + g.Go(func(ctx context.Context) (int, error) { + leak.leak(ctx) panic("oh no!") }) ctx, _ := g.(feedingGroup[int]).g.getContext() @@ -247,51 +349,63 @@ func TestPanicFeedWorkSecondPath(t *testing.T) { assert.PanicsWithValue(t, "oh no!", func() { g.Go(func(context.Context) (int, error) { return 2, nil }) }) + leak.assertAllCanceled(t, errPanicked) } func TestPanicFeedFunctionNotCalled(t *testing.T) { t.Parallel() + var leak contextLeak g := Feed(Unlimited(context.Background()), func(context.Context, int) error { panic("oh no!") }) - g.Go(func(context.Context) (int, error) { + g.Go(func(ctx context.Context) (int, error) { + leak.leak(ctx) return 0, errors.New("foo") }) assert.NotPanics(t, func() { assert.Errorf(t, g.Wait(), "foo") }) + leak.assertAllCanceled(t) } func TestPanicFeedErrFunction(t *testing.T) { t.Parallel() + var leak contextLeak g := FeedWithErrs(Unlimited(context.Background()), func(context.Context, int) error { panic("oh no!") }) - g.Go(func(context.Context) (int, error) { + g.Go(func(ctx context.Context) (int, error) { + leak.leak(ctx) return 1, nil }) assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) + leak.assertAllCanceled(t) // the cancelation error is inconsistent here } func TestPanicFeedErrWork(t *testing.T) { t.Parallel() + var leak contextLeak g := FeedWithErrs(Unlimited(context.Background()), func(context.Context, int) error { t.Fatal("should not get a value") return nil }) - g.Go(func(context.Context) (int, error) { + g.Go(func(ctx context.Context) (int, error) { + leak.leak(ctx) panic("oh no!") }) assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) + leak.assertAllCanceled(t, errPanicked) } func TestPanicFeedErrWorkSecondPath(t *testing.T) { t.Parallel() + var leak contextLeak g := FeedWithErrs(Unlimited(context.Background()), func(context.Context, int) error { t.Fatal("should not get a value") return nil }) - g.Go(func(context.Context) (int, error) { + g.Go(func(ctx context.Context) (int, error) { + leak.leak(ctx) panic("oh no!") }) ctx, _ := g.(feedingMultiErrGroup[int]).g.getContext() @@ -299,17 +413,22 @@ func TestPanicFeedErrWorkSecondPath(t *testing.T) { assert.PanicsWithValue(t, "oh no!", func() { g.Go(func(context.Context) (int, error) { return 2, nil }) }) + leak.assertAllCanceled(t, errPanicked) } func TestPanicFeedErrFunctionNoValues(t *testing.T) { t.Parallel() + var leak contextLeak g := FeedWithErrs(Unlimited(context.Background()), func(context.Context, int) error { - panic("oh no!") + t.Fatal("should not get a value") + return nil }) - g.Go(func(context.Context) (int, error) { + g.Go(func(ctx context.Context) (int, error) { + leak.leak(ctx) return 0, errors.New("regular error") }) assert.Errorf(t, g.Wait(), "regular error") + leak.assertAllCanceled(t, errGroupDone) } func TestMisuseReuse(t *testing.T) { From f86e867616bc43e65e6368cf917c2bb69bd79a11 Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Mon, 23 Sep 2024 18:16:22 -0700 Subject: [PATCH 04/19] add context cancelation to cleanups demo --- cleanupsdemo/cleanups_demo.go | 60 +++++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 7 deletions(-) diff --git a/cleanupsdemo/cleanups_demo.go b/cleanupsdemo/cleanups_demo.go index 2b95539..6b4e868 100644 --- a/cleanupsdemo/cleanups_demo.go +++ b/cleanupsdemo/cleanups_demo.go @@ -13,6 +13,8 @@ func main() { const cycles = 100 const batchSize = 100 + ctx := context.Background() + // This demonstrates the library's cleanup functionality, where forgotten // executors that own goroutines and have been discarded without being awaited // will clean themselves up without permanently leaking goroutines. This is @@ -22,14 +24,24 @@ func main() { // of the thunks that are registered with `runtime.SetFinalizer()` in // group.go and collect.go, which close channels and call cancel functions. - println("leaking ~", cycles*(batchSize+2), "goroutines from abandoned group executors...") + println("leaking goroutines from group executors...") + + // Dependent contexts are always canceled, too + leakDependent := func(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + _ = cancel + go func() { + <-ctx.Done() + }() + } // Leak just a crazy number of goroutines for i := 0; i < cycles; i++ { func() { - g := parallel.Collect[int](parallel.Limited(context.Background(), batchSize)) + g := parallel.Collect[int](parallel.Limited(ctx, batchSize)) for j := 0; j < batchSize; j++ { - g.Go(func(context.Context) (int, error) { + g.Go(func(ctx context.Context) (int, error) { + leakDependent(ctx) return 1, nil }) } @@ -38,10 +50,11 @@ func main() { func() { defer func() { _ = recover() }() - g := parallel.Feed(parallel.Unlimited(context.Background()), func(context.Context, int) error { + g := parallel.Feed(parallel.Unlimited(ctx), func(context.Context, int) error { panic("feed function panics") }) - g.Go(func(context.Context) (int, error) { + g.Go(func(ctx context.Context) (int, error) { + leakDependent(ctx) return 1, nil }) // Leak the group without awaiting it @@ -49,12 +62,45 @@ func main() { func() { defer func() { _ = recover() }() - g := parallel.Collect[int](parallel.Unlimited(context.Background())) - g.Go(func(context.Context) (int, error) { + g := parallel.Collect[int](parallel.Unlimited(ctx)) + g.Go(func(ctx context.Context) (int, error) { + leakDependent(ctx) panic("op panics") }) // Leak the group without awaiting it }() + + // Start some executors that complete normally without error + { + g := parallel.Unlimited(ctx) + g.Go(func(ctx context.Context) { + leakDependent(ctx) + }) + g.Wait() + } + { + g := parallel.Collect[int](parallel.Limited(ctx, batchSize)) + g.Go(func(ctx context.Context) (int, error) { + return 1, nil + }) + _, err := g.Wait() + if err != nil { + panic(err) + } + } + { + g := parallel.Feed(parallel.Unlimited(ctx), func(context.Context, int) error { + return nil + }) + g.Go(func(ctx context.Context) (int, error) { + leakDependent(ctx) + return 1, nil + }) + err := g.Wait() + if err != nil { + panic(err) + } + } } println("monitoring and running GC...") From 87dd694c1fc214b27b35efb3b0c7cec0b14c8fba Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Mon, 23 Sep 2024 18:36:24 -0700 Subject: [PATCH 05/19] context cancelation in normal group tests --- group_test.go | 35 ++++++++++++++++++++++++++++++++++- safety_test.go | 37 ------------------------------------- 2 files changed, 34 insertions(+), 38 deletions(-) diff --git a/group_test.go b/group_test.go index eb23cf0..cbaaef2 100644 --- a/group_test.go +++ b/group_test.go @@ -8,8 +8,38 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) +type contextLeak struct { + lock sync.Mutex + ctxs []context.Context +} + +func (c *contextLeak) leak(ctx context.Context) { + c.lock.Lock() + defer c.lock.Unlock() + c.ctxs = append(c.ctxs, ctx) +} + +func (c *contextLeak) assertAllCanceled(t *testing.T, expected ...error) { + t.Helper() + if len(expected) > 1 { + panic("please just provide 1 expected error for all the contexts") + } + c.lock.Lock() + defer c.lock.Unlock() + for _, ctx := range c.ctxs { + cause := context.Cause(ctx) + if cause == nil { + t.Fatal("context was not canceled") + } + if len(expected) == 1 { + require.ErrorIs(t, cause, expected[0]) + } + } +} + func TestGroup(t *testing.T) { for _, test := range []struct { name string @@ -43,14 +73,17 @@ func testGroup(t *testing.T, makeExec func(context.Context) Executor) { t.Run("sum 100", func(t *testing.T) { t.Parallel() var counter int64 + var leak contextLeak g := makeExec(context.Background()) for i := 0; i < 100; i++ { - g.Go(func(context.Context) { + g.Go(func(ctx context.Context) { + leak.leak(ctx) atomic.AddInt64(&counter, 1) }) } g.Wait() assert.Equal(t, int64(100), counter) + leak.assertAllCanceled(t, errGroupDone) }) t.Run("sum canceled", func(t *testing.T) { t.Parallel() diff --git a/safety_test.go b/safety_test.go index 0ef1a3e..747c72a 100644 --- a/safety_test.go +++ b/safety_test.go @@ -12,45 +12,8 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) -type contextLeak struct { - lock sync.Mutex - ctxs []context.Context -} - -func (c *contextLeak) leak(ctx context.Context) { - c.lock.Lock() - defer c.lock.Unlock() - c.ctxs = append(c.ctxs, ctx) -} - -func (c *contextLeak) assertAllCanceled(t *testing.T, expected ...error) { - t.Helper() - if len(expected) > 1 { - panic("please just provide 1 expected error for all the contexts") - } - c.lock.Lock() - defer c.lock.Unlock() - for _, ctx := range c.ctxs { - cause := context.Cause(ctx) - if cause == nil { - t.Fatal("context was not canceled") - } - if len(expected) == 1 { - require.ErrorIs(t, cause, expected[0]) - } - } -} - -// Wait for all contexts to be done -func (c *contextLeak) join() { - for _, ctx := range c.ctxs { - <-ctx.Done() - } -} - // The tests in this file can be detected as racy by the race condition checker // because we are reaching under the hood to look at the group's channel, so we // can see when the group's functions have started running. There's no good From 0b12ff83ebde2a178d31214c8b7e9bbc42984d44 Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Mon, 23 Sep 2024 18:46:34 -0700 Subject: [PATCH 06/19] test context cleanup for the trivial runner group as well --- cleanupsdemo/cleanups_demo.go | 7 +++++++ safety_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/cleanupsdemo/cleanups_demo.go b/cleanupsdemo/cleanups_demo.go index 6b4e868..b7dba56 100644 --- a/cleanupsdemo/cleanups_demo.go +++ b/cleanupsdemo/cleanups_demo.go @@ -78,6 +78,13 @@ func main() { }) g.Wait() } + { + g := parallel.Limited(ctx, 0) + g.Go(func(ctx context.Context) { + leakDependent(ctx) + }) + g.Wait() + } { g := parallel.Collect[int](parallel.Limited(ctx, batchSize)) g.Go(func(ctx context.Context) (int, error) { diff --git a/safety_test.go b/safety_test.go index 747c72a..237826e 100644 --- a/safety_test.go +++ b/safety_test.go @@ -53,6 +53,32 @@ func TestLimitedGroupCleanup(t *testing.T) { leak.assertAllCanceled(t, errGroupAbandoned) } +func TestTrivialGroupCleanup(t *testing.T) { + t.Parallel() + var counter int64 + var leak contextLeak + + func() { + g := Limited(context.Background(), 0) + for i := 0; i < 100; i++ { + g.Go(func(ctx context.Context) { + defer func() { + p := recover() + if p != nil { + println(p) + } + }() + atomic.AddInt64(&counter, 1) + leak.leak(ctx) + }) + } + }() + runtime.GC() // Trigger cleanups for leaked resources + assert.Equal(t, int64(100), counter) + // The context should be canceled! + leak.assertAllCanceled(t, errGroupAbandoned) +} + func TestCollectorCleanup(t *testing.T) { t.Parallel() var leak contextLeak From 8d28f4a2d9c4e4a44f83797b44ee60c3d8c53c84 Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Mon, 23 Sep 2024 19:16:47 -0700 Subject: [PATCH 07/19] implement "quietWait" internal api --- collect.go | 5 +++-- collect_test.go | 21 +++++++++++++++++++++ group.go | 27 +++++++++++++++++++++------ 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/collect.go b/collect.go index 75a6c04..fe7e347 100644 --- a/collect.go +++ b/collect.go @@ -310,8 +310,9 @@ func (pg *pipeGroup[T, R]) doWait() { runtime.SetFinalizer(pg, nil) } }() - // Runs first: Wait for inputs - pg.g.Wait() + // Runs first: Wait for inputs. Wait "quietly", not canceling the + // context yet so if there is an error later we can still see it + pg.g.quietWait() }() // Runs third: Wait for outputs to be done pg.pipeWorkers.Wait() diff --git a/collect_test.go b/collect_test.go index a553603..13a9584 100644 --- a/collect_test.go +++ b/collect_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestErrGroup(t *testing.T) { @@ -149,6 +150,26 @@ func TestFeedErroring(t *testing.T) { assert.Subset(t, []int{1, 2, 3}, res) } +func TestFeedLastReceiverErrs(t *testing.T) { + t.Parallel() + // Even when the very very last item through the pipe group causes an error, + // the group's context shouldn't be canceled yet and it should still be able + // to set the error. + g := Feed(Limited(context.Background(), 0), func(ctx context.Context, val int) error { + if val == 10 { + return errors.New("boom") + } else { + return nil + } + }) + for i := 1; i <= 10; i++ { + g.Go(func(ctx context.Context) (int, error) { + return i, nil + }) + } + require.Error(t, g.Wait()) +} + func TestFeedErroringInReceiver(t *testing.T) { t.Parallel() g := Feed(Unlimited(context.Background()), func(ctx context.Context, val int) error { diff --git a/group.go b/group.go index 28dc71f..794283b 100644 --- a/group.go +++ b/group.go @@ -61,6 +61,9 @@ type Executor interface { // internal getContext() (context.Context, context.CancelCauseFunc) + // Waits without canceling the context with errGroupDone. The caller of this + // function promises that they will be responsible for canceling the context + quietWait() } // Creates a basic executor which runs all the functions given in one goroutine @@ -119,10 +122,12 @@ func (n *runner) Go(op func(context.Context)) { } func (n *runner) Wait() { - n.awaited.Store(true) - // We are ending the group's lifetime within this function call; defer the - // cancelation and unset our finalizer. + n.quietWait() n.cancel(errGroupDone) +} + +func (n *runner) quietWait() { + n.awaited.Store(true) runtime.SetFinalizer(n, nil) } @@ -200,10 +205,12 @@ func (g *group) Go(op func(context.Context)) { } func (g *group) Wait() { - g.awaited.Store(true) - // We are ending the group's lifetime within this function call; defer the - // cancelation and unset our finalizer. defer g.cancel(errGroupDone) + g.quietWait() +} + +func (g *group) quietWait() { + g.awaited.Store(true) runtime.SetFinalizer(g, nil) g.wg.Wait() g.checkPanic() @@ -283,6 +290,14 @@ func (lg *limitedGroup) Wait() { lg.g.Wait() } +func (lg *limitedGroup) quietWait() { + if !lg.awaited.Swap(true) { + close(lg.ops) + runtime.SetFinalizer(lg, nil) // Don't try to close this chan again :) + } + lg.g.quietWait() +} + func (lg *limitedGroup) getContext() (context.Context, context.CancelCauseFunc) { return lg.g.getContext() } From 728060357424c52c558792f6fddfbb49c53fca8e Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Mon, 23 Sep 2024 19:31:04 -0700 Subject: [PATCH 08/19] refine pipegroup waiting to take clearer ownership of context cancelation --- collect.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/collect.go b/collect.go index fe7e347..60e3851 100644 --- a/collect.go +++ b/collect.go @@ -300,6 +300,14 @@ func (pg *pipeGroup[T, R]) doWait() { // even in case of a panic by deferring it, and that always only happens at // the end of the function... so, we just put an inner function here to make // it happen "early." + + // Runs last: We must make completely certain that we cancel the context + // owned by the pipeGroup. This context is shared between the executor and + // the pipeWorkers; we take charge of making sure this cancelation happens + // as soon as possible here, and we want it to happen at the very end after + // everything else in case something else wanted to set the cancel cause of + // the context to an actual error instead of our "no error" sentinel value. + defer pg.pipeWorkers.cancel(errGroupDone) func() { // Runs second: Close the results chan and unblock the pipe worker. // Because we're deferring this, it will happen even if there is a panic @@ -315,7 +323,7 @@ func (pg *pipeGroup[T, R]) doWait() { pg.g.quietWait() }() // Runs third: Wait for outputs to be done - pg.pipeWorkers.Wait() + pg.pipeWorkers.quietWait() } var _ CollectingExecutor[int] = collectingGroup[int]{} From ac6112946aec8d51dda3babd59b4b3d0190dcc10 Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Tue, 24 Sep 2024 13:35:03 -0700 Subject: [PATCH 09/19] missed a dependent goroutine leak in cleanups_demo --- cleanupsdemo/cleanups_demo.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cleanupsdemo/cleanups_demo.go b/cleanupsdemo/cleanups_demo.go index b7dba56..fcc0009 100644 --- a/cleanupsdemo/cleanups_demo.go +++ b/cleanupsdemo/cleanups_demo.go @@ -88,6 +88,7 @@ func main() { { g := parallel.Collect[int](parallel.Limited(ctx, batchSize)) g.Go(func(ctx context.Context) (int, error) { + leakDependent(ctx) return 1, nil }) _, err := g.Wait() From cfd280630dc19af0e1f9ddf9b0353932a80a09ef Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Tue, 24 Sep 2024 13:41:24 -0700 Subject: [PATCH 10/19] test nits --- safety_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/safety_test.go b/safety_test.go index 4c2aa4d..ae2b437 100644 --- a/safety_test.go +++ b/safety_test.go @@ -325,7 +325,7 @@ func TestPanicFeedWorkSecondPath(t *testing.T) { t.Parallel() var leak contextLeak g := Feed(Unlimited(context.Background()), func(context.Context, int) error { - t.Fatal("should not get called") + t.Fatal("should not get a value") return nil }) g.Go(func(ctx context.Context) (int, error) { @@ -344,16 +344,18 @@ func TestPanicFeedFunctionNotCalled(t *testing.T) { t.Parallel() var leak contextLeak g := Feed(Unlimited(context.Background()), func(context.Context, int) error { - panic("oh no!") + t.Fatal("should not get a value") + return nil }) + fooError := errors.New("foo") g.Go(func(ctx context.Context) (int, error) { leak.leak(ctx) - return 0, errors.New("foo") + return 0, fooError }) assert.NotPanics(t, func() { - assert.Errorf(t, g.Wait(), "foo") + assert.ErrorIs(t, g.Wait(), fooError) }) - leak.assertAllCanceled(t) + leak.assertAllCanceled(t, fooError) } func TestPanicFeedErrFunction(t *testing.T) { From 401dbd721f2f964a38b2a80bb721d4960d27e741 Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Tue, 24 Sep 2024 13:47:12 -0700 Subject: [PATCH 11/19] with quietWait(), we can guarantee the end state of panicked executor contexts --- safety_test.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/safety_test.go b/safety_test.go index ae2b437..aa87683 100644 --- a/safety_test.go +++ b/safety_test.go @@ -97,7 +97,8 @@ func TestCollectorCleanup(t *testing.T) { for range valuePipe { // The channel should get closed! } - leak.assertAllCanceled(t) // the cancelation error is inconsistent here + leak.assertAllCanceled(t) // the cancelation error is inconsistent here, + // depending on whether the pipe group or the executor was reaped first } func TestFeederCleanup(t *testing.T) { @@ -119,7 +120,8 @@ func TestFeederCleanup(t *testing.T) { for range valuePipe { // The channel should get closed! } - leak.assertAllCanceled(t) // the cancelation error is inconsistent here + leak.assertAllCanceled(t) // the cancelation error is inconsistent here, + // depending on whether the pipe group or the executor was reaped first } func TestGatherErrCleanup(t *testing.T) { @@ -141,7 +143,8 @@ func TestGatherErrCleanup(t *testing.T) { for range valuePipe { // The channel should get closed! } - leak.assertAllCanceled(t) // the cancelation error is inconsistent here + leak.assertAllCanceled(t) // the cancelation error is inconsistent here, + // depending on whether the pipe group or the executor was reaped first } func TestCollectWithErrsCleanup(t *testing.T) { @@ -163,7 +166,8 @@ func TestCollectWithErrsCleanup(t *testing.T) { for range valuePipe { // The channel should get closed! } - leak.assertAllCanceled(t) // the cancelation error is inconsistent here + leak.assertAllCanceled(t) // the cancelation error is inconsistent here, + // depending on whether the pipe group or the executor was reaped first } func TestFeedWithErrsCleanup(t *testing.T) { @@ -186,7 +190,8 @@ func TestFeedWithErrsCleanup(t *testing.T) { for range valuePipe { // The channel should get closed! } - leak.assertAllCanceled(t) // the cancelation error is inconsistent here + leak.assertAllCanceled(t) // the cancelation error is inconsistent here, + // depending on whether the pipe group or the executor was reaped first } func TestPanicGroup(t *testing.T) { @@ -303,7 +308,7 @@ func TestPanicFeedFunction(t *testing.T) { return 1, nil }) assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) - leak.assertAllCanceled(t) // the cancelation error is inconsistent here + leak.assertAllCanceled(t, errPanicked) } func TestPanicFeedWork(t *testing.T) { @@ -369,7 +374,7 @@ func TestPanicFeedErrFunction(t *testing.T) { return 1, nil }) assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) - leak.assertAllCanceled(t) // the cancelation error is inconsistent here + leak.assertAllCanceled(t, errPanicked) } func TestPanicFeedErrWork(t *testing.T) { From 04979428050bd3192dce64eeff43a21f69cdf78c Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Tue, 24 Sep 2024 14:11:05 -0700 Subject: [PATCH 12/19] fix rare nil deref flake in TestLimitedGroupCleanup --- safety_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/safety_test.go b/safety_test.go index aa87683..e84daf6 100644 --- a/safety_test.go +++ b/safety_test.go @@ -44,8 +44,15 @@ func TestLimitedGroupCleanup(t *testing.T) { }() assert.NotNil(t, opsQueue) runtime.GC() // Trigger cleanups for leaked resources + + // In the event that we need to drain the ops queue below, we need to have + // a context to leak that satisfies our test predicate. + fakeCtx, cancel := context.WithCancelCause(context.Background()) + // We can cancel the context immediately since we don't really use it + cancel(errGroupAbandoned) + for op := range opsQueue { - op(nil) // have mercy and run those ops anyway, just so we get a full count + op(fakeCtx) // have mercy and run those ops anyway, just so we get a full count } // The channel should get closed! assert.Equal(t, int64(100), atomic.LoadInt64(&counter)) From 106728608b66dd8ffd7bd0a21a18fe07d78bed72 Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Tue, 24 Sep 2024 16:32:14 -0700 Subject: [PATCH 13/19] comment unusual exact error comparison --- collect.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/collect.go b/collect.go index 60e3851..66ae471 100644 --- a/collect.go +++ b/collect.go @@ -227,6 +227,10 @@ func FeedWithErrs[T any](executor Executor, receiver func(context.Context, T) er // was errGroupDone, that doesn't count as an error and nil is returned instead. func groupError(ctx context.Context) error { err := context.Cause(ctx) + // We are explicitly using == here to check for the exact value of our + // sentinel error, not using errors.Is(), because we don't actually want to + // find it if it's in wrapped errors. We *only* want to know whether the + // cancelation error is *exactly* errGroupDone. if err == errGroupDone { return nil } From 8c540a23705e319e24f1f64b02098802f439111d Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Tue, 24 Sep 2024 16:53:13 -0700 Subject: [PATCH 14/19] more nits and cleanups for limited group cleanup it's still flaking rarely on ci, i think because the GC wasn't always hitting it now that it has another finalizer. extra cycles ought to take care of that. also removed some extra debugging that wasn't doing anything --- safety_test.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/safety_test.go b/safety_test.go index e84daf6..f8525e6 100644 --- a/safety_test.go +++ b/safety_test.go @@ -29,12 +29,6 @@ func TestLimitedGroupCleanup(t *testing.T) { g := Limited(context.Background(), 10) for i := 0; i < 100; i++ { g.Go(func(ctx context.Context) { - defer func() { - p := recover() - if p != nil { - println(p) - } - }() atomic.AddInt64(&counter, 1) leak.leak(ctx) }) @@ -44,6 +38,8 @@ func TestLimitedGroupCleanup(t *testing.T) { }() assert.NotNil(t, opsQueue) runtime.GC() // Trigger cleanups for leaked resources + runtime.GC() // Trigger cleanups for leaked resources + runtime.GC() // Trigger cleanups for leaked resources // In the event that we need to drain the ops queue below, we need to have // a context to leak that satisfies our test predicate. @@ -68,13 +64,7 @@ func TestTrivialGroupCleanup(t *testing.T) { g := Limited(context.Background(), 0) for i := 0; i < 100; i++ { g.Go(func(ctx context.Context) { - defer func() { - p := recover() - if p != nil { - println(p) - } - }() - atomic.AddInt64(&counter, 1) + counter++ leak.leak(ctx) }) } From 9bc23eecafc9a224784951878a2b1d601d891a7a Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Tue, 24 Sep 2024 16:58:28 -0700 Subject: [PATCH 15/19] little more GC for the trivial group --- safety_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/safety_test.go b/safety_test.go index f8525e6..70d066a 100644 --- a/safety_test.go +++ b/safety_test.go @@ -70,6 +70,8 @@ func TestTrivialGroupCleanup(t *testing.T) { } }() runtime.GC() // Trigger cleanups for leaked resources + runtime.GC() // Trigger cleanups for leaked resources + runtime.GC() // Trigger cleanups for leaked resources assert.Equal(t, int64(100), counter) // The context should be canceled! leak.assertAllCanceled(t, errGroupAbandoned) From 31f41409b72d0c4034b2375d1512f1df6c4b57f2 Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Tue, 24 Sep 2024 17:13:31 -0700 Subject: [PATCH 16/19] one more --- safety_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/safety_test.go b/safety_test.go index 70d066a..7e4ea37 100644 --- a/safety_test.go +++ b/safety_test.go @@ -525,6 +525,8 @@ func TestForgottenPipeLegiblePanic(t *testing.T) { }() assert.NotNil(t, valuePipe) runtime.GC() // Trigger cleanups for leaked resources + runtime.GC() // Trigger cleanups for leaked resources + runtime.GC() // Trigger cleanups for leaked resources for range valuePipe { } // The collector's pipe is now closed. Unblock the task we submitted to the From 911dee93feb79f5e5792c76719f8cf2def4afc08 Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Wed, 25 Sep 2024 12:21:41 -0700 Subject: [PATCH 17/19] remove superfluous additional context cancel --- group.go | 1 - 1 file changed, 1 deletion(-) diff --git a/group.go b/group.go index 794283b..8d29491 100644 --- a/group.go +++ b/group.go @@ -97,7 +97,6 @@ func Limited(ctx context.Context, maxGoroutines int) Executor { } runtime.SetFinalizer(making, func(doomed *limitedGroup) { close(doomed.ops) - doomed.g.cancel(errGroupAbandoned) }) return making } From 750e386164acd71a2dba9b137011fba7a1273205 Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Wed, 25 Sep 2024 12:23:02 -0700 Subject: [PATCH 18/19] improve comments around context lifecycle --- group.go | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/group.go b/group.go index 8d29491..453e0d5 100644 --- a/group.go +++ b/group.go @@ -17,7 +17,9 @@ const bufferSize = 8 const misuseMessage = "parallel executor misuse: don't reuse executors" var ( - errPanicked = errors.New("panicked") + errPanicked = errors.New("panicked") + // errGroupDone is a sentinel error value used to cancel an execution + // context when it has completed without error. errGroupDone = errors.New("executor done") errGroupAbandoned = errors.New("executor abandoned") @@ -83,8 +85,12 @@ func Unlimited(ctx context.Context) Executor { // should still be cleaned up. func Limited(ctx context.Context, maxGoroutines int) Executor { if maxGoroutines < 1 { + // When maxGoroutines is non-positive, we return the trivial executor + // type directly. gctx, cancel := context.WithCancelCause(ctx) g := &runner{ctx: gctx, cancel: cancel} + // This executor still needs to make certain that its context always + // gets canceled! runtime.SetFinalizer(g, func(doomed *runner) { doomed.cancel(errGroupAbandoned) }) @@ -101,10 +107,25 @@ func Limited(ctx context.Context, maxGoroutines int) Executor { return making } -// Base executor with an interface that runs everything serially. +// Base executor with an interface that runs everything serially. This can be +// returned directly from Limited in a special case, and otherwise it is just +// composed as inner struct fields for the base concurrent group struct. +// +// The lifecycle of the context is important: When the executor is set up we +// create a cancelable context, and we need to guarantee that it is eventually +// canceled or it can stay resident indefinitely in the known children of a +// parent context, effectively leaking memory. To do this, we guarantee that the +// context is canceled in one of a couple ways: +// 1. if the executor is abandoned without awaiting, a runtime finalizer that +// is registered immediately after we create the executor will cancel it +// 2. if the executor is awaited and completes normally, after everything else +// has completed the context will be canceled with the errGroupDone sentinel +// 3. if there is a panic or another kind of error that causes the executor to +// terminate early (such as with ErrGroup), the context is canceled with +// error normally in this way. type runner struct { - ctx context.Context // Closed when we panic or get garbage collected - cancel context.CancelCauseFunc // Only close the dying channel one time + ctx context.Context // Execution context + cancel context.CancelCauseFunc // Cancel for the ctx; must always be called awaited atomic.Bool // Set when Wait() is called } From 80752bbb24197e289f8df401bec5cff019745c21 Mon Sep 17 00:00:00 2001 From: Kent Ross Date: Wed, 25 Sep 2024 15:19:21 -0700 Subject: [PATCH 19/19] comment leakDependent --- cleanupsdemo/cleanups_demo.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/cleanupsdemo/cleanups_demo.go b/cleanupsdemo/cleanups_demo.go index fcc0009..e3a01c0 100644 --- a/cleanupsdemo/cleanups_demo.go +++ b/cleanupsdemo/cleanups_demo.go @@ -26,10 +26,20 @@ func main() { println("leaking goroutines from group executors...") - // Dependent contexts are always canceled, too + // Dependent contexts are always canceled, too. + // + // This binary generally tests that executors and collectors always clean up + // after themselves, that any extra goroutines and channels and contexts + // they open will be shut down. The easiest thing we can measure is running + // goroutines, so to measure things that aren't goroutines (like unclosed + // contexts) we can just start new goroutines that wait for contexts we + // expect to be canceled. leakDependent := func(ctx context.Context) { + // All canceleable child contexts are also canceled ctx, cancel := context.WithCancel(ctx) _ = cancel + // Leak a goroutine whose halting depends on the given context being + // canceled. go func() { <-ctx.Done() }()