From b0dfcd0bd6c851cd5dc2280103600cfa59189eb3 Mon Sep 17 00:00:00 2001 From: taylan isikdemir Date: Fri, 3 Nov 2023 15:15:35 -0700 Subject: [PATCH 1/4] Introduce EnableStrictNonDeterminismCheck worker option --- internal/internal_task_handlers.go | 12 +++++++++++- internal/internal_worker.go | 2 +- internal/internal_worker_test.go | 15 +++++++++++++++ internal/worker.go | 21 +++++++++++++++++++++ 4 files changed, 48 insertions(+), 2 deletions(-) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index ed7c448d5..0bcb50908 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -131,6 +131,7 @@ type ( contextPropagators []ContextPropagator tracer opentracing.Tracer workflowInterceptorFactories []WorkflowInterceptorFactory + enableStrictNonDeterminism bool } activityProvider func(name string) activity @@ -388,7 +389,7 @@ func newWorkflowTaskHandler( registry *registry, ) WorkflowTaskHandler { ensureRequiredParams(¶ms) - return &workflowTaskHandlerImpl{ + wth := &workflowTaskHandlerImpl{ domain: domain, logger: params.Logger, ppMgr: ppMgr, @@ -402,7 +403,16 @@ func newWorkflowTaskHandler( contextPropagators: params.ContextPropagators, tracer: params.Tracer, workflowInterceptorFactories: params.WorkflowInterceptorChainFactories, + enableStrictNonDeterminism: params.WorkerBugPorts.EnableStrictNonDeterminismCheck, } + + traceLog(func() { + wth.logger.Debug("Workflow task handler is created.", + zap.String(tagDomain, wth.domain), + zap.Bool("EnableStrictNonDeterminism", wth.enableStrictNonDeterminism)) + }) + + return wth } // TODO: need a better eviction policy based on memory usage diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 04dbd1550..75beddda3 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -1041,7 +1041,7 @@ func newAggregatedWorker( var workflowWorker *workflowWorker if !wOptions.DisableWorkflowWorker { testTags := getTestTags(wOptions.BackgroundActivityContext) - if testTags != nil && len(testTags) > 0 { + if len(testTags) > 0 { workflowWorker = newWorkflowWorkerWithPressurePoints( service, domain, diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 72c52a994..0f4b2318c 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -227,6 +227,14 @@ func (s *internalWorkerTestSuite) TestCreateWorker_WithAutoScaler() { worker.Stop() } +func (s *internalWorkerTestSuite) TestCreateWorker_WithStrictNonDeterminism() { + worker := createWorkerWithStrictNonDeterminismOption(s.T(), s.service) + err := worker.Start() + require.NoError(s.T(), err) + time.Sleep(time.Millisecond * 200) + worker.Stop() +} + func (s *internalWorkerTestSuite) TestCreateWorker_WithHost() { worker := createWorkerWithHost(s.T(), s.service) err := worker.Start() @@ -425,6 +433,13 @@ func createWorkerWithAutoscaler( return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}}) } +func createWorkerWithStrictNonDeterminismOption( + t *testing.T, + service *workflowservicetest.MockClient, +) *aggregatedWorker { + return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{WorkerBugPorts: WorkerBugPorts{EnableStrictNonDeterminismCheck: true}}) +} + func createWorkerWithHost( t *testing.T, service *workflowservicetest.MockClient, diff --git a/internal/worker.go b/internal/worker.go index c5225bed2..5a8bf8daa 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -266,6 +266,27 @@ type ( // Optional: Host is just string on the machine running the client // default: empty string Host string + + // Optional: See WorkerBugPorts for more details + WorkerBugPorts WorkerBugPorts + } + + // WorkerBugPorts allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily + // emulating old behavior until a fix is deployed. + // By default, bugs (especially rarely-occurring ones) are fixed and all users are opted into the new behavior. + // Back-ported buggy behavior *may* be available via these flags. + // + // Bugports are always deprecated and may be removed in future versions. + // Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to + // allow cleaning up the additional code complexity that they cause. + WorkerBugPorts struct { + // Optional: Enable strict non-determinism checks for workflow. + // There are some non-determinism cases which are missed by original implementation and a fix is on the way. + // The fix will be activated by this option which basicakky accuracy of the non-determinism checks. + // Exposing this as bugport for now to avoid breaking existing workflows which are actually non-deterministic but users depend on this. + // Once we identify such cases and notify users, we can enable this by default. + // default: false + EnableStrictNonDeterminismCheck bool } ) From e9c9ada91c67d12672cd5fb217a46b5e24fd9511 Mon Sep 17 00:00:00 2001 From: taylan isikdemir Date: Fri, 3 Nov 2023 21:00:52 -0700 Subject: [PATCH 2/4] Use standard DEPRECATED: --- internal/worker.go | 7 ++++++- internal/workflow.go | 6 +++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/internal/worker.go b/internal/worker.go index 5a8bf8daa..704788446 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -268,6 +268,8 @@ type ( Host string // Optional: See WorkerBugPorts for more details + // + // Deprecated: This field is deprecated and will be removed in the future. WorkerBugPorts WorkerBugPorts } @@ -279,13 +281,16 @@ type ( // Bugports are always deprecated and may be removed in future versions. // Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to // allow cleaning up the additional code complexity that they cause. + // Deprecated: This is deprecated and will be removed in the future. + // WorkerBugPorts struct { // Optional: Enable strict non-determinism checks for workflow. // There are some non-determinism cases which are missed by original implementation and a fix is on the way. // The fix will be activated by this option which basicakky accuracy of the non-determinism checks. // Exposing this as bugport for now to avoid breaking existing workflows which are actually non-deterministic but users depend on this. // Once we identify such cases and notify users, we can enable this by default. - // default: false + // + // Deprecated: This field is deprecated and will be removed in the future. EnableStrictNonDeterminismCheck bool } ) diff --git a/internal/workflow.go b/internal/workflow.go index b762c24bb..9df6fe499 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -481,7 +481,7 @@ type ( // Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to // allow cleaning up the additional code complexity that they cause. // - // deprecated + // Deprecated: This field is deprecated and will be removed in the future. Bugports Bugports } @@ -500,7 +500,7 @@ type ( // Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to // allow cleaning up the additional code complexity that they cause. // - // deprecated + // DEPRECATED: This is deprecated and will be removed in the future. Bugports struct { // StartChildWorkflowsOnCanceledContext allows emulating older, buggy behavior that existed prior to v0.18.4. // @@ -530,7 +530,7 @@ type ( // // Added in 0.18.4, this may be removed in or after v0.19.0, so please migrate off of it ASAP. // - // deprecated + // Deprecated: This field is deprecated and will be removed in the future. StartChildWorkflowsOnCanceledContext bool } ) From f6ec95c784b7e7f44d64f39e80165b2241fc3937 Mon Sep 17 00:00:00 2001 From: taylan isikdemir Date: Mon, 6 Nov 2023 16:06:31 -0800 Subject: [PATCH 3/4] address review feedback --- internal/internal_worker_test.go | 14 +++++++------- internal/worker.go | 7 +++---- internal/workflow.go | 6 +++--- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 0f4b2318c..72328d868 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -196,7 +196,7 @@ func testActivityMultipleArgsWithStruct(ctx context.Context, i int, s testActivi } func (s *internalWorkerTestSuite) TestCreateWorker() { - worker := createWorkerWithThrottle(s.T(), s.service, float64(500.0), WorkerOptions{}) + worker := createWorkerWithThrottle(s.T(), s.service, 500, WorkerOptions{}) err := worker.Start() require.NoError(s.T(), err) time.Sleep(time.Millisecond * 200) @@ -356,7 +356,7 @@ func createWorker( t *testing.T, service *workflowservicetest.MockClient, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, float64(0.0), WorkerOptions{}) + return createWorkerWithThrottle(t, service, 0, WorkerOptions{}) } func createShadowWorker( @@ -364,7 +364,7 @@ func createShadowWorker( service *workflowservicetest.MockClient, shadowOptions *ShadowOptions, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, float64(0.0), WorkerOptions{ + return createWorkerWithThrottle(t, service, 0, WorkerOptions{ EnableShadowWorker: true, ShadowOptions: *shadowOptions, }) @@ -423,28 +423,28 @@ func createWorkerWithDataConverter( t *testing.T, service *workflowservicetest.MockClient, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, float64(0.0), WorkerOptions{DataConverter: newTestDataConverter()}) + return createWorkerWithThrottle(t, service, 0, WorkerOptions{DataConverter: newTestDataConverter()}) } func createWorkerWithAutoscaler( t *testing.T, service *workflowservicetest.MockClient, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}}) + return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}}) } func createWorkerWithStrictNonDeterminismOption( t *testing.T, service *workflowservicetest.MockClient, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{WorkerBugPorts: WorkerBugPorts{EnableStrictNonDeterminismCheck: true}}) + return createWorkerWithThrottle(t, service, 0, WorkerOptions{WorkerBugPorts: WorkerBugPorts{EnableStrictNonDeterminismCheck: true}}) } func createWorkerWithHost( t *testing.T, service *workflowservicetest.MockClient, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{Host: "test_host"}) + return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host"}) } func (s *internalWorkerTestSuite) testCompleteActivityHelper(opt *ClientOptions) { diff --git a/internal/worker.go b/internal/worker.go index 704788446..be388ef01 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -269,7 +269,7 @@ type ( // Optional: See WorkerBugPorts for more details // - // Deprecated: This field is deprecated and will be removed in the future. + // Deprecated: All bugports are always deprecated and may be removed at any time. WorkerBugPorts WorkerBugPorts } @@ -281,8 +281,7 @@ type ( // Bugports are always deprecated and may be removed in future versions. // Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to // allow cleaning up the additional code complexity that they cause. - // Deprecated: This is deprecated and will be removed in the future. - // + // Deprecated: All bugports are always deprecated and may be removed at any time WorkerBugPorts struct { // Optional: Enable strict non-determinism checks for workflow. // There are some non-determinism cases which are missed by original implementation and a fix is on the way. @@ -290,7 +289,7 @@ type ( // Exposing this as bugport for now to avoid breaking existing workflows which are actually non-deterministic but users depend on this. // Once we identify such cases and notify users, we can enable this by default. // - // Deprecated: This field is deprecated and will be removed in the future. + // Deprecated: All bugports are always deprecated and may be removed at any time EnableStrictNonDeterminismCheck bool } ) diff --git a/internal/workflow.go b/internal/workflow.go index 9df6fe499..b39a7cc59 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -481,7 +481,7 @@ type ( // Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to // allow cleaning up the additional code complexity that they cause. // - // Deprecated: This field is deprecated and will be removed in the future. + // Deprecated: All bugports are always deprecated and may be removed at any time. Bugports Bugports } @@ -500,7 +500,7 @@ type ( // Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to // allow cleaning up the additional code complexity that they cause. // - // DEPRECATED: This is deprecated and will be removed in the future. + // DEPRECATED: All bugports are always deprecated and may be removed at any time. Bugports struct { // StartChildWorkflowsOnCanceledContext allows emulating older, buggy behavior that existed prior to v0.18.4. // @@ -530,7 +530,7 @@ type ( // // Added in 0.18.4, this may be removed in or after v0.19.0, so please migrate off of it ASAP. // - // Deprecated: This field is deprecated and will be removed in the future. + // Deprecated: All bugports are always deprecated and may be removed at any time. StartChildWorkflowsOnCanceledContext bool } ) From 63e4ff58d1aa1ecc4bad16fc992719c7cfccc50e Mon Sep 17 00:00:00 2001 From: taylan isikdemir Date: Mon, 6 Nov 2023 18:01:54 -0800 Subject: [PATCH 4/4] reverse bool --- internal/internal_task_handlers.go | 6 +++--- internal/internal_worker_test.go | 6 +++--- internal/worker.go | 9 ++++----- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 0bcb50908..1c53a05d2 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -131,7 +131,7 @@ type ( contextPropagators []ContextPropagator tracer opentracing.Tracer workflowInterceptorFactories []WorkflowInterceptorFactory - enableStrictNonDeterminism bool + disableStrictNonDeterminism bool } activityProvider func(name string) activity @@ -403,13 +403,13 @@ func newWorkflowTaskHandler( contextPropagators: params.ContextPropagators, tracer: params.Tracer, workflowInterceptorFactories: params.WorkflowInterceptorChainFactories, - enableStrictNonDeterminism: params.WorkerBugPorts.EnableStrictNonDeterminismCheck, + disableStrictNonDeterminism: params.WorkerBugPorts.DisableStrictNonDeterminismCheck, } traceLog(func() { wth.logger.Debug("Workflow task handler is created.", zap.String(tagDomain, wth.domain), - zap.Bool("EnableStrictNonDeterminism", wth.enableStrictNonDeterminism)) + zap.Bool("disableStrictNonDeterminism", wth.disableStrictNonDeterminism)) }) return wth diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 72328d868..764efd111 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -228,7 +228,7 @@ func (s *internalWorkerTestSuite) TestCreateWorker_WithAutoScaler() { } func (s *internalWorkerTestSuite) TestCreateWorker_WithStrictNonDeterminism() { - worker := createWorkerWithStrictNonDeterminismOption(s.T(), s.service) + worker := createWorkerWithStrictNonDeterminismDisabled(s.T(), s.service) err := worker.Start() require.NoError(s.T(), err) time.Sleep(time.Millisecond * 200) @@ -433,11 +433,11 @@ func createWorkerWithAutoscaler( return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}}) } -func createWorkerWithStrictNonDeterminismOption( +func createWorkerWithStrictNonDeterminismDisabled( t *testing.T, service *workflowservicetest.MockClient, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, 0, WorkerOptions{WorkerBugPorts: WorkerBugPorts{EnableStrictNonDeterminismCheck: true}}) + return createWorkerWithThrottle(t, service, 0, WorkerOptions{WorkerBugPorts: WorkerBugPorts{DisableStrictNonDeterminismCheck: true}}) } func createWorkerWithHost( diff --git a/internal/worker.go b/internal/worker.go index be388ef01..abef02a5b 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -283,14 +283,13 @@ type ( // allow cleaning up the additional code complexity that they cause. // Deprecated: All bugports are always deprecated and may be removed at any time WorkerBugPorts struct { - // Optional: Enable strict non-determinism checks for workflow. + // Optional: Disable strict non-determinism checks for workflow. // There are some non-determinism cases which are missed by original implementation and a fix is on the way. - // The fix will be activated by this option which basicakky accuracy of the non-determinism checks. - // Exposing this as bugport for now to avoid breaking existing workflows which are actually non-deterministic but users depend on this. - // Once we identify such cases and notify users, we can enable this by default. + // The fix will be toggleable by this parameter. + // Default: false, which means strict non-determinism checks are enabled. // // Deprecated: All bugports are always deprecated and may be removed at any time - EnableStrictNonDeterminismCheck bool + DisableStrictNonDeterminismCheck bool } )