Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nexus: Fix link not being attached to a workflow started via a Handler function #1659

Merged
merged 2 commits into from
Oct 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 33 additions & 20 deletions temporalnexus/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (o *workflowRunOperation[I, O]) Start(
// Prevent the test env client from panicking when we try to use it from a workflow run operation.
ctx = context.WithValue(ctx, internal.IsWorkflowRunOpContextKey, true)

nctx, ok := internal.NexusOperationContextFromGoContext(ctx)
_, ok := internal.NexusOperationContextFromGoContext(ctx)
if !ok {
return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error")
}
Expand All @@ -221,7 +221,10 @@ func (o *workflowRunOperation[I, O]) Start(
if err != nil {
return nil, err
}
return &nexus.HandlerStartOperationResultAsync{OperationID: handle.ID()}, nil
return &nexus.HandlerStartOperationResultAsync{
OperationID: handle.ID(),
Links: []nexus.Link{handle.link()},
}, nil
}

wfOpts, err := o.options.GetOptions(ctx, input, options)
Expand All @@ -234,22 +237,9 @@ func (o *workflowRunOperation[I, O]) Start(
return nil, err
}

// Create the link information about the new workflow and return to the caller.
link := &common.Link_WorkflowEvent{
Namespace: nctx.Namespace,
WorkflowId: handle.ID(),
RunId: handle.RunID(),
Reference: &common.Link_WorkflowEvent_EventRef{
EventRef: &common.Link_WorkflowEvent_EventReference{
EventType: enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
},
},
}
nexusLink := ConvertLinkWorkflowEventToNexusLink(link)

return &nexus.HandlerStartOperationResultAsync{
OperationID: handle.ID(),
Links: []nexus.Link{nexusLink},
Links: []nexus.Link{handle.link()},
}, nil
}

Expand All @@ -262,11 +252,17 @@ type WorkflowHandle[T any] interface {
ID() string
// ID is the workflow's run ID.
RunID() string

/* Methods below intentionally not exposed, interface is not meant to be implementable outside of this package */

// Link to the WorkflowExecutionStarted event of the workflow represented by this handle.
link() nexus.Link
}

type workflowHandle[T any] struct {
id string
runID string
namespace string
id string
runID string
}

func (h workflowHandle[T]) ID() string {
Expand All @@ -277,6 +273,22 @@ func (h workflowHandle[T]) RunID() string {
return h.runID
}

func (h workflowHandle[T]) link() nexus.Link {
// Create the link information about the new workflow and return to the caller.
link := &common.Link_WorkflowEvent{
Namespace: h.namespace,
WorkflowId: h.ID(),
RunId: h.RunID(),
Reference: &common.Link_WorkflowEvent_EventRef{
EventRef: &common.Link_WorkflowEvent_EventReference{
EventType: enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
},
},
}
return ConvertLinkWorkflowEventToNexusLink(link)

}

// ExecuteWorkflow starts a workflow run for a [WorkflowRunOperationOptions] Handler, linking the execution chain to a
// Nexus operation (subsequent runs started from continue-as-new and retries).
// Automatically propagates the callback and request ID from the nexus options to the workflow.
Expand Down Expand Up @@ -354,7 +366,8 @@ func ExecuteUntypedWorkflow[R any](
return nil, err
}
return workflowHandle[R]{
id: run.GetID(),
runID: run.GetRunID(),
namespace: nctx.Namespace,
id: run.GetID(),
runID: run.GetRunID(),
}, nil
}
Loading