Skip to content

Commit

Permalink
Merge pull request #6 from wandb/stacktraces
Browse files Browse the repository at this point in the history
capture stack traces in worker panics
  • Loading branch information
mumbleskates authored Sep 24, 2024
2 parents 1d7e087 + 9a15da7 commit c70b8da
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gotest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [ '1.20', '1.21.x', '1.22.x' ]
go-version: [ '1.21.x', '1.22.x', '1.23.x' ]

steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/wandb/parallel

go 1.21
go 1.23.1

require github.com/stretchr/testify v1.9.0

Expand Down
47 changes: 41 additions & 6 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"runtime"
"runtime/debug"
"sync"
"sync/atomic"
)
Expand All @@ -17,6 +18,18 @@ const misuseMessage = "parallel executor misuse: don't reuse executors"

var errPanicked = errors.New("panicked")

// WorkerPanic represents a panic value propagated from a task within a parallel
// executor, and is the main type of panic that you might expect to receive.
type WorkerPanic struct {
// Panic contains the originally panic()ed value.
Panic any
// Stacktraces contains the stacktraces of the panics. The stack trace of
// the line that threw the original Panic value appears first, and any other
// stack traces from other parallel groups that received this panic and re-
// threw it appear in order afterwards.
Stacktraces []string
}

// NOTE: If you want to really get crazy with it, it IS permissible and safe to
// call Go(...) from multiple threads without additional synchronization, on
// every kind of executor. HOWEVER: the caller always assumes full
Expand Down Expand Up @@ -110,7 +123,7 @@ func makeGroup(ctx context.Context, cancel context.CancelCauseFunc) *group {
type group struct {
runner
wg sync.WaitGroup
panicked atomic.Pointer[any] // Stores panic values
panicked atomic.Pointer[WorkerPanic] // Stores panic values
}

func (g *group) Go(op func(context.Context)) {
Expand All @@ -131,12 +144,34 @@ func (g *group) Go(op func(context.Context)) {
// When the function call has exited without returning, hoist
// the recover()ed panic value and a stack trace so it can be
// re-panicked.
//
// Currently we do not store stack traces from the panicked
// goroutine or distinguish between panics and runtime.Goexit().
p := recover()
g.panicked.CompareAndSwap(nil, &p)
g.cancel(errPanicked)
if p == nil {
// This is a runtime.Goexit(), such as from a process
// termination or a test failure; let that propagate instead
g.cancel(context.Canceled)
} else {
// If we are propagating a panic that is already a
// WorkerPanic (for example, if we have panics propagating
// through multiple parallel groups), just add our
// stacktrace onto the end of the slice; otherwise make a
// new WorkerPanic value.
var wp WorkerPanic
switch tp := p.(type) {
case WorkerPanic:
wp = WorkerPanic{
Panic: tp.Panic,
Stacktraces: append(tp.Stacktraces, string(debug.Stack())),
}
default:
wp = WorkerPanic{
Panic: p,
Stacktraces: []string{string(debug.Stack())},
}
}

g.panicked.CompareAndSwap(nil, &wp)
g.cancel(errPanicked)
}
}
g.wg.Done()
}()
Expand Down
34 changes: 33 additions & 1 deletion group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ import (
"github.com/stretchr/testify/assert"
)

func assertPanicsWithValue(t *testing.T, expectedValue any, f func()) {
t.Helper()

defer func() {
p := recover()
if p == nil {
t.Fatal("didn't panic but should have")
}
assert.Equal(t, expectedValue, p.(WorkerPanic).Panic)
}()

f()
}

func TestGroup(t *testing.T) {
for _, test := range []struct {
name string
Expand Down Expand Up @@ -156,7 +170,7 @@ func testLimitedGroupMaxConcurrency(t *testing.T, name string, g Executor, limit
if shouldSucceed {
assert.NotPanics(t, jobInserter.Wait)
} else {
assert.PanicsWithValue(t, "poison pill", jobInserter.Wait)
assertPanicsWithValue(t, "poison pill", jobInserter.Wait)
}
})
}
Expand Down Expand Up @@ -213,3 +227,21 @@ func testConcurrentGroupWaitReallyWaits(t *testing.T, name string, g Executor) {
testingGroup.Wait()
})
}

func TestCanGoexit(t *testing.T) {
g := Unlimited(context.Background())
g.Go(func(context.Context) {
// Ideally we would test t.Fatal() here to show that parallel now plays
// nicely with the testing lib, but there doesn't seem to be any good
// way to xfail a golang test. As it happens t.Fatal() just sets a fail
// flag and then calls Goexit() anyway; if we treat nil recover() values
// as Goexit() (guaranteed since 1.21 with the advent of PanicNilError)
// we can handle this very simply, without needing a "double defer
// sandwich".
//
// Either way, we expect Goexit() to work normally in tests now and not
// fail or re-panic.
runtime.Goexit()
})
g.Wait()
}
64 changes: 18 additions & 46 deletions safety_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package parallel
import (
"context"
"errors"
"reflect"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -40,7 +39,7 @@ func TestLimitedGroupCleanup(t *testing.T) {
op(nil) // have mercy and run those ops anyway, just so we get a full count
}
// The channel should get closed!
assert.Equal(t, int64(100), counter)
assert.Equal(t, int64(100), atomic.LoadInt64(&counter))
}

