Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

capture stack traces in worker panics #6

Merged
merged 5 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/gotest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [ '1.20', '1.21.x', '1.22.x' ]
go-version: [ '1.21.x', '1.22.x', '1.23.x' ]

steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/wandb/parallel

go 1.21
go 1.23.1

require github.com/stretchr/testify v1.9.0

Expand Down
47 changes: 41 additions & 6 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"runtime"
"runtime/debug"
"sync"
"sync/atomic"
)
Expand All @@ -17,6 +18,18 @@ const misuseMessage = "parallel executor misuse: don't reuse executors"

var errPanicked = errors.New("panicked")

// WorkerPanic represents a panic value propagated from a task within a parallel
// executor, and is the main type of panic that you might expect to receive.
type WorkerPanic struct {
// Panic contains the originally panic()ed value.
Panic any
// Stacktraces contains the stacktraces of the panics. The stack trace of
// the line that threw the original Panic value appears first, and any other
// stack traces from other parallel groups that received this panic and re-
// threw it appear in order afterwards.
Stacktraces []string
}

// NOTE: If you want to really get crazy with it, it IS permissible and safe to
// call Go(...) from multiple threads without additional synchronization, on
// every kind of executor. HOWEVER: the caller always assumes full
Expand Down Expand Up @@ -110,7 +123,7 @@ func makeGroup(ctx context.Context, cancel context.CancelCauseFunc) *group {
type group struct {
runner
wg sync.WaitGroup
panicked atomic.Pointer[any] // Stores panic values
panicked atomic.Pointer[WorkerPanic] // Stores panic values
}

func (g *group) Go(op func(context.Context)) {
Expand All @@ -131,12 +144,34 @@ func (g *group) Go(op func(context.Context)) {
// When the function call has exited without returning, hoist
// the recover()ed panic value and a stack trace so it can be
// re-panicked.
//
// Currently we do not store stack traces from the panicked
// goroutine or distinguish between panics and runtime.Goexit().
p := recover()
g.panicked.CompareAndSwap(nil, &p)
g.cancel(errPanicked)
if p == nil {
// This is a runtime.Goexit(), such as from a process
// termination or a test failure; let that propagate instead
g.cancel(context.Canceled)
} else {
// If we are propagating a panic that is already a
// WorkerPanic (for example, if we have panics propagating
// through multiple parallel groups), just add our
// stacktrace onto the end of the slice; otherwise make a
// new WorkerPanic value.
var wp WorkerPanic
switch tp := p.(type) {
case WorkerPanic:
wp = WorkerPanic{
Panic: tp.Panic,
Stacktraces: append(tp.Stacktraces, string(debug.Stack())),
}
default:
wp = WorkerPanic{
Panic: p,
Stacktraces: []string{string(debug.Stack())},
}
}

g.panicked.CompareAndSwap(nil, &wp)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, why CompareAndSwap? Is there a risk of setting g.panicked twice?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pretty positive i just wanted to make sure that specifically the first panic sticks, rather than just "any panic who cares"

g.cancel(errPanicked)
}
}
g.wg.Done()
}()
Expand Down
34 changes: 33 additions & 1 deletion group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ import (
"github.com/stretchr/testify/assert"
)

func assertPanicsWithValue(t *testing.T, expectedValue any, f func()) {
t.Helper()

defer func() {
p := recover()
if p == nil {
t.Fatal("didn't panic but should have")
}
assert.Equal(t, expectedValue, p.(WorkerPanic).Panic)
}()

f()
}

func TestGroup(t *testing.T) {
for _, test := range []struct {
name string
Expand Down Expand Up @@ -156,7 +170,7 @@ func testLimitedGroupMaxConcurrency(t *testing.T, name string, g Executor, limit
if shouldSucceed {
assert.NotPanics(t, jobInserter.Wait)
} else {
assert.PanicsWithValue(t, "poison pill", jobInserter.Wait)
assertPanicsWithValue(t, "poison pill", jobInserter.Wait)
}
})
}
Expand Down Expand Up @@ -213,3 +227,21 @@ func testConcurrentGroupWaitReallyWaits(t *testing.T, name string, g Executor) {
testingGroup.Wait()
})
}

func TestCanGoexit(t *testing.T) {
g := Unlimited(context.Background())
g.Go(func(context.Context) {
// Ideally we would test t.Fatal() here to show that parallel now plays
// nicely with the testing lib, but there doesn't seem to be any good
// way to xfail a golang test. As it happens t.Fatal() just sets a fail
// flag and then calls Goexit() anyway; if we treat nil recover() values
// as Goexit() (guaranteed since 1.21 with the advent of PanicNilError)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... realizes what it means that panic(nil) use to recover() as nil

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is some deep lore to how panic(nil) used to work (and be worked around)

// we can handle this very simply, without needing a "double defer
// sandwich".
//
// Either way, we expect Goexit() to work normally in tests now and not
// fail or re-panic.
runtime.Goexit()
})
g.Wait()
}
64 changes: 18 additions & 46 deletions safety_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package parallel
import (
"context"
"errors"
"reflect"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -40,7 +39,7 @@ func TestLimitedGroupCleanup(t *testing.T) {
op(nil) // have mercy and run those ops anyway, just so we get a full count
}
// The channel should get closed!
assert.Equal(t, int64(100), counter)
assert.Equal(t, int64(100), atomic.LoadInt64(&counter))
}

