Skip to content

Commit

Permalink
feat: do not recover from panics with fatal errors (#3534)
Browse files Browse the repository at this point in the history
* feat: do not recover from panics with fatal errors

* Do not panic on logical compaction issues (bugs)
  • Loading branch information
aleks-p authored Aug 30, 2024
1 parent c46bfc4 commit bf6f9c5
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 6 deletions.
15 changes: 15 additions & 0 deletions pkg/experiment/metastore/metastore_fsm.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metastore

import (
"errors"
"fmt"
"io"
"reflect"
Expand Down Expand Up @@ -57,6 +58,16 @@ type fsmError struct {
err error
}

type fatalCommandError struct {
err error
}

func (e fatalCommandError) Error() string {
return fmt.Sprintf("fatal FSM command error: %v", e.err)
}

func (e fatalCommandError) Unwrap() error { return e }

func errResponse(l *raft.Log, err error) fsmResponse {
return fsmResponse{err: &fsmError{log: l, err: err}}
}
Expand Down Expand Up @@ -129,6 +140,10 @@ func handleCommand[Req, Resp proto.Message](raw []byte, cmd *raft.Log, call comm
var resp fsmResponse
defer func() {
if r := recover(); r != nil {
var fCommandError fatalCommandError
if errors.As(r.(error), &fCommandError) {
panic(fCommandError)
}
resp.err = util.PanicError(r)
}
}()
Expand Down
80 changes: 80 additions & 0 deletions pkg/experiment/metastore/metastore_fsm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package metastore

import (
"errors"
"testing"

"github.com/hashicorp/raft"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
)

var testError = errors.New("test error")
var typedNil *metastorev1.AddBlockResponse = nil

func Test_handleCommandErrorHandling(t *testing.T) {
type args[Req proto.Message, Resp proto.Message] struct {
raw []byte
cmd *raft.Log
call commandCall[Req, Resp]
}
type testCase[Req proto.Message, Resp proto.Message] struct {
name string
args args[Req, Resp]
want fsmResponse
wantPanic bool
}
tests := []testCase[*metastorev1.AddBlockRequest, *metastorev1.AddBlockResponse]{
{
name: "no error",
args: args[*metastorev1.AddBlockRequest, *metastorev1.AddBlockResponse]{
raw: make([]byte, 0),
cmd: &raft.Log{},
call: func(log *raft.Log, request *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) {
return &metastorev1.AddBlockResponse{}, nil
},
},
want: fsmResponse{
msg: &metastorev1.AddBlockResponse{},
err: nil,
},
},
{
name: "a simple error is returned",
args: args[*metastorev1.AddBlockRequest, *metastorev1.AddBlockResponse]{
raw: make([]byte, 0),
cmd: &raft.Log{},
call: func(log *raft.Log, request *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) {
return nil, testError
},
},
want: fsmResponse{
msg: typedNil,
err: testError,
},
},
{
name: "a panic with a fatal error results in a real panic",
args: args[*metastorev1.AddBlockRequest, *metastorev1.AddBlockResponse]{
raw: make([]byte, 0),
cmd: &raft.Log{},
call: func(log *raft.Log, request *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) {
panic(fatalCommandError{testError})
},
},
wantPanic: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer func() {
if r := recover(); r != nil {
assert.True(t, tt.wantPanic)
}
}()
assert.Equal(t, tt.want, handleCommand(tt.args.raw, tt.args.cmd, tt.args.call))
})
}
}
31 changes: 25 additions & 6 deletions pkg/experiment/metastore/metastore_state_poll_compaction_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ func (m *metastoreState) pollCompactionJobs(request *compactorv1.PollCompactionJ
level.Warn(m.logger).Log("msg", "job is not assigned to the worker", "job", jobUpdate.JobName, "raft_log_index", jobUpdate.RaftLogIndex)
continue
}
jobKey := tenantShard{tenant: job.TenantId, shard: job.Shard}
level.Debug(m.logger).Log("msg", "processing status update for compaction job", "job", jobUpdate.JobName, "status", jobUpdate.Status)
switch jobUpdate.Status {
case compactorv1.CompactionStatus_COMPACTION_STATUS_SUCCESS:
// clean up the job, we don't keep completed jobs around
m.compactionJobQueue.evict(job.Name, job.RaftLogIndex)
jobKey := tenantShard{tenant: job.TenantId, shard: job.Shard}
stateUpdate.deletedJobs[jobKey] = append(stateUpdate.deletedJobs[jobKey], job.Name)
m.compactionMetrics.completedJobs.WithLabelValues(
fmt.Sprint(job.Shard), job.TenantId, fmt.Sprint(job.CompactionLevel)).Inc()
Expand Down Expand Up @@ -179,7 +179,7 @@ func (m *metastoreState) pollCompactionJobs(request *compactorv1.PollCompactionJ

err = m.writeToDb(stateUpdate)
if err != nil {
panic(fmt.Errorf("error writing metastore compaction state to db: %w", err))
panic(fatalCommandError{fmt.Errorf("error persisting metadata state to db, %w", err)})
}

return resp, nil
Expand Down Expand Up @@ -268,7 +268,12 @@ func (m *metastoreState) writeToDb(sTable *pollStateUpdate) error {
for _, b := range blocks {
block := m.findBlock(shard, b)
if block == nil {
return fmt.Errorf("block %s not found in shard %d", b, shard)
level.Error(m.logger).Log(
"msg", "a newly compacted block could not be found",
"block", b,
"shard", shard,
)
continue
}
name, key := keyForBlockMeta(shard, "", b)
err := updateBlockMetadataBucket(tx, name, func(bucket *bbolt.Bucket) error {
Expand All @@ -294,7 +299,11 @@ func (m *metastoreState) writeToDb(sTable *pollStateUpdate) error {
for _, jobName := range sTable.newJobs {
job := m.findJob(jobName)
if job == nil {
return fmt.Errorf("job %s not found", jobName)
level.Error(m.logger).Log(
"msg", "a newly added job could not be found",
"job", jobName,
)
continue
}
err := m.persistCompactionJob(job.Shard, job.TenantId, job, tx)
if err != nil {
Expand All @@ -305,7 +314,13 @@ func (m *metastoreState) writeToDb(sTable *pollStateUpdate) error {
for _, l := range levels {
queue := m.getOrCreateCompactionBlockQueue(key).blocksByLevel[l]
if queue == nil {
return fmt.Errorf("block queue for %v and level %d not found", key, l)
level.Error(m.logger).Log(
"msg", "block queue not found",
"shard", key.shard,
"tenant", key.tenant,
"level", l,
)
continue
}
err := m.persistCompactionJobBlockQueue(key.shard, key.tenant, l, queue, tx)
if err != nil {
Expand All @@ -327,7 +342,11 @@ func (m *metastoreState) writeToDb(sTable *pollStateUpdate) error {
for _, jobName := range sTable.updatedJobs {
job := m.findJob(jobName)
if job == nil {
return fmt.Errorf("job %s not found", jobName)
level.Error(m.logger).Log(
"msg", "an updated job could not be found",
"job", jobName,
)
continue
}
err := m.persistCompactionJob(job.Shard, job.TenantId, job, tx)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,20 @@ func Test_FailedCompaction(t *testing.T) {
verifyCompactionState(t, m)
}

func Test_PanicWithDbErrors(t *testing.T) {
m := initState(t)
addLevel0Blocks(m, 20)

// set up panic recovery
defer func() {
r := recover()
require.NotNilf(t, r, "we should panic when a DB error is returned")
}()
// close the db, this should cause errors when persisting the state
_ = m.db.boltdb.Close()
_, _ = m.pollCompactionJobs(&compactorv1.PollCompactionJobsRequest{JobCapacity: 2}, 20, 20)
}

func addLevel0Blocks(m *metastoreState, count int) {
for i := 0; i < count; i++ {
b := createBlock(i, 0, "", 0)
Expand Down

0 comments on commit bf6f9c5

Please sign in to comment.