diff --git a/group.go b/group.go index 0cf295d..b566421 100644 --- a/group.go +++ b/group.go @@ -4,6 +4,7 @@ import ( "context" "errors" "runtime" + "runtime/debug" "sync" "sync/atomic" ) @@ -17,6 +18,18 @@ const misuseMessage = "parallel executor misuse: don't reuse executors" var errPanicked = errors.New("panicked") +// WorkerPanic represents a panic value propagated from a task within a parallel +// executor, and is the main type of panic that you might expect to receive. +type WorkerPanic struct { + // Panic contains the originally panic()ed value. + Panic any + // Stacktraces contains the stacktraces of the panics. The stack trace of + // the line that threw the original Panic value appears first, and any other + // stack traces from other parallel groups that received this panic and re- + // threw it appear in order afterwards. + Stacktraces []string +} + // NOTE: If you want to really get crazy with it, it IS permissible and safe to // call Go(...) from multiple threads without additional synchronization, on // every kind of executor. HOWEVER: the caller always assumes full @@ -110,7 +123,7 @@ func makeGroup(ctx context.Context, cancel context.CancelCauseFunc) *group { type group struct { runner wg sync.WaitGroup - panicked atomic.Pointer[any] // Stores panic values + panicked atomic.Pointer[WorkerPanic] // Stores panic values } func (g *group) Go(op func(context.Context)) { @@ -131,12 +144,34 @@ func (g *group) Go(op func(context.Context)) { // When the function call has exited without returning, hoist // the recover()ed panic value and a stack trace so it can be // re-panicked. - // - // Currently we do not store stack traces from the panicked - // goroutine or distinguish between panics and runtime.Goexit(). p := recover() - g.panicked.CompareAndSwap(nil, &p) - g.cancel(errPanicked) + if p == nil { + // This is a runtime.Goexit(), such as from a process + // termination or a test failure; let that propagate instead + g.cancel(context.Canceled) + } else { + // If we are propagating a panic that is already a + // WorkerPanic (for example, if we have panics propagating + // through multiple parallel groups), just add our + // stacktrace onto the end of the slice; otherwise make a + // new WorkerPanic value. + var wp WorkerPanic + switch tp := p.(type) { + case WorkerPanic: + wp = WorkerPanic{ + Panic: tp.Panic, + Stacktraces: append(tp.Stacktraces, string(debug.Stack())), + } + default: + wp = WorkerPanic{ + Panic: p, + Stacktraces: []string{string(debug.Stack())}, + } + } + + g.panicked.CompareAndSwap(nil, &wp) + g.cancel(errPanicked) + } } g.wg.Done() }() diff --git a/group_test.go b/group_test.go index eb23cf0..9a3c972 100644 --- a/group_test.go +++ b/group_test.go @@ -156,7 +156,7 @@ func testLimitedGroupMaxConcurrency(t *testing.T, name string, g Executor, limit if shouldSucceed { assert.NotPanics(t, jobInserter.Wait) } else { - assert.PanicsWithValue(t, "poison pill", jobInserter.Wait) + assertPanicsWithValue(t, "poison pill", jobInserter.Wait) } }) } @@ -213,3 +213,21 @@ func testConcurrentGroupWaitReallyWaits(t *testing.T, name string, g Executor) { testingGroup.Wait() }) } + +func TestCanGoexit(t *testing.T) { + g := Unlimited(context.Background()) + g.Go(func(context.Context) { + // Ideally we would test t.Fatal() here to show that parallel now plays + // nicely with the testing lib, but there doesn't seem to be any good + // way to xfail a golang test. As it happens t.Fatal() just sets a fail + // flag and then calls Goexit() anyway; if we treat nil recover() values + // as Goexit() (guaranteed since 1.21 with the advent of PanicNilError) + // we can handle this very simply, without needing a "double defer + // sandwich". + // + // Either way, we expect Goexit() to work normally in tests now and not + // fail or re-panic. + runtime.Goexit() + }) + g.Wait() +} diff --git a/safety_test.go b/safety_test.go index 487ed96..8bfc36c 100644 --- a/safety_test.go +++ b/safety_test.go @@ -14,6 +14,20 @@ import ( "github.com/stretchr/testify/assert" ) +func assertPanicsWithValue(t *testing.T, expectedValue any, f func()) { + t.Helper() + + defer func() { + p := recover() + if p == nil { + t.Fatal("didn't panic but should have") + } + assert.Equal(t, expectedValue, p.(WorkerPanic).Panic) + }() + + f() +} + // The tests in this file can be detected as racy by the race condition checker // because we are reaching under the hood to look at the group's channel, so we // can see when the group's functions have started running. There's no good @@ -134,7 +148,7 @@ func TestPanicGroup(t *testing.T) { // Wait for the group to "die" when the panic hits ctx, _ := g.getContext() <-ctx.Done() - assert.PanicsWithValue(t, "wow", func() { + assertPanicsWithValue(t, "wow", func() { g.Wait() }) } @@ -154,7 +168,7 @@ func TestPanicGroupSecondPath(t *testing.T) { // Wait for the group to "die" when the panic hits ctx, _ := g.getContext() <-ctx.Done() - assert.PanicsWithValue(t, "wow", func() { + assertPanicsWithValue(t, "wow", func() { g.Go(func(context.Context) { t.Fatal("this op should never run") }) @@ -179,7 +193,7 @@ func TestPanicLimitedGroup(t *testing.T) { }) waitForNonPanic.Wait() block.Done() - assert.PanicsWithValue(t, "lol", func() { + assertPanicsWithValue(t, "lol", func() { g.Wait() }) } @@ -202,7 +216,7 @@ func TestPanicLimitedGroupSecondPath(t *testing.T) { }) waitForNonPanic.Wait() block.Done() - assert.PanicsWithValue(t, "lol", func() { + assertPanicsWithValue(t, "lol", func() { // Eventually :) for { g.Go(func(context.Context) {}) @@ -218,7 +232,7 @@ func TestPanicFeedFunction(t *testing.T) { g.Go(func(context.Context) (int, error) { return 1, nil }) - assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) + assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) } func TestPanicFeedWork(t *testing.T) { @@ -230,7 +244,7 @@ func TestPanicFeedWork(t *testing.T) { g.Go(func(context.Context) (int, error) { panic("oh no!") }) - assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) + assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) } func TestPanicFeedWorkSecondPath(t *testing.T) { @@ -244,7 +258,7 @@ func TestPanicFeedWorkSecondPath(t *testing.T) { }) ctx, _ := g.(feedingGroup[int]).g.getContext() <-ctx.Done() - assert.PanicsWithValue(t, "oh no!", func() { + assertPanicsWithValue(t, "oh no!", func() { g.Go(func(context.Context) (int, error) { return 2, nil }) }) } @@ -270,7 +284,7 @@ func TestPanicFeedErrFunction(t *testing.T) { g.Go(func(context.Context) (int, error) { return 1, nil }) - assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) + assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) } func TestPanicFeedErrWork(t *testing.T) { @@ -282,7 +296,7 @@ func TestPanicFeedErrWork(t *testing.T) { g.Go(func(context.Context) (int, error) { panic("oh no!") }) - assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) + assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) } func TestPanicFeedErrWorkSecondPath(t *testing.T) { @@ -296,7 +310,7 @@ func TestPanicFeedErrWorkSecondPath(t *testing.T) { }) ctx, _ := g.(feedingMultiErrGroup[int]).g.getContext() <-ctx.Done() - assert.PanicsWithValue(t, "oh no!", func() { + assertPanicsWithValue(t, "oh no!", func() { g.Go(func(context.Context) (int, error) { return 2, nil }) }) } @@ -380,10 +394,10 @@ func TestGroupsPanicAgain(t *testing.T) { innerGroup.Go(func(context.Context) { panic("at the disco") }) innerGroup.Wait() }) - assert.PanicsWithValue(t, "at the disco", outerGroup.Wait) - assert.PanicsWithValue(t, "at the disco", innerGroup.Wait) - assert.PanicsWithValue(t, "at the disco", outerGroup.Wait) - assert.PanicsWithValue(t, "at the disco", innerGroup.Wait) + assertPanicsWithValue(t, "at the disco", outerGroup.Wait) + assertPanicsWithValue(t, "at the disco", innerGroup.Wait) + assertPanicsWithValue(t, "at the disco", outerGroup.Wait) + assertPanicsWithValue(t, "at the disco", innerGroup.Wait) }) } } @@ -392,8 +406,8 @@ func TestPipeGroupPanicsAgain(t *testing.T) { t.Parallel() g := Feed(Unlimited(context.Background()), func(context.Context, int) error { return nil }) g.Go(func(context.Context) (int, error) { panic("at the disco") }) - assert.PanicsWithValue(t, "at the disco", func() { _ = g.Wait() }) - assert.PanicsWithValue(t, "at the disco", func() { _ = g.Wait() }) + assertPanicsWithValue(t, "at the disco", func() { _ = g.Wait() }) + assertPanicsWithValue(t, "at the disco", func() { _ = g.Wait() }) } func TestForgottenPipeLegiblePanic(t *testing.T) { @@ -419,7 +433,7 @@ func TestForgottenPipeLegiblePanic(t *testing.T) { // happens the panic will be stored in the executor, so we re-panic that // specific error with a more diagnostic message. blocker.Done() - assert.PanicsWithValue(t, "parallel executor pipe error: a "+ + assertPanicsWithValue(t, "parallel executor pipe error: a "+ "collector using this same executor was probably not awaited", exec.Wait) } @@ -447,5 +461,5 @@ func TestPanicNil(t *testing.T) { // they aren't detectable without some trickery (prior to go1.21) panic(nil) }) - assert.PanicsWithValue(t, nil, g.Wait) + assertPanicsWithValue(t, nil, g.Wait) }