Skip to content

Commit

Permalink
Include updateID and updateName in update logger (#1660)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns authored Oct 8, 2024
1 parent b300e50 commit b4e934e
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 18 deletions.
2 changes: 2 additions & 0 deletions internal/internal_logging_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,6 @@ const (
tagNexusService = "NexusService"
tagPanicError = "PanicError"
tagPanicStack = "PanicStack"
tagUpdateID = "UpdateID"
tagUpdateName = "UpdateName"
)
9 changes: 8 additions & 1 deletion internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,14 @@ func GetLogger(ctx Context) log.Logger {
}

func (wc *workflowEnvironmentInterceptor) GetLogger(ctx Context) log.Logger {
return wc.env.GetLogger()
logger := wc.env.GetLogger()
// Add update info to the logger if available
uc := ctx.Value(updateInfoContextKey)
if uc == nil {
return logger
}
updateInfo := uc.(*UpdateInfo)
return log.With(logger, tagUpdateID, updateInfo.ID, tagUpdateName, updateInfo.Name)
}

// GetMetricsHandler returns a metrics handler to be used in workflow's context
Expand Down
97 changes: 81 additions & 16 deletions internal/workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,22 @@ func TestAllHandlersFinished(t *testing.T) {
require.Equal(t, 2, result)
}

// parseLogs parses the logs from the buffer and returns the logs as a slice of maps
func parseLogs(t *testing.T, buf *bytes.Buffer) []map[string]any {
var ms []map[string]any
for _, line := range bytes.Split(buf.Bytes(), []byte{'\n'}) {
if len(line) == 0 {
continue
}
var m map[string]any
err := json.Unmarshal(line, &m)
require.NoError(t, err)
fmt.Println(m)
ms = append(ms, m)
}
return ms
}

func TestWorkflowAllHandlersFinished(t *testing.T) {
// runWf runs a workflow that sends two updates and then signals the workflow to complete
runWf := func(completionType string, buf *bytes.Buffer) (int, error) {
Expand Down Expand Up @@ -648,21 +664,6 @@ func TestWorkflowAllHandlersFinished(t *testing.T) {
require.NoError(t, env.GetWorkflowResult(&result))
return result, nil
}
// parseLogs parses the logs from the buffer and returns the logs as a slice of maps
parseLogs := func(buf *bytes.Buffer) []map[string]any {
var ms []map[string]any
for _, line := range bytes.Split(buf.Bytes(), []byte{'\n'}) {
if len(line) == 0 {
continue
}
var m map[string]any
err := json.Unmarshal(line, &m)
require.NoError(t, err)
fmt.Println(m)
ms = append(ms, m)
}
return ms
}
// parseWarnedUpdates parses the warned updates from the logs and returns them as a slice of maps
parseWarnedUpdates := func(updates interface{}) []map[string]interface{} {
var warnedUpdates []map[string]interface{}
Expand All @@ -674,7 +675,7 @@ func TestWorkflowAllHandlersFinished(t *testing.T) {
}
// assertExpectedLogs asserts that the logs in the buffer are as expected
assertExpectedLogs := func(t *testing.T, buf *bytes.Buffer, shouldWarn bool) {
logs := parseLogs(buf)
logs := parseLogs(t, buf)
if shouldWarn {
require.Len(t, logs, 1)
require.Equal(t, unhandledUpdateWarningMessage, logs[0]["msg"])
Expand Down Expand Up @@ -718,6 +719,70 @@ func TestWorkflowAllHandlersFinished(t *testing.T) {
})
}

func TestWorkflowUpdateLogger(t *testing.T) {
var suite WorkflowTestSuite
var buf bytes.Buffer
th := slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelInfo})
suite.SetLogger(log.NewStructuredLogger(slog.New(th)))
env := suite.NewTestWorkflowEnvironment()

env.RegisterDelayedCallback(func() {
env.UpdateWorkflow("logging_update", "id_1", &updateCallback{
reject: func(err error) {
require.Fail(t, "update should not be rejected")
},
accept: func() {},
complete: func(interface{}, error) {},
})
}, 0)

env.RegisterDelayedCallback(func() {
env.SignalWorkflow("completion", nil)
}, time.Minute*2)

env.ExecuteWorkflow(func(ctx Context) (int, error) {
var ranUpdates int
err := SetUpdateHandler(ctx, "logging_update", func(ctx Context) error {
ranUpdates++
log := GetLogger(ctx)
log.Info("logging update handler")
return nil
}, UpdateHandlerOptions{
Validator: func(ctx Context) error {
log := GetLogger(ctx)
log.Info("logging update validator")
return nil
},
})
if err != nil {
return 0, err
}

var completeType string
s := NewSelector(ctx)
s.AddReceive(ctx.Done(), func(c ReceiveChannel, more bool) {
completeType = "cancel"
}).AddReceive(GetSignalChannel(ctx, "completion"), func(c ReceiveChannel, more bool) {
c.Receive(ctx, &completeType)
}).Select(ctx)
return ranUpdates, nil
})

require.NoError(t, env.GetWorkflowError())
var result int
require.NoError(t, env.GetWorkflowResult(&result))
// Verify logs
logs := parseLogs(t, &buf)
require.Len(t, logs, 2)
require.Equal(t, logs[0][tagUpdateName], "logging_update")
require.Equal(t, logs[0][tagUpdateID], "id_1")
require.Equal(t, logs[0]["msg"], "logging update validator")
require.Equal(t, logs[1][tagUpdateName], "logging_update")
require.Equal(t, logs[1][tagUpdateID], "id_1")
require.Equal(t, logs[1]["msg"], "logging update handler")

}

func TestWorkflowStartTimeInsideTestWorkflow(t *testing.T) {
var suite WorkflowTestSuite
env := suite.NewTestWorkflowEnvironment()
Expand Down
6 changes: 5 additions & 1 deletion workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,11 @@ func GetCurrentUpdateInfo(ctx Context) *UpdateInfo {
return internal.GetCurrentUpdateInfo(ctx)
}

// GetLogger returns a logger to be used in workflow's context
// GetLogger returns a logger to be used in workflow's context.
// This logger does not record logs during replay.
//
// The logger may also extract additional fields from the context, such as update info
// if used in an update handler.
func GetLogger(ctx Context) log.Logger {
return internal.GetLogger(ctx)
}
Expand Down

0 comments on commit b4e934e

Please sign in to comment.