Skip to content

Commit

Permalink
Implement Workflow Mutex Sample
Browse files Browse the repository at this point in the history
  • Loading branch information
devbased committed Nov 1, 2023
1 parent bcb9d79 commit e9e1096
Show file tree
Hide file tree
Showing 15 changed files with 474 additions and 3 deletions.
6 changes: 3 additions & 3 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Temporalio" Version="0.1.0-beta1" />
<PackageReference Include="Temporalio.Extensions.Hosting" Version="0.1.0-beta1" />
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="0.1.0-beta1" />
<PackageReference Include="Temporalio" Version="0.1.0-beta2" />
<PackageReference Include="Temporalio.Extensions.Hosting" Version="0.1.0-beta2" />
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="0.1.0-beta2" />
<!--
Can also reference the SDK downloaded to a local directory:
<ProjectReference Include="$(MSBuildThisFileDirectory)..\temporal-sdk-dotnet\src\Temporalio\Temporalio.csproj" />
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Prerequisites:
* [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud.
* [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers
* [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs.
* [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource.
* [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
* [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future.
* [WorkerVersioning](src/WorkerVersioning) - How to use the Worker Versioning feature to more easily deploy changes to Workflow & other code.
Expand Down
7 changes: 7 additions & 0 deletions TemporalioSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Dependenc
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkerVersioning", "src\WorkerVersioning\TemporalioSamples.WorkerVersioning.csproj", "{CA3FD1BC-C918-4B15-96F6-D6DDA125E63C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Mutex", "src\Mutex\TemporalioSamples.Mutex.csproj", "{3168FB2D-D821-433A-A761-309E0474DE48}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -128,6 +130,10 @@ Global
{CA3FD1BC-C918-4B15-96F6-D6DDA125E63C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CA3FD1BC-C918-4B15-96F6-D6DDA125E63C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CA3FD1BC-C918-4B15-96F6-D6DDA125E63C}.Release|Any CPU.Build.0 = Release|Any CPU
{3168FB2D-D821-433A-A761-309E0474DE48}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3168FB2D-D821-433A-A761-309E0474DE48}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3168FB2D-D821-433A-A761-309E0474DE48}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3168FB2D-D821-433A-A761-309E0474DE48}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{7AECC7C6-9A21-4B8A-84D9-AFC4F5840CAF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
Expand All @@ -150,5 +156,6 @@ Global
{11A5854B-EE6E-4752-9C46-F466503D853B} = {AE21E7F4-B114-4761-81B1-8FA63E9F6BB8}
{10E6F7C9-7F6C-4A8E-94A1-99C10F46BBA4} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{CA3FD1BC-C918-4B15-96F6-D6DDA125E63C} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{3168FB2D-D821-433A-A761-309E0474DE48} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
EndGlobalSection
EndGlobal
38 changes: 38 additions & 0 deletions src/Mutex/Activities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace TemporalioSamples.Mutex;

using Temporalio.Activities;

public record NotifyLockedInput(string ResourceId, string ReleaseSignalName);

public record UseApiThatCantBeCalledInParallelInput(TimeSpan SleepFor);

public record NotifyUnlockedInput(string ResourceId);

public static class Activities
{
[Activity]
public static void NotifyLocked(NotifyLockedInput input)
{
ActivityExecutionContext.Current.Logger.LogInformation(
"Lock for resource '{ResourceId}' acquired, release signal name '{ReleaseSignalName}'", input.ResourceId, input.ReleaseSignalName);
}

[Activity]
public static async Task UseApiThatCantBeCalledInParallelAsync(UseApiThatCantBeCalledInParallelInput input)
{
var logger = ActivityExecutionContext.Current.Logger;

logger.LogInformation("Sleeping for '{SleepFor}'...", input.SleepFor);

await Task.Delay(input.SleepFor);

logger.LogInformation("Done sleeping!");
}

[Activity]
public static void NotifyUnlocked(NotifyUnlockedInput input)
{
ActivityExecutionContext.Current.Logger.LogInformation(
"Lock for resource '{ResourceId}' released", input.ResourceId);
}
}
10 changes: 10 additions & 0 deletions src/Mutex/Impl/ILockHandle.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace TemporalioSamples.Mutex.Impl;

public interface ILockHandle : IAsyncDisposable
{
public string LockInitiatorId { get; }

public string ResourceId { get; }

public string ReleaseSignalName { get; }
}
8 changes: 8 additions & 0 deletions src/Mutex/Impl/ILockHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace TemporalioSamples.Mutex.Impl;

internal interface ILockHandler
{
public string? CurrentOwnerId { get; }

public Task HandleAsync(LockRequest lockRequest);
}
3 changes: 3 additions & 0 deletions src/Mutex/Impl/LockRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace TemporalioSamples.Mutex.Impl;

internal record LockRequest(string InitiatorId, string AcquireLockSignalName, TimeSpan? Timeout = null);
37 changes: 37 additions & 0 deletions src/Mutex/Impl/MutexActivities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
namespace TemporalioSamples.Mutex.Impl;

using Temporalio.Activities;
using Temporalio.Client;
using Temporalio.Workflows;

internal record SignalWithStartMutexWorkflowInput(string MutexWorkflowId, string ResourceId, string AcquireLockSignalName, TimeSpan? LockTimeout = null);

internal class MutexActivities
{
private static readonly string RequestLockSignalName =
WorkflowSignalDefinition.FromMethod(
typeof(MutexWorkflow).GetMethod(nameof(MutexWorkflow.RequestLockAsync))
?? throw new InvalidOperationException($"Method {nameof(MutexWorkflow.RequestLockAsync)} not found on type {typeof(MutexWorkflow)}"))
.Name ?? throw new InvalidOperationException("Signal name is null.");

private readonly ITemporalClient client;

public MutexActivities(ITemporalClient client)
{
this.client = client;
}

[Activity]
public async Task SignalWithStartMutexWorkflowAsync(SignalWithStartMutexWorkflowInput input)
{
var activityInfo = ActivityExecutionContext.Current.Info;

await this.client.StartWorkflowAsync(
(MutexWorkflow mw) => mw.RunAsync(MutexWorkflowInput.Empty),
new WorkflowOptions(input.MutexWorkflowId, activityInfo.TaskQueue)
{
StartSignal = RequestLockSignalName,
StartSignalArgs = new object[] { new LockRequest(activityInfo.WorkflowId, input.AcquireLockSignalName, input.LockTimeout), },
});
}
}
60 changes: 60 additions & 0 deletions src/Mutex/Impl/MutexWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
namespace TemporalioSamples.Mutex.Impl;

using Temporalio.Workflows;

internal record MutexWorkflowInput(IReadOnlyCollection<LockRequest> InitialRequests)
{
public static readonly MutexWorkflowInput Empty = new(Array.Empty<LockRequest>());
}

[Workflow]
internal class MutexWorkflow
{
private readonly ILockHandler lockHandler = WorkflowMutex.CreateLockHandler();
private readonly Queue<LockRequest> requests = new();

[WorkflowRun]
public async Task RunAsync(MutexWorkflowInput input)
{
var logger = Workflow.Logger;

foreach (var request in input.InitialRequests)
{
requests.Enqueue(request);
}

while (!Workflow.ContinueAsNewSuggested)
{
if (requests.Count == 0)
{
logger.LogInformation("No lock requests, waiting for more...");

await Workflow.WaitConditionAsync(() => requests.Count > 0);
}

while (requests.TryDequeue(out var lockRequest))
{
await lockHandler.HandleAsync(lockRequest);
}
}

if (requests.Count > 0)
{
var newInput = new MutexWorkflowInput(requests);
throw Workflow.CreateContinueAsNewException((MutexWorkflow x) => x.RunAsync(newInput));
}
}

[WorkflowQuery]
public string? CurrentOwnerId => lockHandler.CurrentOwnerId;

[WorkflowSignal]
public Task RequestLockAsync(LockRequest request)
{
requests.Enqueue(request);

Workflow.Logger.LogInformation("Received lock request. (InitiatorId='{InitiatorId}')", request.InitiatorId);

return Task.CompletedTask;
}
}
18 changes: 18 additions & 0 deletions src/Mutex/Impl/TemporalWorkerOptionsExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace TemporalioSamples.Mutex.Impl;

using Temporalio.Client;
using Temporalio.Worker;

public static class TemporalWorkerOptionsExtensions
{
public static TemporalWorkerOptions AddWorkflowMutex(this TemporalWorkerOptions options, ITemporalClient client)
{
var mutexActivities = new MutexActivities(client);

options
.AddAllActivities(mutexActivities)
.AddWorkflow<MutexWorkflow>();

return options;
}
}
133 changes: 133 additions & 0 deletions src/Mutex/Impl/WorkflowMutex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
namespace TemporalioSamples.Mutex.Impl;

using Temporalio.Workflows;

internal record AcquireLockInput(string ReleaseSignalName);

/// <summary>
/// Represents a mutual exclusion mechanism for Workflows.
/// This part contains API for acquiring locks.
/// </summary>
public static class WorkflowMutex
{
private const string MutexWorkflowIdPrefix = "__wm-lock:";

public static async Task<ILockHandle> LockAsync(string resourceId, TimeSpan? lockTimeout = null)
{
if (!Workflow.InWorkflow)
{
throw new InvalidOperationException("Cannot acquire a lock outside of a workflow.");
}

var initiatorId = Workflow.Info.WorkflowId;
var lockStarted = Workflow.UtcNow;

string? releaseSignalName = null;
var acquireLockSignalName = Workflow.NewGuid().ToString();
var signalDefinition = WorkflowSignalDefinition.CreateWithoutAttribute(acquireLockSignalName, (AcquireLockInput input) =>
{
releaseSignalName = input.ReleaseSignalName;
return Task.CompletedTask;
});
Workflow.Signals[acquireLockSignalName] = signalDefinition;
try
{
var startMutexWorkflowInput = new SignalWithStartMutexWorkflowInput($"{MutexWorkflowIdPrefix}{resourceId}", resourceId, acquireLockSignalName, lockTimeout);
await Workflow.ExecuteActivityAsync<MutexActivities>(
act => act.SignalWithStartMutexWorkflowAsync(startMutexWorkflowInput),
new ActivityOptions { StartToCloseTimeout = TimeSpan.FromMinutes(1), });

await Workflow.WaitConditionAsync(() => releaseSignalName != null);

var elapsed = Workflow.UtcNow - lockStarted;
Workflow.Logger.LogInformation(
"Lock for resource '{ResourceId}' acquired in {AcquireTime}ms by '{LockInitiatorId}', release signal name '{ReleaseSignalName}'",
resourceId,
(int)elapsed.TotalMilliseconds,
initiatorId,
releaseSignalName);

return new LockHandle(initiatorId, startMutexWorkflowInput.MutexWorkflowId, resourceId, releaseSignalName!);
}
finally
{
Workflow.Signals.Remove(acquireLockSignalName);
}
}

internal static ILockHandler CreateLockHandler()
{
if (!Workflow.InWorkflow)
{
throw new InvalidOperationException("Cannot acquire a lock outside of a workflow.");
}

return new LockHandler();
}

internal sealed class LockHandle : ILockHandle
{
private readonly string mutexWorkflowId;

public LockHandle(string lockInitiatorId, string mutexWorkflowId, string resourceId, string releaseSignalId)
{
LockInitiatorId = lockInitiatorId;
this.mutexWorkflowId = mutexWorkflowId;
ResourceId = resourceId;
ReleaseSignalName = releaseSignalId;
}

/// <inheritdoc />
public string LockInitiatorId { get; }

/// <inheritdoc />
public string ResourceId { get; }

/// <inheritdoc />
public string ReleaseSignalName { get; }

/// <inheritdoc />
public async ValueTask DisposeAsync()
{
var mutexHandle = Workflow.GetExternalWorkflowHandle(mutexWorkflowId);
await mutexHandle.SignalAsync(ReleaseSignalName, Array.Empty<object?>());
}
}

internal sealed class LockHandler : ILockHandler
{
/// <inheritdoc />
public string? CurrentOwnerId { get; private set; }

/// <inheritdoc />
public async Task HandleAsync(LockRequest lockRequest)
{
var releaseSignalName = Workflow.NewGuid().ToString();

var initiator = Workflow.GetExternalWorkflowHandle(lockRequest.InitiatorId);
await initiator.SignalAsync(lockRequest.AcquireLockSignalName, new[] { new AcquireLockInput(releaseSignalName) });

var released = false;
Workflow.Signals[releaseSignalName] = WorkflowSignalDefinition.CreateWithoutAttribute(releaseSignalName, () =>
{
released = true;
return Task.CompletedTask;
});
CurrentOwnerId = lockRequest.InitiatorId;

if (!await Workflow.WaitConditionAsync(() => released, lockRequest.Timeout ?? Timeout.InfiniteTimeSpan))
{
Workflow.Logger.LogWarning(
"Lock for resource '{ResourceId}' has been timed out after '{Timeout}'. (LockInitiatorId='{LockInitiatorId}')",
Workflow.Info.WorkflowId[MutexWorkflowIdPrefix.Length..],
lockRequest.Timeout,
lockRequest.InitiatorId);
}

CurrentOwnerId = null;
Workflow.Signals.Remove(releaseSignalName);
}
}
}
Loading

0 comments on commit e9e1096

Please sign in to comment.