Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce DisableStrictNonDeterminismCheck worker option #1288

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type (
contextPropagators []ContextPropagator
tracer opentracing.Tracer
workflowInterceptorFactories []WorkflowInterceptorFactory
disableStrictNonDeterminism bool
}

activityProvider func(name string) activity
Expand Down Expand Up @@ -388,7 +389,7 @@ func newWorkflowTaskHandler(
registry *registry,
) WorkflowTaskHandler {
ensureRequiredParams(&params)
return &workflowTaskHandlerImpl{
wth := &workflowTaskHandlerImpl{
domain: domain,
logger: params.Logger,
ppMgr: ppMgr,
Expand All @@ -402,7 +403,16 @@ func newWorkflowTaskHandler(
contextPropagators: params.ContextPropagators,
tracer: params.Tracer,
workflowInterceptorFactories: params.WorkflowInterceptorChainFactories,
disableStrictNonDeterminism: params.WorkerBugPorts.DisableStrictNonDeterminismCheck,
}

traceLog(func() {
taylanisikdemir marked this conversation as resolved.
Show resolved Hide resolved
wth.logger.Debug("Workflow task handler is created.",
zap.String(tagDomain, wth.domain),
zap.Bool("disableStrictNonDeterminism", wth.disableStrictNonDeterminism))
})

return wth
}

// TODO: need a better eviction policy based on memory usage
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ func newAggregatedWorker(
var workflowWorker *workflowWorker
if !wOptions.DisableWorkflowWorker {
testTags := getTestTags(wOptions.BackgroundActivityContext)
if testTags != nil && len(testTags) > 0 {
if len(testTags) > 0 {
workflowWorker = newWorkflowWorkerWithPressurePoints(
service,
domain,
Expand Down
27 changes: 21 additions & 6 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func testActivityMultipleArgsWithStruct(ctx context.Context, i int, s testActivi
}

func (s *internalWorkerTestSuite) TestCreateWorker() {
worker := createWorkerWithThrottle(s.T(), s.service, float64(500.0), WorkerOptions{})
worker := createWorkerWithThrottle(s.T(), s.service, 500, WorkerOptions{})
err := worker.Start()
require.NoError(s.T(), err)
time.Sleep(time.Millisecond * 200)
Expand Down Expand Up @@ -227,6 +227,14 @@ func (s *internalWorkerTestSuite) TestCreateWorker_WithAutoScaler() {
worker.Stop()
}

func (s *internalWorkerTestSuite) TestCreateWorker_WithStrictNonDeterminism() {
worker := createWorkerWithStrictNonDeterminismDisabled(s.T(), s.service)
err := worker.Start()
require.NoError(s.T(), err)
time.Sleep(time.Millisecond * 200)
taylanisikdemir marked this conversation as resolved.
Show resolved Hide resolved
worker.Stop()
}

func (s *internalWorkerTestSuite) TestCreateWorker_WithHost() {
worker := createWorkerWithHost(s.T(), s.service)
err := worker.Start()
Expand Down Expand Up @@ -348,15 +356,15 @@ func createWorker(
t *testing.T,
service *workflowservicetest.MockClient,
) *aggregatedWorker {
return createWorkerWithThrottle(t, service, float64(0.0), WorkerOptions{})
return createWorkerWithThrottle(t, service, 0, WorkerOptions{})
}

func createShadowWorker(
t *testing.T,
service *workflowservicetest.MockClient,
shadowOptions *ShadowOptions,
) *aggregatedWorker {
return createWorkerWithThrottle(t, service, float64(0.0), WorkerOptions{
return createWorkerWithThrottle(t, service, 0, WorkerOptions{
EnableShadowWorker: true,
ShadowOptions: *shadowOptions,
})
Expand Down Expand Up @@ -415,21 +423,28 @@ func createWorkerWithDataConverter(
t *testing.T,
service *workflowservicetest.MockClient,
) *aggregatedWorker {
return createWorkerWithThrottle(t, service, float64(0.0), WorkerOptions{DataConverter: newTestDataConverter()})
return createWorkerWithThrottle(t, service, 0, WorkerOptions{DataConverter: newTestDataConverter()})
}

func createWorkerWithAutoscaler(
t *testing.T,
service *workflowservicetest.MockClient,
) *aggregatedWorker {
return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}})
return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}})
}

func createWorkerWithStrictNonDeterminismDisabled(
t *testing.T,
service *workflowservicetest.MockClient,
) *aggregatedWorker {
return createWorkerWithThrottle(t, service, 0, WorkerOptions{WorkerBugPorts: WorkerBugPorts{DisableStrictNonDeterminismCheck: true}})
}

func createWorkerWithHost(
t *testing.T,
service *workflowservicetest.MockClient,
) *aggregatedWorker {
return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{Host: "test_host"})
return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host"})
}

func (s *internalWorkerTestSuite) testCompleteActivityHelper(opt *ClientOptions) {
Expand Down
24 changes: 24 additions & 0 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,30 @@ type (
// Optional: Host is just string on the machine running the client
// default: empty string
Host string

// Optional: See WorkerBugPorts for more details
//
// Deprecated: All bugports are always deprecated and may be removed at any time.
WorkerBugPorts WorkerBugPorts
}

// WorkerBugPorts allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily
// emulating old behavior until a fix is deployed.
// By default, bugs (especially rarely-occurring ones) are fixed and all users are opted into the new behavior.
// Back-ported buggy behavior *may* be available via these flags.
//
// Bugports are always deprecated and may be removed in future versions.
// Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to
// allow cleaning up the additional code complexity that they cause.
taylanisikdemir marked this conversation as resolved.
Show resolved Hide resolved
// Deprecated: All bugports are always deprecated and may be removed at any time
WorkerBugPorts struct {
// Optional: Disable strict non-determinism checks for workflow.
// There are some non-determinism cases which are missed by original implementation and a fix is on the way.
// The fix will be toggleable by this parameter.
// Default: false, which means strict non-determinism checks are enabled.
//
// Deprecated: All bugports are always deprecated and may be removed at any time
DisableStrictNonDeterminismCheck bool
}
)

Expand Down
6 changes: 3 additions & 3 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ type (
// Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to
// allow cleaning up the additional code complexity that they cause.
//
// deprecated
// Deprecated: All bugports are always deprecated and may be removed at any time.
Bugports Bugports
}

Expand All @@ -500,7 +500,7 @@ type (
// Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to
// allow cleaning up the additional code complexity that they cause.
//
// deprecated
// DEPRECATED: All bugports are always deprecated and may be removed at any time.
Bugports struct {
// StartChildWorkflowsOnCanceledContext allows emulating older, buggy behavior that existed prior to v0.18.4.
//
Expand Down Expand Up @@ -530,7 +530,7 @@ type (
//
// Added in 0.18.4, this may be removed in or after v0.19.0, so please migrate off of it ASAP.
//
// deprecated
// Deprecated: All bugports are always deprecated and may be removed at any time.
StartChildWorkflowsOnCanceledContext bool
}
)
Expand Down