diff --git a/test/integration_test.go b/test/integration_test.go index fb64f2093..56f790be6 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -4458,6 +4458,56 @@ func (ts *IntegrationTestSuite) testNonDeterminismFailureCause(historyMismatch b ts.True(taskFailedMetric >= 1) } +func (ts *IntegrationTestSuite) TestNonDeterminismFailureCauseReplay() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + fetchMetrics := func() (localMetric int64) { + for _, counter := range ts.metricsHandler.Counters() { + counter := counter + if counter.Name == "temporal_workflow_task_execution_failed" && counter.Tags["failure_reason"] == "NonDeterminismError" { + localMetric = counter.Value() + } + } + return + } + + // Confirm no metrics to start + taskFailedMetric := fetchMetrics() + ts.Zero(taskFailedMetric) + + // Start workflow + forcedNonDeterminismCounter = 0 + run, err := ts.client.ExecuteWorkflow( + ctx, + ts.startWorkflowOptions("test-non-determinism-failure-cause-replay-"+uuid.New()), + ts.workflows.NonDeterminismReplay, + ) + + ts.NoError(err) + defer func() { _ = ts.client.TerminateWorkflow(ctx, run.GetID(), run.GetRunID(), "", nil) }() + ts.NoError(run.Get(ctx, nil)) + + // Now, stop the worker and start a new one + ts.worker.Stop() + ts.workerStopped = true + nextWorker := worker.New(ts.client, ts.taskQueueName, worker.Options{}) + ts.registerWorkflowsAndActivities(nextWorker) + ts.NoError(nextWorker.Start()) + defer nextWorker.Stop() + + // Increase the determinism counter and send a tick to trigger replay + // non-determinism + forcedNonDeterminismCounter++ + fmt.Println("Querying workflow") + _, err = ts.client.QueryWorkflow(ctx, run.GetID(), run.GetRunID(), client.QueryTypeStackTrace, nil) + ts.Error(err) + ts.Equal("context deadline exceeded", err.Error()) + + taskFailedMetric = fetchMetrics() + ts.True(taskFailedMetric >= 1) +} + func (ts *IntegrationTestSuite) TestDeterminismUpsertSearchAttributesConditional() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() diff --git a/test/workflow_test.go b/test/workflow_test.go index aa042b05f..562761d8d 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -2784,6 +2784,18 @@ func (w *Workflows) ForcedNonDeterminism(ctx workflow.Context, sameCommandButDif return } +func (w *Workflows) NonDeterminismReplay(ctx workflow.Context) error { + ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) + var a Activities + var err error + if forcedNonDeterminismCounter == 0 { + err = workflow.ExecuteActivity(ctx, a.Sleep, 1*time.Millisecond).Get(ctx, nil) + } else { + err = workflow.Sleep(ctx, 1*time.Millisecond) + } + return err +} + func (w *Workflows) ScheduleTypedSearchAttributesWorkflow(ctx workflow.Context) (string, error) { attributes := workflow.GetTypedSearchAttributes(ctx) @@ -3259,6 +3271,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.SignalCounter) worker.RegisterWorkflow(w.PanicOnSignal) worker.RegisterWorkflow(w.ForcedNonDeterminism) + worker.RegisterWorkflow(w.NonDeterminismReplay) worker.RegisterWorkflow(w.MutableSideEffect) worker.RegisterWorkflow(w.HistoryLengths) worker.RegisterWorkflow(w.HeartbeatSpecificCount)