func TestCollectorCleanup(t *testing.T) {
Expand Down Expand Up @@ -134,7 +133,7 @@ func TestPanicGroup(t *testing.T) {
// Wait for the group to "die" when the panic hits
ctx, _ := g.getContext()
<-ctx.Done()
assert.PanicsWithValue(t, "wow", func() {
assertPanicsWithValue(t, "wow", func() {
g.Wait()
})
}
Expand All @@ -154,7 +153,7 @@ func TestPanicGroupSecondPath(t *testing.T) {
// Wait for the group to "die" when the panic hits
ctx, _ := g.getContext()
<-ctx.Done()
assert.PanicsWithValue(t, "wow", func() {
assertPanicsWithValue(t, "wow", func() {
g.Go(func(context.Context) {
t.Fatal("this op should never run")
})
Expand All @@ -179,7 +178,7 @@ func TestPanicLimitedGroup(t *testing.T) {
})
waitForNonPanic.Wait()
block.Done()
assert.PanicsWithValue(t, "lol", func() {
assertPanicsWithValue(t, "lol", func() {
g.Wait()
})
}
Expand All @@ -202,7 +201,7 @@ func TestPanicLimitedGroupSecondPath(t *testing.T) {
})
waitForNonPanic.Wait()
block.Done()
assert.PanicsWithValue(t, "lol", func() {
assertPanicsWithValue(t, "lol", func() {
// Eventually :)
for {
g.Go(func(context.Context) {})
Expand All @@ -218,7 +217,7 @@ func TestPanicFeedFunction(t *testing.T) {
g.Go(func(context.Context) (int, error) {
return 1, nil
})
assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
}

func TestPanicFeedWork(t *testing.T) {
Expand All @@ -230,7 +229,7 @@ func TestPanicFeedWork(t *testing.T) {
g.Go(func(context.Context) (int, error) {
panic("oh no!")
})
assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
}

func TestPanicFeedWorkSecondPath(t *testing.T) {
Expand All @@ -244,7 +243,7 @@ func TestPanicFeedWorkSecondPath(t *testing.T) {
})
ctx, _ := g.(feedingGroup[int]).g.getContext()
<-ctx.Done()
assert.PanicsWithValue(t, "oh no!", func() {
assertPanicsWithValue(t, "oh no!", func() {
g.Go(func(context.Context) (int, error) { return 2, nil })
})
}
Expand All @@ -270,7 +269,7 @@ func TestPanicFeedErrFunction(t *testing.T) {
g.Go(func(context.Context) (int, error) {
return 1, nil
})
assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
}

func TestPanicFeedErrWork(t *testing.T) {
Expand All @@ -282,7 +281,7 @@ func TestPanicFeedErrWork(t *testing.T) {
g.Go(func(context.Context) (int, error) {
panic("oh no!")
})
assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
}

func TestPanicFeedErrWorkSecondPath(t *testing.T) {
Expand All @@ -296,7 +295,7 @@ func TestPanicFeedErrWorkSecondPath(t *testing.T) {
})
ctx, _ := g.(feedingMultiErrGroup[int]).g.getContext()
<-ctx.Done()
assert.PanicsWithValue(t, "oh no!", func() {
assertPanicsWithValue(t, "oh no!", func() {
g.Go(func(context.Context) (int, error) { return 2, nil })
})
}
Expand Down Expand Up @@ -380,10 +379,10 @@ func TestGroupsPanicAgain(t *testing.T) {
innerGroup.Go(func(context.Context) { panic("at the disco") })
innerGroup.Wait()
})
assert.PanicsWithValue(t, "at the disco", outerGroup.Wait)
assert.PanicsWithValue(t, "at the disco", innerGroup.Wait)
assert.PanicsWithValue(t, "at the disco", outerGroup.Wait)
assert.PanicsWithValue(t, "at the disco", innerGroup.Wait)
assertPanicsWithValue(t, "at the disco", outerGroup.Wait)
assertPanicsWithValue(t, "at the disco", innerGroup.Wait)
assertPanicsWithValue(t, "at the disco", outerGroup.Wait)
assertPanicsWithValue(t, "at the disco", innerGroup.Wait)
})
}
}
Expand All @@ -392,8 +391,8 @@ func TestPipeGroupPanicsAgain(t *testing.T) {
t.Parallel()
g := Feed(Unlimited(context.Background()), func(context.Context, int) error { return nil })
g.Go(func(context.Context) (int, error) { panic("at the disco") })
assert.PanicsWithValue(t, "at the disco", func() { _ = g.Wait() })
assert.PanicsWithValue(t, "at the disco", func() { _ = g.Wait() })
assertPanicsWithValue(t, "at the disco", func() { _ = g.Wait() })
assertPanicsWithValue(t, "at the disco", func() { _ = g.Wait() })
}

func TestForgottenPipeLegiblePanic(t *testing.T) {
Expand All @@ -419,33 +418,6 @@ func TestForgottenPipeLegiblePanic(t *testing.T) {
// happens the panic will be stored in the executor, so we re-panic that
// specific error with a more diagnostic message.
blocker.Done()
assert.PanicsWithValue(t, "parallel executor pipe error: a "+
assertPanicsWithValue(t, "parallel executor pipe error: a "+
"collector using this same executor was probably not awaited", exec.Wait)
}

func TestPanicNil(t *testing.T) {
// Read what is actually thrown when we call panic(nil)
nilPanic := func() (p any) {
defer func() {
p = recover()
}()
panic(nil)
}()
if nilPanic != nil {
// We are probably on go1.21 or later, where panic(nil) is transformed
// into a runtime.PanicNilError.
assert.Equal(t,
"PanicNilError",
reflect.TypeOf(nilPanic).Elem().Name())
return
}

t.Parallel()
g := Unlimited(context.Background())
g.Go(func(context.Context) {
// Panics that are literally `nil` should also be caught, even though
// they aren't detectable without some trickery (prior to go1.21)
panic(nil)
})
assert.PanicsWithValue(t, nil, g.Wait)
}

0 comments on commit c70b8da

Please sign in to comment.