From b4e934e78096efef7f02f6cf1bea8d149818562d Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 8 Oct 2024 09:27:49 -0700 Subject: [PATCH] Include updateID and updateName in update logger (#1660) --- internal/internal_logging_tags.go | 2 + internal/workflow.go | 9 ++- internal/workflow_testsuite_test.go | 97 ++++++++++++++++++++++++----- workflow/workflow.go | 6 +- 4 files changed, 96 insertions(+), 18 deletions(-) diff --git a/internal/internal_logging_tags.go b/internal/internal_logging_tags.go index 1d49ce8ff..963ee922e 100644 --- a/internal/internal_logging_tags.go +++ b/internal/internal_logging_tags.go @@ -55,4 +55,6 @@ const ( tagNexusService = "NexusService" tagPanicError = "PanicError" tagPanicStack = "PanicStack" + tagUpdateID = "UpdateID" + tagUpdateName = "UpdateName" ) diff --git a/internal/workflow.go b/internal/workflow.go index 0f9b3e542..8cae1b815 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -1262,7 +1262,14 @@ func GetLogger(ctx Context) log.Logger { } func (wc *workflowEnvironmentInterceptor) GetLogger(ctx Context) log.Logger { - return wc.env.GetLogger() + logger := wc.env.GetLogger() + // Add update info to the logger if available + uc := ctx.Value(updateInfoContextKey) + if uc == nil { + return logger + } + updateInfo := uc.(*UpdateInfo) + return log.With(logger, tagUpdateID, updateInfo.ID, tagUpdateName, updateInfo.Name) } // GetMetricsHandler returns a metrics handler to be used in workflow's context diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index b9f779845..3fc46146b 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -545,6 +545,22 @@ func TestAllHandlersFinished(t *testing.T) { require.Equal(t, 2, result) } +// parseLogs parses the logs from the buffer and returns the logs as a slice of maps +func parseLogs(t *testing.T, buf *bytes.Buffer) []map[string]any { + var ms []map[string]any + for _, line := range bytes.Split(buf.Bytes(), []byte{'\n'}) { + if len(line) == 0 { + continue + } + var m map[string]any + err := json.Unmarshal(line, &m) + require.NoError(t, err) + fmt.Println(m) + ms = append(ms, m) + } + return ms +} + func TestWorkflowAllHandlersFinished(t *testing.T) { // runWf runs a workflow that sends two updates and then signals the workflow to complete runWf := func(completionType string, buf *bytes.Buffer) (int, error) { @@ -648,21 +664,6 @@ func TestWorkflowAllHandlersFinished(t *testing.T) { require.NoError(t, env.GetWorkflowResult(&result)) return result, nil } - // parseLogs parses the logs from the buffer and returns the logs as a slice of maps - parseLogs := func(buf *bytes.Buffer) []map[string]any { - var ms []map[string]any - for _, line := range bytes.Split(buf.Bytes(), []byte{'\n'}) { - if len(line) == 0 { - continue - } - var m map[string]any - err := json.Unmarshal(line, &m) - require.NoError(t, err) - fmt.Println(m) - ms = append(ms, m) - } - return ms - } // parseWarnedUpdates parses the warned updates from the logs and returns them as a slice of maps parseWarnedUpdates := func(updates interface{}) []map[string]interface{} { var warnedUpdates []map[string]interface{} @@ -674,7 +675,7 @@ func TestWorkflowAllHandlersFinished(t *testing.T) { } // assertExpectedLogs asserts that the logs in the buffer are as expected assertExpectedLogs := func(t *testing.T, buf *bytes.Buffer, shouldWarn bool) { - logs := parseLogs(buf) + logs := parseLogs(t, buf) if shouldWarn { require.Len(t, logs, 1) require.Equal(t, unhandledUpdateWarningMessage, logs[0]["msg"]) @@ -718,6 +719,70 @@ func TestWorkflowAllHandlersFinished(t *testing.T) { }) } +func TestWorkflowUpdateLogger(t *testing.T) { + var suite WorkflowTestSuite + var buf bytes.Buffer + th := slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelInfo}) + suite.SetLogger(log.NewStructuredLogger(slog.New(th))) + env := suite.NewTestWorkflowEnvironment() + + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("logging_update", "id_1", &updateCallback{ + reject: func(err error) { + require.Fail(t, "update should not be rejected") + }, + accept: func() {}, + complete: func(interface{}, error) {}, + }) + }, 0) + + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("completion", nil) + }, time.Minute*2) + + env.ExecuteWorkflow(func(ctx Context) (int, error) { + var ranUpdates int + err := SetUpdateHandler(ctx, "logging_update", func(ctx Context) error { + ranUpdates++ + log := GetLogger(ctx) + log.Info("logging update handler") + return nil + }, UpdateHandlerOptions{ + Validator: func(ctx Context) error { + log := GetLogger(ctx) + log.Info("logging update validator") + return nil + }, + }) + if err != nil { + return 0, err + } + + var completeType string + s := NewSelector(ctx) + s.AddReceive(ctx.Done(), func(c ReceiveChannel, more bool) { + completeType = "cancel" + }).AddReceive(GetSignalChannel(ctx, "completion"), func(c ReceiveChannel, more bool) { + c.Receive(ctx, &completeType) + }).Select(ctx) + return ranUpdates, nil + }) + + require.NoError(t, env.GetWorkflowError()) + var result int + require.NoError(t, env.GetWorkflowResult(&result)) + // Verify logs + logs := parseLogs(t, &buf) + require.Len(t, logs, 2) + require.Equal(t, logs[0][tagUpdateName], "logging_update") + require.Equal(t, logs[0][tagUpdateID], "id_1") + require.Equal(t, logs[0]["msg"], "logging update validator") + require.Equal(t, logs[1][tagUpdateName], "logging_update") + require.Equal(t, logs[1][tagUpdateID], "id_1") + require.Equal(t, logs[1]["msg"], "logging update handler") + +} + func TestWorkflowStartTimeInsideTestWorkflow(t *testing.T) { var suite WorkflowTestSuite env := suite.NewTestWorkflowEnvironment() diff --git a/workflow/workflow.go b/workflow/workflow.go index d9a9c181c..a96e690a9 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -278,7 +278,11 @@ func GetCurrentUpdateInfo(ctx Context) *UpdateInfo { return internal.GetCurrentUpdateInfo(ctx) } -// GetLogger returns a logger to be used in workflow's context +// GetLogger returns a logger to be used in workflow's context. +// This logger does not record logs during replay. +// +// The logger may also extract additional fields from the context, such as update info +// if used in an update handler. func GetLogger(ctx Context) log.Logger { return internal.GetLogger(ctx) }