Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patching sample #80

Merged
merged 7 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Prerequisites:
* [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers
* [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs.
* [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource.
* [Patching](src/Patching) - Alter workflows safely with Patch and DeprecatePatch.
* [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
* [SafeMessageHandlers](src/SafeMessageHandlers) - Use `Semaphore` to ensure operations are atomically processed in a workflow.
* [Saga](src/Saga) - Demonstrates how to implement a saga pattern.
Expand Down
7 changes: 7 additions & 0 deletions TemporalioSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPr
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.SafeMessageHandlers", "src\SafeMessageHandlers\TemporalioSamples.SafeMessageHandlers.csproj", "{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Patching", "src\Patching\TemporalioSamples.Patching.csproj", "{B45A80C8-19EC-4404-B8A1-C91281659784}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -155,6 +157,10 @@ Global
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.Build.0 = Release|Any CPU
{B45A80C8-19EC-4404-B8A1-C91281659784}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B45A80C8-19EC-4404-B8A1-C91281659784}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B45A80C8-19EC-4404-B8A1-C91281659784}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B45A80C8-19EC-4404-B8A1-C91281659784}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -185,5 +191,6 @@ Global
{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{B45A80C8-19EC-4404-B8A1-C91281659784} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
EndGlobalSection
EndGlobal
12 changes: 12 additions & 0 deletions src/Patching/Activities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Temporalio.Activities;

namespace TemporalioSamples.Patching;

public static class Activities
{
[Activity]
public static string PrePatchActivity() => "pre-patch";

[Activity]
public static string PostPatchActivity() => "post-patch";
}
16 changes: 16 additions & 0 deletions src/Patching/MyWorkflow1Initial.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Temporalio.Workflows;

namespace TemporalioSamples.Patching;

[Workflow("MyWorkflow")]
public class MyWorkflow1Initial
{
[WorkflowRun]
public async Task RunAsync()
{
Result = await Workflow.ExecuteActivityAsync(() => Activities.PrePatchActivity(), new() { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) });
}

[WorkflowQuery]
public string? Result { get; private set; }
}
23 changes: 23 additions & 0 deletions src/Patching/MyWorkflow2Patched.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Temporalio.Workflows;

namespace TemporalioSamples.Patching;

[Workflow("MyWorkflow")]
public class MyWorkflow2Patched
{
[WorkflowRun]
public async Task RunAsync()
{
if (Workflow.Patched("my-patch"))
{
Result = await Workflow.ExecuteActivityAsync(() => Activities.PostPatchActivity(), new() { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) });
}
else
{
Result = await Workflow.ExecuteActivityAsync(() => Activities.PrePatchActivity(), new() { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) });
}
}

[WorkflowQuery]
public string? Result { get; private set; }
}
17 changes: 17 additions & 0 deletions src/Patching/MyWorkflow3PatchDeprecated.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Temporalio.Workflows;

namespace TemporalioSamples.Patching;

[Workflow("MyWorkflow")]
public class MyWorkflow3PatchDeprecated
{
[WorkflowRun]
public async Task RunAsync()
{
Workflow.DeprecatePatch("my-patch");
Result = await Workflow.ExecuteActivityAsync(() => Activities.PostPatchActivity(), new() { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) });
}

[WorkflowQuery]
public string? Result { get; private set; }
}
16 changes: 16 additions & 0 deletions src/Patching/MyWorkflow4PatchComplete.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Temporalio.Workflows;

namespace TemporalioSamples.Patching;

[Workflow("MyWorkflow")]
public class MyWorkflow4PatchComplete
{
[WorkflowRun]
public async Task RunAsync()
{
Result = await Workflow.ExecuteActivityAsync(() => Activities.PostPatchActivity(), new() { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) });
}

[WorkflowQuery]
public string? Result { get; private set; }
}
102 changes: 102 additions & 0 deletions src/Patching/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
using Microsoft.Extensions.Logging;
using Temporalio.Client;
using Temporalio.Worker;
using TemporalioSamples.Patching;

// Create a client to localhost on default namespace
var client = await TemporalClient.ConnectAsync(new("localhost:7233")
{
LoggerFactory = LoggerFactory.Create(builder =>
builder.
AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] ").
SetMinimumLevel(LogLevel.Information)),
});

