Skip to content

Commit

Permalink
Merge pull request #8 from wandb/panic-pretty-printing
Browse files Browse the repository at this point in the history
Panic pretty printing
  • Loading branch information
mumbleskates authored Oct 9, 2024
2 parents a13eedc + 8ce2b07 commit 5798e0c
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 11 deletions.
1 change: 0 additions & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ linters:
- typecheck
- unused
- gci
- exportloopref
linters-settings:
goimports:
local-prefixes: "github.com/wandb/parallel"
Expand Down
4 changes: 2 additions & 2 deletions collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,10 @@ func (pg *pipeGroup[T, R]) doWait() {
}()
// Runs first: Wait for inputs. Wait "quietly", not canceling the
// context yet so if there is an error later we can still see it
pg.g.quietWait()
pg.g.waitWithoutCanceling()
}()
// Runs third: Wait for outputs to be done
pg.pipeWorkers.quietWait()
pg.pipeWorkers.waitWithoutCanceling()
}

var _ CollectingExecutor[int] = collectingGroup[int]{}
Expand Down
33 changes: 25 additions & 8 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package parallel
import (
"context"
"errors"
"fmt"
"runtime"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
)
Expand All @@ -29,7 +31,7 @@ var (

// WorkerPanic represents a panic value propagated from a task within a parallel
// executor, and is the main type of panic that you might expect to receive.
type WorkerPanic struct {
type WorkerPanic struct { //nolint:errname
// Panic contains the originally panic()ed value.
Panic any
// Stacktraces contains the stacktraces of the panics. The stack trace of
Expand All @@ -39,6 +41,21 @@ type WorkerPanic struct {
Stacktraces []string
}

// We pretty-print our wrapped panic type including the captured stack traces.
func (wp WorkerPanic) Error() string {
var sb strings.Builder
for _, s := range wp.Stacktraces {
sb.WriteString(s)
sb.WriteByte('\n')
}
return fmt.Sprintf(
"%#v\n\nPrior %d executor stack trace(s), innermost first:\n%s",
wp.Panic,
len(wp.Stacktraces),
sb.String(),
)
}

// NOTE: If you want to really get crazy with it, it IS permissible and safe to
// call Go(...) from multiple threads without additional synchronization, on
// every kind of executor. HOWEVER: the caller always assumes full
Expand All @@ -65,7 +82,7 @@ type Executor interface {
getContext() (context.Context, context.CancelCauseFunc)
// Waits without canceling the context with errGroupDone. The caller of this
// function promises that they will be responsible for canceling the context
quietWait()
waitWithoutCanceling()
}

// Creates a basic executor which runs all the functions given in one goroutine
Expand Down Expand Up @@ -142,11 +159,11 @@ func (n *runner) Go(op func(context.Context)) {
}

func (n *runner) Wait() {
n.quietWait()
n.waitWithoutCanceling()
n.cancel(errGroupDone)
}

func (n *runner) quietWait() {
func (n *runner) waitWithoutCanceling() {
n.awaited.Store(true)
runtime.SetFinalizer(n, nil)
}
Expand Down Expand Up @@ -226,10 +243,10 @@ func (g *group) Go(op func(context.Context)) {

func (g *group) Wait() {
defer g.cancel(errGroupDone)
g.quietWait()
g.waitWithoutCanceling()
}

func (g *group) quietWait() {
func (g *group) waitWithoutCanceling() {
g.awaited.Store(true)
runtime.SetFinalizer(g, nil)
g.wg.Wait()
Expand Down Expand Up @@ -310,12 +327,12 @@ func (lg *limitedGroup) Wait() {
lg.g.Wait()
}

func (lg *limitedGroup) quietWait() {
func (lg *limitedGroup) waitWithoutCanceling() {
if !lg.awaited.Swap(true) {
close(lg.ops)
runtime.SetFinalizer(lg, nil) // Don't try to close this chan again :)
}
lg.g.quietWait()
lg.g.waitWithoutCanceling()
}

func (lg *limitedGroup) getContext() (context.Context, context.CancelCauseFunc) {
Expand Down
27 changes: 27 additions & 0 deletions panic_demo/panic_demo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"context"

"github.com/wandb/parallel"
)

func main() {
g := parallel.Unlimited(context.Background())
g.Go(bear)
g.Wait()
}

func bear(_ context.Context) {
g := parallel.Unlimited(context.Background())
g.Go(foo)
g.Wait()
}

func foo(_ context.Context) {
bar()
}

func bar() {
panic("baz")
}

0 comments on commit 5798e0c

Please sign in to comment.