func TestCollectorCleanup(t *testing.T) {
Expand Down Expand Up @@ -134,7 +133,7 @@ func TestPanicGroup(t *testing.T) {
// Wait for the group to "die" when the panic hits
ctx, _ := g.getContext()
<-ctx.Done()
assert.PanicsWithValue(t, "wow", func() {
assertPanicsWithValue(t, "wow", func() {
g.Wait()
})
}
Expand All @@ -154,7 +153,7 @@ func TestPanicGroupSecondPath(t *testing.T) {
// Wait for the group to "die" when the panic hits
ctx, _ := g.getContext()
<-ctx.Done()
assert.PanicsWithValue(t, "wow", func() {
assertPanicsWithValue(t, "wow", func() {
g.Go(func(context.Context) {
t.Fatal("this op should never run")
})
Expand All @@ -179,7 +178,7 @@ func TestPanicLimitedGroup(t *testing.T) {
})
waitForNonPanic.Wait()
block.Done()
assert.PanicsWithValue(t, "lol", func() {
assertPanicsWithValue(t, "lol", func() {
g.Wait()
})
}
Expand All @@ -202,7 +201,7 @@ func TestPanicLimitedGroupSecondPath(t *testing.T) {
})
waitForNonPanic.Wait()
block.Done()
assert.PanicsWithValue(t, "lol", func() {
assertPanicsWithValue(t, "lol", func() {
// Eventually :)
for {
g.Go(func(context.Context) {})
Expand All @@ -218,7 +217,7 @@ func TestPanicFeedFunction(t *testing.T) {
g.Go(func(context.Context) (int, error) {
return 1, nil
})
assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
}

func TestPanicFeedWork(t *testing.T) {
Expand All @@ -230,7 +229,7 @@ func TestPanicFeedWork(t *testing.T) {
g.Go(func(context.Context) (int, error) {
panic("oh no!")
})
assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
}

func TestPanicFeedWorkSecondPath(t *testing.T) {
Expand All @@ -244,7 +243,7 @@ func TestPanicFeedWorkSecondPath(t *testing.T) {
})
ctx, _ := g.(feedingGroup[int]).g.getContext()
<-ctx.Done()
assert.PanicsWithValue(t, "oh no!", func() {
assertPanicsWithValue(t, "oh no!", func() {
g.Go(func(context.Context) (int, error) { return 2, nil })
})
}
Expand All @@ -270,7 +269,7 @@ func TestPanicFeedErrFunction(t *testing.T) {
g.Go(func(context.Context) (int, error) {
return 1, nil
})
assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
}

func TestPanicFeedErrWork(t *testing.T) {
Expand All @@ -282,7 +281,7 @@ func TestPanicFeedErrWork(t *testing.T) {
g.Go(func(context.Context) (int, error) {
panic("oh no!")
})
assert.PanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
assertPanicsWithValue(t, "oh no!", func() { _ = g.Wait() })
}

func TestPanicFeedErrWorkSecondPath(t *testing.T) {
Expand All @@ -296,7 +295,7 @@ func TestPanicFeedErrWorkSecondPath(t *testing.T) {
})
ctx, _ := g.(feedingMultiErrGroup[int]).g.getContext()
<-ctx.Done()
assert.PanicsWithValue(t, "oh no!", func() {
assertPanicsWithValue(t, "oh no!", func() {
g.Go(func(context.Context) (int, error) { return 2, nil })
})
}
Expand Down Expand Up @@ -380,10 +379,10 @@ func TestGroupsPanicAgain(t *testing.T) {
innerGroup.Go(func(context.Context) { panic("at the disco") })
innerGroup.Wait()
})
assert.PanicsWithValue(t, "at the disco", outerGroup.Wait)
assert.PanicsWithValue(t, "at the disco", innerGroup.Wait)
assert.PanicsWithValue(t, "at the disco", outerGroup.Wait)
assert.PanicsWithValue(t, "at the disco", innerGroup.Wait)
assertPanicsWithValue(t, "at the disco", outerGroup.Wait)
assertPanicsWithValue(t, "at the disco", innerGroup.Wait)
assertPanicsWithValue(t, "at the disco", outerGroup.Wait)
assertPanicsWithValue(t, "at the disco", innerGroup.Wait)
})
}
}
Expand All @@ -392,8 +391,8 @@ func TestPipeGroupPanicsAgain(t *testing.T) {
t.Parallel()
g := Feed(Unlimited(context.Background()), func(context.Context, int) error { return nil })
g.Go(func(context.Context) (int, error) { panic("at the disco") })
assert.PanicsWithValue(t, "at the disco", func() { _ = g.Wait() })
assert.PanicsWithValue(t, "at the disco", func() { _ = g.Wait() })
assertPanicsWithValue(t, "at the disco", func() { _ = g.Wait() })
assertPanicsWithValue(t, "at the disco", func() { _ = g.Wait() })
}

func TestForgottenPipeLegiblePanic(t *testing.T) {
Expand All @@ -419,33 +418,6 @@ func TestForgottenPipeLegiblePanic(t *testing.T) {
// happens the panic will be stored in the executor, so we re-panic that
// specific error with a more diagnostic message.
blocker.Done()
assert.PanicsWithValue(t, "parallel executor pipe error: a "+
assertPanicsWithValue(t, "parallel executor pipe error: a "+
"collector using this same executor was probably not awaited", exec.Wait)
}

func TestPanicNil(t *testing.T) {
// Read what is actually thrown when we call panic(nil)
nilPanic := func() (p any) {
defer func() {
p = recover()
}()
panic(nil)
}()
if nilPanic != nil {
// We are probably on go1.21 or later, where panic(nil) is transformed
// into a runtime.PanicNilError.
assert.Equal(t,
"PanicNilError",
reflect.TypeOf(nilPanic).Elem().Name())
return
}

t.Parallel()
g := Unlimited(context.Background())
g.Go(func(context.Context) {
// Panics that are literally `nil` should also be caught, even though
// they aren't detectable without some trickery (prior to go1.21)
panic(nil)
})
assert.PanicsWithValue(t, nil, g.Wait)
}
Loading