async Task RunWorkerAsync()
{
// Cancellation token cancelled on ctrl+c
using var tokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
{
tokenSource.Cancel();
eventArgs.Cancel = true;
};

var workerOptions = new TemporalWorkerOptions(taskQueue: "patching-task-queue")
.AddActivity(Activities.PrePatchActivity)
.AddActivity(Activities.PostPatchActivity);

switch (args.ElementAtOrDefault(2))
{
case "initial":
workerOptions.AddWorkflow<MyWorkflow1Initial>();
break;
case "patched":
workerOptions.AddWorkflow<MyWorkflow2Patched>();
break;
case "patch-deprecated":
workerOptions.AddWorkflow<MyWorkflow3PatchDeprecated>();
break;
case "patch-complete":
workerOptions.AddWorkflow<MyWorkflow4PatchComplete>();
break;
default:
throw new ArgumentException("Which workflow. Can be 'initial', 'patched', 'patch-deprecated', or 'patch-complete'");
}

// Run worker until cancelled
Console.WriteLine("Running worker");

using var worker = new TemporalWorker(client, workerOptions);

try
{
await worker.ExecuteAsync(tokenSource.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine("Worker cancelled");
}
}

async Task RunStarterAsync()
{
var workflowId = args.ElementAtOrDefault(2);
if (workflowId is null)
{
throw new ArgumentException("Workflow id is required");
}

switch (args.ElementAtOrDefault(1))
{
case "--start-workflow":
{
// Since it's just used for typing purposes, it doesn't matter which one we start
var handle = await client.StartWorkflowAsync((MyWorkflow1Initial wf) => wf.RunAsync(), new(id: workflowId, taskQueue: "patching-task-queue"));
Console.WriteLine($"Started workflow with ID {handle.Id} and run ID {handle.ResultRunId}");
break;
}
case "--query-workflow":
{
// Since it's just used for typing purposes, it doesn't matter which one we query
var handle = client.GetWorkflowHandle(workflowId);
var result = await handle.QueryAsync((MyWorkflow1Initial wf) => wf.Result);
Console.WriteLine($"Query result for ID {handle.Id}: {result}");
break;
}
default:
throw new ArgumentException("Either --start-workflow or --query-workflow is required");
}
}

switch (args.ElementAtOrDefault(0))
{
case "worker":
await RunWorkerAsync();
break;
case "starter":
await RunStarterAsync();
break;
default:
throw new ArgumentException("Must pass 'worker' or 'starter' as the single argument");
}
103 changes: 103 additions & 0 deletions src/Patching/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Patching Sample

This sample shows how to safely alter a workflow using `Workflow.Patched` and `Workflow.DeprecatePatch` in stages.

To run, first see [README.md](../../README.md) for prerequisites. Then follow the patching stages below.

### Stage 1 - Initial code

This stage is for existing running workflows. To simulate our initial workflow, run the worker in a separate terminal:

dotnet run worker --workflow initial

Now we can start this workflow:

dotnet run starter --start-workflow initial-workflow-id

This will output "Started workflow with ID initial-workflow-id and ...". Now query this workflow:

dotnet run starter --query-workflow initial-workflow-id

This will output "Query result for ID initial-workflow-id: pre-patch".

### Stage 2 - Patch the workflow

This stage is for needing to run old and new workflows at the same time. To simulate our patched workflow, stop the
worker from before and start it again with the patched workflow:

dotnet run worker --workflow patched

Now let's start another workflow with this patched code:

dotnet run starter --start-workflow patched-workflow-id

This will output "Started workflow with ID patched-workflow-id and ...". Now query the old workflow that's still
running:

dotnet run starter --query-workflow initial-workflow-id

This will output "Query result for ID initial-workflow-id: pre-patch" since it is pre-patch. But if we execute a query
against the new code:

dotnet run starter --query-workflow patched-workflow-id

We get "Query result for ID patched-workflow-id: post-patch". This is how old workflow code can take old paths and new
workflow code can take new paths.

### Stage 3 - Deprecation
jakejscott marked this conversation as resolved.
Show resolved Hide resolved

Once we know that all workflows that started with the initial code from "Stage 1" are no longer running, we don't need
the patch so we can deprecate it. To use the patch deprecated workflow, stop the workflow from before and start it again
with:

dotnet run worker --workflow patch-deprecated

If we query a workflow in "Stage 1" we should get an error. This will output "Unhandled exception.
Temporalio.Exceptions.WorkflowQueryFailedException: [TMPRL1100] Nondeterminism error: Activity type of scheduled event
'PrePatchActivity' does not match activity type of activity command 'PostPatchActivity'"

dotnet run starter --query-workflow initial-workflow-id

All workflows in "Stage 2" and any new workflows will work. Now let's start another workflow with this patch deprecated
code:

dotnet run starter --start-workflow patch-deprecated-workflow-id

This will output "Started workflow with ID patch-deprecated-workflow-id and ...". Now query the patched workflow that's
still running:

dotnet run starter --query-workflow patched-workflow-id

This will output "Query result for ID patched-workflow-id: post-patch". And if we execute a query against the latest
workflow:

dotnet run starter --query-workflow patch-deprecated-workflow-id

As expected, this will output "Query result for ID patch-deprecated-workflow-id: post-patch".

### Stage 4 - Patch complete

Once we know we don't even have any workflows running on "Stage 2" or before (i.e. the workflow with the patch with
both code paths), we can just remove the patch deprecation altogether. To use the patch complete workflow, stop the
workflow from before and start it again with:

dotnet run worker --workflow patch-complete

All workflows in "Stage 3" and any new workflows will work. Now let's start another workflow with this patch complete
code:

dotnet run starter --start-workflow patch-complete-workflow-id

This will output "Started workflow with ID patch-complete-workflow-id and ...". Now query the patch deprecated workflow
that's still running:

dotnet run starter --query-workflow patch-deprecated-workflow-id

This will output "Query result for ID patch-deprecated-workflow-id: post-patch". And if we execute a query against the
latest workflow:

dotnet run starter --query-workflow patch-complete-workflow-id

As expected, this will output "Query result for ID patch-complete-workflow-id: post-patch".

Following these stages, we have successfully altered our workflow code.
7 changes: 7 additions & 0 deletions src/Patching/TemporalioSamples.Patching.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
</PropertyGroup>

</Project>