diff --git a/.github/workflows/gotest.yml b/.github/workflows/gotest.yml index 0c336e3..aedf60f 100644 --- a/.github/workflows/gotest.yml +++ b/.github/workflows/gotest.yml @@ -14,7 +14,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [ '1.20', '1.21.x', '1.22.x' ] + go-version: [ '1.21.x', '1.22.x', '1.23.x' ] steps: - uses: actions/checkout@v4 diff --git a/go.mod b/go.mod index c66b43f..05ecaea 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/wandb/parallel -go 1.21 +go 1.23.1 require github.com/stretchr/testify v1.9.0 diff --git a/group.go b/group.go index 0cf295d..b566421 100644 --- a/group.go +++ b/group.go @@ -4,6 +4,7 @@ import ( "context" "errors" "runtime" + "runtime/debug" "sync" "sync/atomic" ) @@ -17,6 +18,18 @@ const misuseMessage = "parallel executor misuse: don't reuse executors" var errPanicked = errors.New("panicked") +// WorkerPanic represents a panic value propagated from a task within a parallel +// executor, and is the main type of panic that you might expect to receive. +type WorkerPanic struct { + // Panic contains the originally panic()ed value. + Panic any + // Stacktraces contains the stacktraces of the panics. The stack trace of + // the line that threw the original Panic value appears first, and any other + // stack traces from other parallel groups that received this panic and re- + // threw it appear in order afterwards. + Stacktraces []string +} + // NOTE: If you want to really get crazy with it, it IS permissible and safe to // call Go(...) from multiple threads without additional synchronization, on // every kind of executor. HOWEVER: the caller always assumes full @@ -110,7 +123,7 @@ func makeGroup(ctx context.Context, cancel context.CancelCauseFunc) *group { type group struct { runner wg sync.WaitGroup - panicked atomic.Pointer[any] // Stores panic values + panicked atomic.Pointer[WorkerPanic] // Stores panic values } func (g *group) Go(op func(context.Context)) { @@ -131,12 +144,34 @@ func (g *group) Go(op func(context.Context)) { // When the function call has exited without returning, hoist // the recover()ed panic value and a stack trace so it can be // re-panicked. - // - // Currently we do not store stack traces from the panicked - // goroutine or distinguish between panics and runtime.Goexit(). p := recover() - g.panicked.CompareAndSwap(nil, &p) - g.cancel(errPanicked) + if p == nil { + // This is a runtime.Goexit(), such as from a process + // termination or a test failure; let that propagate instead + g.cancel(context.Canceled) + } else { + // If we are propagating a panic that is already a + // WorkerPanic (for example, if we have panics propagating + // through multiple parallel groups), just add our + // stacktrace onto the end of the slice; otherwise make a + // new WorkerPanic value. + var wp WorkerPanic + switch tp := p.(type) { + case WorkerPanic: + wp = WorkerPanic{ + Panic: tp.Panic, + Stacktraces: append(tp.Stacktraces, string(debug.Stack())), + } + default: + wp = WorkerPanic{ + Panic: p, + Stacktraces: []string{string(debug.Stack())}, + } + } + + g.panicked.CompareAndSwap(nil, &wp) + g.cancel(errPanicked) + } } g.wg.Done() }() diff --git a/group_test.go b/group_test.go index eb23cf0..3dbdf5e 100644 --- a/group_test.go +++ b/group_test.go @@ -10,6 +10,20 @@ import ( "github.com/stretchr/testify/assert" ) +func assertPanicsWithValue(t *testing.T, expectedValue any, f func()) { + t.Helper() + + defer func() { + p := recover() + if p == nil { + t.Fatal("didn't panic but should have") + } + assert.Equal(t, expectedValue, p.(WorkerPanic).Panic) + }() + + f() +} + func TestGroup(t *testing.T) { for _, test := range []struct { name string @@ -156,7 +170,7 @@ func testLimitedGroupMaxConcurrency(t *testing.T, name string, g Executor, limit if shouldSucceed { assert.NotPanics(t, jobInserter.Wait) } else { - assert.PanicsWithValue(t, "poison pill", jobInserter.Wait) + assertPanicsWithValue(t, "poison pill", jobInserter.Wait) } }) } @@ -213,3 +227,21 @@ func testConcurrentGroupWaitReallyWaits(t *testing.T, name string, g Executor) { testingGroup.Wait() }) } + +func TestCanGoexit(t *testing.T) { + g := Unlimited(context.Background()) + g.Go(func(context.Context) { + // Ideally we would test t.Fatal() here to show that parallel now plays + // nicely with the testing lib, but there doesn't seem to be any good + // way to xfail a golang test. As it happens t.Fatal() just sets a fail + // flag and then calls Goexit() anyway; if we treat nil recover() values + // as Goexit() (guaranteed since 1.21 with the advent of PanicNilError) + // we can handle this very simply, without needing a "double defer + // sandwich". + // + // Either way, we expect Goexit() to work normally in tests now and not + // fail or re-panic. + runtime.Goexit() + }) + g.Wait() +} diff --git a/safety_test.go b/safety_test.go index 487ed96..bc4c61f 100644 --- a/safety_test.go +++ b/safety_test.go @@ -5,7 +5,6 @@ package parallel import ( "context" "errors" - "reflect" "runtime" "sync" "sync/atomic" @@ -40,7 +39,7 @@ func TestLimitedGroupCleanup(t *testing.T) { op(nil) // have mercy and run those ops anyway, just so we get a full count } // The channel should get closed! - assert.Equal(t, int64(100), counter) + assert.Equal(t, int64(100), atomic.LoadInt64(&counter)) } func TestCollectorCleanup(t *testing.T) { @@ -134,7 +133,7 @@ func TestPanicGroup(t *testing.T) { // Wait for the group to "die" when the panic hits ctx, _ := g.getContext() <-ctx.Done() - assert.PanicsWithValue(t, "wow", func() { + assertPanicsWithValue(t, "wow", func() { g.Wait() }) } @@ -154,7 +153,7 @@ func TestPanicGroupSecondPath(t *testing.T) { // Wait for the group to "die" when the panic hits ctx, _ := g.getContext() <-ctx.Done() - assert.PanicsWithValue(t, "wow", func() { + assertPanicsWithValue(t, "wow", func() { g.Go(func(context.Context) { t.Fatal("this op should never run") }) @@ -179,7 +178,7 @@ func TestPanicLimitedGroup(t *testing.T) { }) waitForNonPanic.Wait() block.Done() - assert.PanicsWithValue(t, "lol", func() { + assertPanicsWithValue(t, "lol", func() { g.Wait() }) } @@ -202,7 +201,7 @@ func TestPanicLimitedGroupSecondPath(t *testing.T) { }) waitForNonPanic.Wait() block.Done() - assert.PanicsWithValue(t, "lol", func() { + assertPanicsWithValue(t, "lol", func() { // Eventually :) for { g.Go(func(context.Context) {}) @@ -218,7 +217,7 @@ func TestPanicFeedFunction(t *testing.T) { g.Go(func(context.Context) (int, error) { return 1, nil }) - assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) + assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) } func TestPanicFeedWork(t *testing.T) { @@ -230,7 +229,7 @@ func TestPanicFeedWork(t *testing.T) { g.Go(func(context.Context) (int, error) { panic("oh no!") }) - assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) + assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) } func TestPanicFeedWorkSecondPath(t *testing.T) { @@ -244,7 +243,7 @@ func TestPanicFeedWorkSecondPath(t *testing.T) { }) ctx, _ := g.(feedingGroup[int]).g.getContext() <-ctx.Done() - assert.PanicsWithValue(t, "oh no!", func() { + assertPanicsWithValue(t, "oh no!", func() { g.Go(func(context.Context) (int, error) { return 2, nil }) }) } @@ -270,7 +269,7 @@ func TestPanicFeedErrFunction(t *testing.T) { g.Go(func(context.Context) (int, error) { return 1, nil }) - assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) + assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) } func TestPanicFeedErrWork(t *testing.T) { @@ -282,7 +281,7 @@ func TestPanicFeedErrWork(t *testing.T) { g.Go(func(context.Context) (int, error) { panic("oh no!") }) - assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) + assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() }) } func TestPanicFeedErrWorkSecondPath(t *testing.T) { @@ -296,7 +295,7 @@ func TestPanicFeedErrWorkSecondPath(t *testing.T) { }) ctx, _ := g.(feedingMultiErrGroup[int]).g.getContext() <-ctx.Done() - assert.PanicsWithValue(t, "oh no!", func() { + assertPanicsWithValue(t, "oh no!", func() { g.Go(func(context.Context) (int, error) { return 2, nil }) }) } @@ -380,10 +379,10 @@ func TestGroupsPanicAgain(t *testing.T) { innerGroup.Go(func(context.Context) { panic("at the disco") }) innerGroup.Wait() }) - assert.PanicsWithValue(t, "at the disco", outerGroup.Wait) - assert.PanicsWithValue(t, "at the disco", innerGroup.Wait) - assert.PanicsWithValue(t, "at the disco", outerGroup.Wait) - assert.PanicsWithValue(t, "at the disco", innerGroup.Wait) + assertPanicsWithValue(t, "at the disco", outerGroup.Wait) + assertPanicsWithValue(t, "at the disco", innerGroup.Wait) + assertPanicsWithValue(t, "at the disco", outerGroup.Wait) + assertPanicsWithValue(t, "at the disco", innerGroup.Wait) }) } } @@ -392,8 +391,8 @@ func TestPipeGroupPanicsAgain(t *testing.T) { t.Parallel() g := Feed(Unlimited(context.Background()), func(context.Context, int) error { return nil }) g.Go(func(context.Context) (int, error) { panic("at the disco") }) - assert.PanicsWithValue(t, "at the disco", func() { _ = g.Wait() }) - assert.PanicsWithValue(t, "at the disco", func() { _ = g.Wait() }) + assertPanicsWithValue(t, "at the disco", func() { _ = g.Wait() }) + assertPanicsWithValue(t, "at the disco", func() { _ = g.Wait() }) } func TestForgottenPipeLegiblePanic(t *testing.T) { @@ -419,33 +418,6 @@ func TestForgottenPipeLegiblePanic(t *testing.T) { // happens the panic will be stored in the executor, so we re-panic that // specific error with a more diagnostic message. blocker.Done() - assert.PanicsWithValue(t, "parallel executor pipe error: a "+ + assertPanicsWithValue(t, "parallel executor pipe error: a "+ "collector using this same executor was probably not awaited", exec.Wait) } - -func TestPanicNil(t *testing.T) { - // Read what is actually thrown when we call panic(nil) - nilPanic := func() (p any) { - defer func() { - p = recover() - }() - panic(nil) - }() - if nilPanic != nil { - // We are probably on go1.21 or later, where panic(nil) is transformed - // into a runtime.PanicNilError. - assert.Equal(t, - "PanicNilError", - reflect.TypeOf(nilPanic).Elem().Name()) - return - } - - t.Parallel() - g := Unlimited(context.Background()) - g.Go(func(context.Context) { - // Panics that are literally `nil` should also be caught, even though - // they aren't detectable without some trickery (prior to go1.21) - panic(nil) - }) - assert.PanicsWithValue(t, nil, g.Wait) -}