diff --git a/README.md b/README.md index 4f1c0cd..5411f17 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ Prerequisites: * [ActivitySimple](src/ActivitySimple) - Simple workflow that runs simple activities. * [ActivityWorker](src/ActivityWorker) - Use .NET activities from a workflow in another language. * [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter. +* [Bedrock](src/Bedrock) - Orchestrate a chatbot with Amazon Bedrock. * [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud. * [ContextPropagation](src/ContextPropagation) - Context propagation via interceptors. * [CounterInterceptor](src/CounterInterceptor/) - Simple Workflow and Client Interceptors example. diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln index 6f01454..0f5bfbb 100644 --- a/TemporalioSamples.sln +++ b/TemporalioSamples.sln @@ -61,6 +61,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.CounterIn EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Patching", "src\Patching\TemporalioSamples.Patching.csproj", "{751E0AF8-62EE-4220-A571-D1C53DD0D45A}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.SignalsQueries", "src\SignalsQueries\TemporalioSamples.SignalsQueries.csproj", "{4EA37A92-E4D5-4348-AF2F-0CAE37A1E079}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Bedrock", "Bedrock", "{5339989C-3791-4D75-A9F1-42620C443D4A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Bedrock.Basic", "src\Bedrock\Basic\TemporalioSamples.Bedrock.Basic.csproj", "{CC3487CB-F795-4CA9-A4F6-28C145AE383B}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Bedrock.SignalsAndQueries", "src\Bedrock\SignalsAndQueries\TemporalioSamples.Bedrock.SignalsAndQueries.csproj", "{4E69DE72-E972-4155-B4D1-ABFE0B054ECF}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Bedrock.Entity", "src\Bedrock\Entity\TemporalioSamples.Bedrock.Entity.csproj", "{AE18875F-B7D2-4A3C-8784-15B76277D508}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -167,6 +177,22 @@ Global {751E0AF8-62EE-4220-A571-D1C53DD0D45A}.Debug|Any CPU.Build.0 = Debug|Any CPU {751E0AF8-62EE-4220-A571-D1C53DD0D45A}.Release|Any CPU.ActiveCfg = Release|Any CPU {751E0AF8-62EE-4220-A571-D1C53DD0D45A}.Release|Any CPU.Build.0 = Release|Any CPU + {4EA37A92-E4D5-4348-AF2F-0CAE37A1E079}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4EA37A92-E4D5-4348-AF2F-0CAE37A1E079}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4EA37A92-E4D5-4348-AF2F-0CAE37A1E079}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4EA37A92-E4D5-4348-AF2F-0CAE37A1E079}.Release|Any CPU.Build.0 = Release|Any CPU + {CC3487CB-F795-4CA9-A4F6-28C145AE383B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CC3487CB-F795-4CA9-A4F6-28C145AE383B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CC3487CB-F795-4CA9-A4F6-28C145AE383B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CC3487CB-F795-4CA9-A4F6-28C145AE383B}.Release|Any CPU.Build.0 = Release|Any CPU + {4E69DE72-E972-4155-B4D1-ABFE0B054ECF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4E69DE72-E972-4155-B4D1-ABFE0B054ECF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4E69DE72-E972-4155-B4D1-ABFE0B054ECF}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4E69DE72-E972-4155-B4D1-ABFE0B054ECF}.Release|Any CPU.Build.0 = Release|Any CPU + {AE18875F-B7D2-4A3C-8784-15B76277D508}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AE18875F-B7D2-4A3C-8784-15B76277D508}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AE18875F-B7D2-4A3C-8784-15B76277D508}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AE18875F-B7D2-4A3C-8784-15B76277D508}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -199,5 +225,10 @@ Global {FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {F9C44936-8BF9-4919-BB66-8F1888E22AEB} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {751E0AF8-62EE-4220-A571-D1C53DD0D45A} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {4EA37A92-E4D5-4348-AF2F-0CAE37A1E079} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {5339989C-3791-4D75-A9F1-42620C443D4A} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {CC3487CB-F795-4CA9-A4F6-28C145AE383B} = {5339989C-3791-4D75-A9F1-42620C443D4A} + {4E69DE72-E972-4155-B4D1-ABFE0B054ECF} = {5339989C-3791-4D75-A9F1-42620C443D4A} + {AE18875F-B7D2-4A3C-8784-15B76277D508} = {5339989C-3791-4D75-A9F1-42620C443D4A} EndGlobalSection EndGlobal diff --git a/src/Bedrock/Basic/BedrockActivities.cs b/src/Bedrock/Basic/BedrockActivities.cs new file mode 100644 index 0000000..c96fd01 --- /dev/null +++ b/src/Bedrock/Basic/BedrockActivities.cs @@ -0,0 +1,39 @@ +using System.Text.Json; +using System.Text.Json.Nodes; +using Amazon.BedrockRuntime; +using Amazon.BedrockRuntime.Model; +using Temporalio.Activities; + +namespace TemporalioSamples.Bedrock.Basic; + +public class BedrockActivities(IAmazonBedrockRuntime bedrock) +{ + public record PromptArgs(string Prompt); + + public record PromptResult(string Response); + + [Activity] + public async Task PromptBedrockAsync(PromptArgs args) + { + var body = JsonSerializer.Serialize(new + { + prompt = args.Prompt, + max_gen_len = 512, + temperature = 0.1, + top_p = 0.2, + }); + + var request = new InvokeModelRequest + { + ModelId = "meta.llama3-1-70b-instruct-v1:0", + Accept = "application/json", + ContentType = "application/json", + Body = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(body)), + }; + + var response = await bedrock.InvokeModelAsync(request); + var modelResponse = await JsonNode.ParseAsync(response.Body); + var responseText = modelResponse?["generation"]?.ToString() ?? string.Empty; + return new(responseText); + } +} \ No newline at end of file diff --git a/src/Bedrock/Basic/BedrockWorkflow.workflow.cs b/src/Bedrock/Basic/BedrockWorkflow.workflow.cs new file mode 100644 index 0000000..aed763f --- /dev/null +++ b/src/Bedrock/Basic/BedrockWorkflow.workflow.cs @@ -0,0 +1,29 @@ +using Microsoft.Extensions.Logging; +using Temporalio.Workflows; + +namespace TemporalioSamples.Bedrock.Basic; + +[Workflow] +public class BedrockWorkflow +{ + public record WorkflowArgs(string Prompt); + + public record WorkflowResult(string Response); + + [WorkflowRun] + public async Task RunAsync(WorkflowArgs args) + { + Workflow.Logger.LogInformation("Prompt: {Prompt}", args.Prompt); + + var promptResult = await Workflow.ExecuteActivityAsync( + (BedrockActivities activities) => activities.PromptBedrockAsync(new(args.Prompt)), + new() + { + StartToCloseTimeout = TimeSpan.FromSeconds(20), + }); + + Workflow.Logger.LogInformation("Response:\n{Response}", promptResult.Response); + + return new(promptResult.Response); + } +} \ No newline at end of file diff --git a/src/Bedrock/Basic/Program.cs b/src/Bedrock/Basic/Program.cs new file mode 100644 index 0000000..958ff21 --- /dev/null +++ b/src/Bedrock/Basic/Program.cs @@ -0,0 +1,69 @@ +using Amazon.BedrockRuntime; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Temporalio.Client; +using Temporalio.Extensions.Hosting; +using TemporalioSamples.Bedrock.Basic; + +async Task RunWorkerAsync() +{ + var builder = Host.CreateApplicationBuilder(args); + + builder.Logging. + SetMinimumLevel(LogLevel.Information). + AddSimpleConsole(options => options.SingleLine = true); + + builder.Services.AddSingleton(_ => new AmazonBedrockRuntimeClient()); + + builder.Services. + AddHostedTemporalWorker(clientTargetHost: "localhost:7233", clientNamespace: "default", taskQueue: "basic-bedrock-task-queue"). + AddSingletonActivities(). + AddWorkflow(); + + var app = builder.Build(); + await app.RunAsync(); +} + +async Task SendMessageAsync() +{ + var prompt = args.ElementAtOrDefault(1); + if (prompt is null) + { + Console.WriteLine("Usage: dotnet run send-message ''"); + Console.WriteLine("Example: dotnet run send-message 'What animals are marsupials?'"); + return; + } + + var client = await CreateClientAsync(); + var workflowId = "basic-bedrock-workflow"; + + // Start the workflow + var result = await client.ExecuteWorkflowAsync( + (BedrockWorkflow workflow) => + workflow.RunAsync(new(prompt)), + new WorkflowOptions(workflowId, "basic-bedrock-task-queue")); + + Console.WriteLine($"Result: {result.Response}"); +} + +async Task CreateClientAsync() => + await TemporalClient.ConnectAsync(new("localhost:7233") + { + LoggerFactory = LoggerFactory.Create(builder => + builder. + AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] "). + SetMinimumLevel(LogLevel.Information)), + }); + +switch (args.ElementAtOrDefault(0)) +{ + case "worker": + await RunWorkerAsync(); + break; + case "send-message": + await SendMessageAsync(); + break; + default: + throw new ArgumentException("Must pass 'worker' or 'send-message' as the single argument"); +} \ No newline at end of file diff --git a/src/Bedrock/Basic/README.md b/src/Bedrock/Basic/README.md new file mode 100644 index 0000000..b86d9f2 --- /dev/null +++ b/src/Bedrock/Basic/README.md @@ -0,0 +1,10 @@ +# Basic Amazon Bedrock workflow + +A basic Bedrock workflow. Starts a workflow with a prompt, generates a response and ends the workflow. + +To run, first see `Bedrock` [README.md](../README.md) for prerequisites specific to this sample. Once set up, run the following from this directory: + +1. Run the worker: `dotnet run worker` +2. In another terminal run the client with a prompt: + + e.g. `dotnet run send-message 'What animals are marsupials?'` \ No newline at end of file diff --git a/src/Bedrock/Basic/TemporalioSamples.Bedrock.Basic.csproj b/src/Bedrock/Basic/TemporalioSamples.Bedrock.Basic.csproj new file mode 100644 index 0000000..0a9d24f --- /dev/null +++ b/src/Bedrock/Basic/TemporalioSamples.Bedrock.Basic.csproj @@ -0,0 +1,11 @@ + + + + Exe + + + + + + + \ No newline at end of file diff --git a/src/Bedrock/Entity/BedrockActivities.cs b/src/Bedrock/Entity/BedrockActivities.cs new file mode 100644 index 0000000..d29f6cd --- /dev/null +++ b/src/Bedrock/Entity/BedrockActivities.cs @@ -0,0 +1,39 @@ +using System.Text.Json; +using System.Text.Json.Nodes; +using Amazon.BedrockRuntime; +using Amazon.BedrockRuntime.Model; +using Temporalio.Activities; + +namespace TemporalioSamples.Bedrock.Entity; + +public class BedrockActivities(IAmazonBedrockRuntime bedrock) +{ + public record PromptArgs(string Prompt); + + public record PromptResult(string Response); + + [Activity] + public async Task PromptBedrockAsync(PromptArgs args) + { + var body = JsonSerializer.Serialize(new + { + prompt = args.Prompt, + max_gen_len = 512, + temperature = 0.1, + top_p = 0.2, + }); + + var request = new InvokeModelRequest + { + ModelId = "meta.llama3-1-70b-instruct-v1:0", + Accept = "application/json", + ContentType = "application/json", + Body = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(body)), + }; + + var response = await bedrock.InvokeModelAsync(request); + var modelResponse = await JsonNode.ParseAsync(response.Body); + var responseText = modelResponse?["generation"]?.ToString() ?? string.Empty; + return new(responseText); + } +} \ No newline at end of file diff --git a/src/Bedrock/Entity/BedrockWorkflow.workflow.cs b/src/Bedrock/Entity/BedrockWorkflow.workflow.cs new file mode 100644 index 0000000..bfacb37 --- /dev/null +++ b/src/Bedrock/Entity/BedrockWorkflow.workflow.cs @@ -0,0 +1,168 @@ +using System.Collections.ObjectModel; +using Microsoft.Extensions.Logging; +using Temporalio.Workflows; + +namespace TemporalioSamples.Bedrock.Entity; + +[Workflow] +public class BedrockWorkflow +{ + public record WorkflowArgs(string? ConversationSummary = null, Queue? PromptQueue = null, bool? ChatEnded = null); + + public record WorkflowResult(Collection ConversationHistory); + + public record UserPrompt(string Prompt); + + public record ConversationEntry(string Speaker, string Message); + + private const int ContinueAsNewPerTurns = 6; + private readonly Queue promptQueue = new(); + private bool chatEnded; + + [WorkflowRun] + public async Task RunAsync(WorkflowArgs args) + { + if (args.ConversationSummary is not null) + { + ConversationHistory.Add(new ConversationEntry(Speaker: "conversation_summary", Message: args.ConversationSummary)); + ConversationSummary = args.ConversationSummary; + } + + if (args.PromptQueue is not null) + { + while (args.PromptQueue.TryDequeue(out var prompt)) + { + promptQueue.Enqueue(prompt); + } + } + + if (args.ChatEnded is not null) + { + chatEnded = args.ChatEnded.Value; + } + + while (true) + { + Workflow.Logger.LogInformation("Waiting for prompts..."); + + // Wait for a chat message or chat ended signal + await Workflow.WaitConditionAsync(() => promptQueue.Count > 0 || chatEnded); + + // Fetch next user prompt and add to conversation history + while (promptQueue.TryDequeue(out var prompt)) + { + ConversationHistory.Add(new(Speaker: "user", Message: prompt)); + Workflow.Logger.LogInformation("Prompt: {Prompt}", prompt); + + // Send the prompt to Amazon Bedrock + var promptResult = await Workflow.ExecuteActivityAsync( + (BedrockActivities activities) => activities.PromptBedrockAsync(new(PromptWithHistory(prompt))), + new() + { + StartToCloseTimeout = TimeSpan.FromSeconds(20), + }); + + Workflow.Logger.LogInformation("Response:\n{Response}", promptResult.Response); + + // Append the response to the conversation history + ConversationHistory.Add(new(Speaker: "response", promptResult.Response)); + + // Continue as new every x conversational turns to avoid event + // history size getting too large. This is also to avoid the + // prompt (with conversational history) getting too large for + // AWS Bedrock. + + // We summarize the chat to date and use that as input to the + // new workflow + if (ConversationHistory.Count >= ContinueAsNewPerTurns) + { + // Summarize the conversation to date using Amazon Bedrock + var summaryResult = await Workflow.ExecuteActivityAsync( + (BedrockActivities activities) => activities.PromptBedrockAsync(new(PromptSummaryFromHistory())), + new() + { + StartToCloseTimeout = TimeSpan.FromSeconds(20), + }); + + ConversationSummary = summaryResult.Response; + Workflow.Logger.LogInformation("Continuing as new due to {ContinueAsNewPerTurns} conversational turns.", ContinueAsNewPerTurns); + + throw Workflow.CreateContinueAsNewException(workflow => + workflow.RunAsync(new(ConversationSummary, promptQueue, chatEnded))); + } + } + + // If end chat signal was sent + if (chatEnded) + { + // The workflow might be continued as new without any + // chat to summarize, so only call Bedrock if there + // is more than the previous summary in the history. + if (ConversationHistory.Count > 1) + { + var summaryResult = await Workflow.ExecuteActivityAsync( + (BedrockActivities activities) => activities.PromptBedrockAsync(new(PromptSummaryFromHistory())), + new() + { + StartToCloseTimeout = TimeSpan.FromSeconds(20), + }); + + ConversationSummary = summaryResult.Response; + } + + Workflow.Logger.LogInformation("Chat ended. Conversation summary:\n{ConversationSummary}", ConversationSummary); + return new WorkflowResult(ConversationHistory); + } + } + } + + [WorkflowQuery] + public string? ConversationSummary { get; private set; } + + [WorkflowQuery] + public Collection ConversationHistory { get; } = new(); + + [WorkflowSignal] + public Task UserPromptAsync(UserPrompt signal) + { + // Chat timed out but the workflow is waiting for a chat summary to be generated + if (chatEnded) + { + Workflow.Logger.LogWarning("Message dropped due to chat closed: {Prompt}", signal.Prompt); + return Task.CompletedTask; + } + + promptQueue.Enqueue(signal.Prompt); + return Task.CompletedTask; + } + + [WorkflowSignal] + public async Task EndChatAsync() => chatEnded = true; + + private string FormatHistory() => string.Join(" ", ConversationHistory.Select(x => x.Message)); + + private string PromptWithHistory(string prompt) + { + // Create the prompt given to Amazon Bedrock for each conversational turn + var history = FormatHistory(); + + return $""" + Here is the conversation history: {history} Please add + a few sentence response to the prompt in plain text sentences. + Don't editorialize or add metadata like response. Keep the + text a plain explanation based on the history. Prompt: {prompt} + """; + } + + private string PromptSummaryFromHistory() + { + // Create the prompt to Amazon Bedrock to summarize the conversation history + var history = FormatHistory(); + + return $""" + Here is the conversation history between a user and a chatbot: + {history} -- Please produce a two sentence summary of + this conversation. + """; + } +} \ No newline at end of file diff --git a/src/Bedrock/Entity/Program.cs b/src/Bedrock/Entity/Program.cs new file mode 100644 index 0000000..6b2a83b --- /dev/null +++ b/src/Bedrock/Entity/Program.cs @@ -0,0 +1,106 @@ +using Amazon.BedrockRuntime; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Temporalio.Client; +using Temporalio.Extensions.Hosting; +using TemporalioSamples.Bedrock.Entity; + +async Task RunWorkerAsync() +{ + var builder = Host.CreateApplicationBuilder(args); + + builder.Logging. + SetMinimumLevel(LogLevel.Information). + AddSimpleConsole(options => options.SingleLine = true); + + builder.Services.AddSingleton(_ => new AmazonBedrockRuntimeClient()); + + builder.Services. + AddHostedTemporalWorker(clientTargetHost: "localhost:7233", clientNamespace: "default", taskQueue: "entity-bedrock-task-queue"). + AddSingletonActivities(). + AddWorkflow(); + + var app = builder.Build(); + await app.RunAsync(); +} + +async Task SendMessageAsync() +{ + var prompt = args.ElementAtOrDefault(1); + if (prompt is null) + { + Console.WriteLine("Usage: dotnet run send-message ''"); + Console.WriteLine("Example: dotnet run send-message 'What animals are marsupials?'"); + return; + } + + var client = await CreateClientAsync(); + var workflowId = "entity-bedrock-workflow"; + + // Sends a signal to the workflow (and starts it if needed) + var workflowOptions = new WorkflowOptions(workflowId, "entity-bedrock-task-queue"); + workflowOptions.SignalWithStart((BedrockWorkflow workflow) => workflow.UserPromptAsync(new(prompt))); + await client.StartWorkflowAsync((BedrockWorkflow workflow) => workflow.RunAsync(new(null, null, null)), workflowOptions); +} + +async Task GetHistoryAsync() +{ + var client = await CreateClientAsync(); + var workflowId = "entity-bedrock-workflow"; + var handle = client.GetWorkflowHandle(workflowId); + + // Queries the workflow for the conversation history + var history = await handle.QueryAsync(workflow => workflow.ConversationHistory); + + Console.WriteLine("Conversation History:"); + foreach (var entry in history) + { + Console.WriteLine($"{entry.Speaker}: {entry.Message}"); + } + + // Queries the workflow for the conversation summary + var summary = await handle.QueryAsync(workflow => workflow.ConversationSummary); + if (summary is not null) + { + Console.WriteLine("Conversation Summary:"); + Console.WriteLine(summary); + } +} + +async Task EndChatAsync() +{ + var client = await CreateClientAsync(); + var workflowId = "entity-bedrock-workflow"; + var handle = client.GetWorkflowHandle(workflowId); + + // Sends a signal to the workflow + await handle.SignalAsync((workflow) => workflow.EndChatAsync()); +} + +async Task CreateClientAsync() => + await TemporalClient.ConnectAsync(new("localhost:7233") + { + LoggerFactory = LoggerFactory.Create(builder => + builder. + AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] "). + SetMinimumLevel(LogLevel.Information)), + }); + +switch (args.ElementAtOrDefault(0)) +{ + case "worker": + await RunWorkerAsync(); + break; + case "send-message": + await SendMessageAsync(); + break; + case "get-history": + await GetHistoryAsync(); + break; + case "end-chat": + await EndChatAsync(); + break; + default: + throw new ArgumentException("Must pass 'worker' or 'send-message' as the single argument"); +} \ No newline at end of file diff --git a/src/Bedrock/Entity/README.md b/src/Bedrock/Entity/README.md new file mode 100644 index 0000000..64ff1e3 --- /dev/null +++ b/src/Bedrock/Entity/README.md @@ -0,0 +1,19 @@ +# Multi-turn chat with Amazon Bedrock Entity Workflow + +Multi-Turn Chat using an Entity Workflow. The workflow runs forever unless explicitly ended. The workflow continues as new after a configurable number of chat turns to keep the prompt size small and the Temporal event history small. Each continued-as-new workflow receives a summary of the conversation history so far for context. + +To run, first see `Bedrock` [README.md](../README.md) for prerequisites specific to this sample. Once set up, run the following from this directory: + +1. Run the worker: `dotnet run worker` +2. In another terminal run the client with a prompt. + + Example: `dotnet run send-message 'What animals are marsupials?'` + +3. View the worker's output for the response. +4. Give followup prompts by signaling the workflow. + + Example: `dotnet run send-message 'Do they lay eggs?'` +5. Get the conversation history summary by querying the workflow. + + Example: `dotnet run get-history` +6. To end the chat session, run `dotnet run end-chat` \ No newline at end of file diff --git a/src/Bedrock/Entity/TemporalioSamples.Bedrock.Entity.csproj b/src/Bedrock/Entity/TemporalioSamples.Bedrock.Entity.csproj new file mode 100644 index 0000000..0a9d24f --- /dev/null +++ b/src/Bedrock/Entity/TemporalioSamples.Bedrock.Entity.csproj @@ -0,0 +1,11 @@ + + + + Exe + + + + + + + \ No newline at end of file diff --git a/src/Bedrock/README.md b/src/Bedrock/README.md new file mode 100644 index 0000000..04f093d --- /dev/null +++ b/src/Bedrock/README.md @@ -0,0 +1,21 @@ +# AI Chatbot example using Amazon Bedrock + +Demonstrates how Temporal and Amazon Bedrock can be used to quickly build bulletproof AI applications. + +## Samples + +* [Basic](Basic) - A basic Bedrock workflow to process a single prompt. +* [SignalsAndQueries](SignalsAndQueries) - Extension to the basic workflow to allow multiple prompts through signals & queries. +* [Entity](Entity) - Full multi-Turn chat using an entity workflow. + +## Pre-requisites + +1. An AWS account with Bedrock enabled. +2. A machine that has access to Bedrock. +3. A local Temporal server running on the same machine. See [Temporal's dev server docs](https://docs.temporal.io/cli#start-dev-server) for more information. + +These examples use Amazon's .NET SDK. To configure your AWS credentials, follow the instructions in [the AWS SDK for .NET documentation](https://docs.aws.amazon.com/sdk-for-net/v3/developer-guide/creds-idc.html). + +## Running the samples + +There are 3 Bedrock samples, see the README.md in each subdirectory for instructions on running each. \ No newline at end of file diff --git a/src/Bedrock/SignalsAndQueries/BedrockActivities.cs b/src/Bedrock/SignalsAndQueries/BedrockActivities.cs new file mode 100644 index 0000000..96952ff --- /dev/null +++ b/src/Bedrock/SignalsAndQueries/BedrockActivities.cs @@ -0,0 +1,39 @@ +using System.Text.Json; +using System.Text.Json.Nodes; +using Amazon.BedrockRuntime; +using Amazon.BedrockRuntime.Model; +using Temporalio.Activities; + +namespace TemporalioSamples.Bedrock.SignalsAndQueries; + +public class BedrockActivities(IAmazonBedrockRuntime bedrock) +{ + public record PromptArgs(string Prompt); + + public record PromptResult(string Response); + + [Activity] + public async Task PromptBedrockAsync(PromptArgs args) + { + var body = JsonSerializer.Serialize(new + { + prompt = args.Prompt, + max_gen_len = 512, + temperature = 0.1, + top_p = 0.2, + }); + + var request = new InvokeModelRequest + { + ModelId = "meta.llama3-1-70b-instruct-v1:0", + Accept = "application/json", + ContentType = "application/json", + Body = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(body)), + }; + + var response = await bedrock.InvokeModelAsync(request); + var modelResponse = await JsonNode.ParseAsync(response.Body); + var responseText = modelResponse?["generation"]?.ToString() ?? string.Empty; + return new(responseText); + } +} \ No newline at end of file diff --git a/src/Bedrock/SignalsAndQueries/BedrockWorkflow.workflow.cs b/src/Bedrock/SignalsAndQueries/BedrockWorkflow.workflow.cs new file mode 100644 index 0000000..1f75fe0 --- /dev/null +++ b/src/Bedrock/SignalsAndQueries/BedrockWorkflow.workflow.cs @@ -0,0 +1,121 @@ +using System.Collections.ObjectModel; +using Microsoft.Extensions.Logging; +using Temporalio.Workflows; + +namespace TemporalioSamples.Bedrock.SignalsAndQueries; + +[Workflow] +public class BedrockWorkflow +{ + public record WorkflowArgs(int InactivityTimeoutMinutes); + + public record WorkflowResult(Collection ConversationHistory); + + public record UserPrompt(string Prompt); + + public record ConversationEntry(string Speaker, string Message); + + private readonly Queue promptQueue = new(); + private bool chatTimeout; + + [WorkflowRun] + public async Task RunAsync(WorkflowArgs args) + { + while (true) + { + Workflow.Logger.LogInformation("Waiting for prompts... or closing chat after {InactivityTimeoutMinutes} minutes(s)", args.InactivityTimeoutMinutes); + + // Wait for a chat message (signal) or timeout + if (await Workflow.WaitConditionAsync(() => promptQueue.Count > 0, timeout: TimeSpan.FromMinutes(args.InactivityTimeoutMinutes))) + { + // Fetch next user prompt and add to conversation history + while (promptQueue.TryDequeue(out var prompt)) + { + ConversationHistory.Add(new(Speaker: "user", Message: prompt)); + Workflow.Logger.LogInformation("Prompt: {Prompt}", prompt); + + // Send the prompt to Amazon Bedrock + var promptResult = await Workflow.ExecuteActivityAsync( + (BedrockActivities activities) => activities.PromptBedrockAsync(new(PromptWithHistory(prompt))), + new() + { + StartToCloseTimeout = TimeSpan.FromSeconds(20), + }); + + Workflow.Logger.LogInformation("Response:\n{Response}", promptResult.Response); + + // Append the response to the conversation history + ConversationHistory.Add(new(Speaker: "response", Message: promptResult.Response)); + } + } + else + { + // If timeout was reached + chatTimeout = true; + Workflow.Logger.LogInformation("Chat closed due to inactivity"); + + // End the workflow + break; + } + } + + // Generate a summary before ending the workflow + var summaryResult = await Workflow.ExecuteActivityAsync( + (BedrockActivities activities) => activities.PromptBedrockAsync(new(PromptSummaryFromHistory())), + new() + { + StartToCloseTimeout = TimeSpan.FromSeconds(20), + }); + + ConversationSummary = summaryResult.Response; + Workflow.Logger.LogInformation("Conversation summary:\n{ConversationSummary}", ConversationSummary); + return new WorkflowResult(ConversationHistory); + } + + [WorkflowQuery] + public string? ConversationSummary { get; private set; } + + [WorkflowQuery] + public Collection ConversationHistory { get; } = new(); + + [WorkflowSignal] + public Task UserPromptAsync(UserPrompt signal) + { + // Chat timed out but the workflow is waiting for a chat summary to be generated + if (chatTimeout) + { + Workflow.Logger.LogWarning("Message dropped due to chat closed: {Prompt}", signal.Prompt); + return Task.CompletedTask; + } + + promptQueue.Enqueue(signal.Prompt); + return Task.CompletedTask; + } + + private string FormatHistory() => string.Join(" ", ConversationHistory.Select(x => x.Message)); + + private string PromptWithHistory(string prompt) + { + // Create the prompt given to Amazon Bedrock for each conversational turn + var history = FormatHistory(); + + return $""" + Here is the conversation history: {history} Please add + a few sentence response to the prompt in plain text sentences. + Don't editorialize or add metadata like response. Keep the + text a plain explanation based on the history. Prompt: {prompt} + """; + } + + private string PromptSummaryFromHistory() + { + // Create the prompt to Amazon Bedrock to summarize the conversation history + var history = FormatHistory(); + + return $""" + Here is the conversation history between a user and a chatbot: + {history} -- Please produce a two sentence summary of + this conversation. + """; + } +} \ No newline at end of file diff --git a/src/Bedrock/SignalsAndQueries/Program.cs b/src/Bedrock/SignalsAndQueries/Program.cs new file mode 100644 index 0000000..3ac1c6c --- /dev/null +++ b/src/Bedrock/SignalsAndQueries/Program.cs @@ -0,0 +1,94 @@ +using Amazon.BedrockRuntime; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Temporalio.Client; +using Temporalio.Extensions.Hosting; +using TemporalioSamples.Bedrock.SignalsAndQueries; + +async Task RunWorkerAsync() +{ + var builder = Host.CreateApplicationBuilder(args); + + builder.Logging. + SetMinimumLevel(LogLevel.Information). + AddSimpleConsole(options => options.SingleLine = true); + + builder.Services.AddSingleton(_ => new AmazonBedrockRuntimeClient()); + + builder.Services. + AddHostedTemporalWorker(clientTargetHost: "localhost:7233", clientNamespace: "default", taskQueue: "with-signals-bedrock-task-queue"). + AddSingletonActivities(). + AddWorkflow(); + + var app = builder.Build(); + await app.RunAsync(); +} + +async Task SendMessageAsync() +{ + var prompt = args.ElementAtOrDefault(1); + if (prompt is null) + { + Console.WriteLine("Usage: dotnet run send-message ''"); + Console.WriteLine("Example: dotnet run send-message 'What animals are marsupials?'"); + return; + } + + var client = await CreateClientAsync(); + var workflowId = "bedrock-workflow-with-signals"; + var inactivityTimeoutMinutes = 1; + + // Sends a signal to the workflow (and starts it if needed) + var workflowOptions = new WorkflowOptions(workflowId, "with-signals-bedrock-task-queue"); + workflowOptions.SignalWithStart((BedrockWorkflow workflow) => workflow.UserPromptAsync(new(prompt))); + await client.StartWorkflowAsync((BedrockWorkflow workflow) => workflow.RunAsync(new(inactivityTimeoutMinutes)), workflowOptions); +} + +async Task GetHistoryAsync() +{ + var client = await CreateClientAsync(); + var workflowId = "bedrock-workflow-with-signals"; + var handle = client.GetWorkflowHandle(workflowId); + + // Queries the workflow for the conversation history + var history = await handle.QueryAsync(workflow => workflow.ConversationHistory); + + Console.WriteLine("Conversation History:"); + foreach (var entry in history) + { + Console.WriteLine($"{entry.Speaker}: {entry.Message}"); + } + + // Queries the workflow for the conversation summary + var summary = await handle.QueryAsync(workflow => workflow.ConversationSummary); + if (summary is not null) + { + Console.WriteLine("Conversation Summary:"); + Console.WriteLine(summary); + } +} + +async Task CreateClientAsync() => + await TemporalClient.ConnectAsync(new("localhost:7233") + { + LoggerFactory = LoggerFactory.Create(builder => + builder. + AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] "). + SetMinimumLevel(LogLevel.Information)), + }); + +switch (args.ElementAtOrDefault(0)) +{ + case "worker": + await RunWorkerAsync(); + break; + case "send-message": + await SendMessageAsync(); + break; + case "get-history": + await GetHistoryAsync(); + break; + default: + throw new ArgumentException("Must pass 'worker' or 'send-message' as the single argument"); +} \ No newline at end of file diff --git a/src/Bedrock/SignalsAndQueries/README.md b/src/Bedrock/SignalsAndQueries/README.md new file mode 100644 index 0000000..0dac8cd --- /dev/null +++ b/src/Bedrock/SignalsAndQueries/README.md @@ -0,0 +1,19 @@ +# Amazon Bedrock workflow using Signals and Queries + +Adding signals & queries to the [basic Bedrock sample](../Basic). Starts a workflow with a prompt, allows follow-up prompts to be given using Temporal signals, and allows the conversation history to be queried using Temporal queries. + +To run, first see `Bedrock` [README.md](../README.md) for prerequisites specific to this sample. Once set up, run the following from this directory: + +1. Run the worker: `dotnet run worker` +2. In another terminal run the client with a prompt. + + Example: `dotnet run send-message 'What animals are marsupials?'` + +3. View the worker's output for the response. +4. Give followup prompts by signaling the workflow. + + Example: `dotnet run send-message 'Do they lay eggs?'` +5. Get the conversation history by querying the workflow. + + Example: `dotnet run get-history` +6. The workflow will timeout after inactivity. \ No newline at end of file diff --git a/src/Bedrock/SignalsAndQueries/TemporalioSamples.Bedrock.SignalsAndQueries.csproj b/src/Bedrock/SignalsAndQueries/TemporalioSamples.Bedrock.SignalsAndQueries.csproj new file mode 100644 index 0000000..0a9d24f --- /dev/null +++ b/src/Bedrock/SignalsAndQueries/TemporalioSamples.Bedrock.SignalsAndQueries.csproj @@ -0,0 +1,11 @@ + + + + Exe + + + + + + + \ No newline at end of file diff --git a/tests/AssertMore.cs b/tests/AssertMore.cs new file mode 100644 index 0000000..08429d6 --- /dev/null +++ b/tests/AssertMore.cs @@ -0,0 +1,36 @@ +namespace TemporalioSamples.Tests; + +public static class AssertMore +{ + public static Task EventuallyAsync( + Func func, TimeSpan? interval = null, int iterations = 15) => + EventuallyAsync( + async () => + { + await func(); + return ValueTuple.Create(); + }, + interval, + iterations); + + public static async Task EventuallyAsync( + Func> func, TimeSpan? interval = null, int iterations = 15) + { + var tick = interval ?? TimeSpan.FromMilliseconds(300); + for (var i = 0; ; i++) + { + try + { + return await func(); + } + catch (Xunit.Sdk.XunitException) + { + if (i >= iterations - 1) + { + throw; + } + } + await Task.Delay(tick); + } + } +} \ No newline at end of file diff --git a/tests/Bedrock/BedrockEntityWorkflowTests.cs b/tests/Bedrock/BedrockEntityWorkflowTests.cs new file mode 100644 index 0000000..645758f --- /dev/null +++ b/tests/Bedrock/BedrockEntityWorkflowTests.cs @@ -0,0 +1,140 @@ +namespace TemporalioSamples.Tests.Bedrock; + +using Temporalio.Activities; +using Temporalio.Client; +using Temporalio.Exceptions; +using Temporalio.Testing; +using Temporalio.Worker; +using TemporalioSamples.Bedrock.Entity; +using Xunit; +using Xunit.Abstractions; + +public class BedrockEntityWorkflowTests(ITestOutputHelper output) : TestBase(output) +{ + [Fact] + public async Task ContinuesAsNew() + { + var activities = new TestBedrockActivities(); + + await using var env = await WorkflowEnvironment.StartLocalAsync(new() { LoggerFactory = LoggerFactory, }); + + using var worker = new TemporalWorker( + env.Client, + new TemporalWorkerOptions("bedrock-entity-test-task-queue"). + AddActivity(activities.PromptBedrockAsync). + AddWorkflow()); + + await worker.ExecuteAsync(async () => + { + var workflowOptions = new WorkflowOptions(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!); + workflowOptions.SignalWithStart((BedrockWorkflow wf) => wf.UserPromptAsync(new("What animals are marsupials?"))); + var handle = await env.Client.StartWorkflowAsync((BedrockWorkflow wf) => wf.RunAsync(new(null, null, null)), workflowOptions); + + await handle.SignalAsync(wf => wf.UserPromptAsync(new("Do they lay eggs?"))); + await handle.SignalAsync(wf => wf.UserPromptAsync(new("Are you a chicken?"))); + await handle.SignalAsync(wf => wf.EndChatAsync()); + await handle.GetResultAsync(); + + // Check whether the workflow continued as new + var firstRunHistory = await (handle with { RunId = handle.ResultRunId }).FetchHistoryAsync(); + var continued = firstRunHistory.Events.Any(evt => evt.WorkflowExecutionContinuedAsNewEventAttributes != null); + Assert.True(continued); + }); + } + + [Fact] + public async Task QueryConversationHistory() + { + var activities = new TestBedrockActivities(); + + await using var env = await WorkflowEnvironment.StartLocalAsync(new() { LoggerFactory = LoggerFactory, }); + + using var worker = new TemporalWorker( + env.Client, + new TemporalWorkerOptions("bedrock-entity-test-task-queue"). + AddActivity(activities.PromptBedrockAsync). + AddWorkflow()); + + await worker.ExecuteAsync(async () => + { + var workflowId = $"workflow-{Guid.NewGuid()}"; + var workflowOptions = new WorkflowOptions(id: workflowId, taskQueue: worker.Options.TaskQueue!); + workflowOptions.SignalWithStart((BedrockWorkflow wf) => wf.UserPromptAsync(new("What animals are marsupials?"))); + + var handle = await env.Client.StartWorkflowAsync((BedrockWorkflow wf) => wf.RunAsync(new(null, null, null)), workflowOptions); + + await AssertMore.EventuallyAsync(async () => + { + var history = await handle.QueryAsync(workflow => workflow.ConversationHistory); + Assert.Equal(2, history.Count); + Assert.Equal(new BedrockWorkflow.ConversationEntry("user", "What animals are marsupials?"), history[0]); + Assert.Equal(new BedrockWorkflow.ConversationEntry("response", "Marsupials are a group of mammals that give birth to underdeveloped young."), history[1]); + }); + + await handle.SignalAsync(wf => wf.UserPromptAsync(new("Do they lay eggs?"))); + + await AssertMore.EventuallyAsync(async () => + { + var history = await handle.QueryAsync(workflow => workflow.ConversationHistory); + Assert.Equal(4, history.Count); + Assert.Equal(new BedrockWorkflow.ConversationEntry("user", "What animals are marsupials?"), history[0]); + Assert.Equal(new BedrockWorkflow.ConversationEntry("response", "Marsupials are a group of mammals that give birth to underdeveloped young."), history[1]); + Assert.Equal(new BedrockWorkflow.ConversationEntry("user", "Do they lay eggs?"), history[2]); + Assert.Equal(new BedrockWorkflow.ConversationEntry("response", "No, marsupials do not lay eggs. They are mammals, which means they give birth to live young."), history[3]); + }); + + // At this point the workflow will continue as new. + await handle.SignalAsync(wf => wf.UserPromptAsync(new("Are you a chicken?"))); + + await AssertMore.EventuallyAsync(async () => + { + try + { + var history = await handle.QueryAsync(workflow => workflow.ConversationHistory); + Assert.Single(history); + Assert.Equal(new BedrockWorkflow.ConversationEntry("conversation_summary", "This is the summary."), history[0]); + } + catch (RpcException rpcEx) when (rpcEx.Message == "Workflow task is not scheduled yet.") + { + Assert.Fail("Should not get here"); + } + }); + await handle.SignalAsync(wf => wf.EndChatAsync()); + + await AssertMore.EventuallyAsync(async () => + { + var summary = await handle.QueryAsync(workflow => workflow.ConversationSummary); + Assert.Equal("This is the summary.", summary); + }); + + await handle.GetResultAsync(); + + // Check whether the workflow continued as new + var firstRunHistory = await (handle with { RunId = handle.ResultRunId }).FetchHistoryAsync(); + var continued = firstRunHistory.Events.Any(evt => evt.WorkflowExecutionContinuedAsNewEventAttributes != null); + Assert.True(continued); + }); + } + + private class TestBedrockActivities + { + private int count; + + [Activity] + public Task PromptBedrockAsync(BedrockActivities.PromptArgs args) + { + count++; + var result = count switch + { + 1 => new BedrockActivities.PromptResult( + "Marsupials are a group of mammals that give birth to underdeveloped young."), + 2 => new BedrockActivities.PromptResult( + "No, marsupials do not lay eggs. They are mammals, which means they give birth to live young."), + 3 => new BedrockActivities.PromptResult("No, I am not a chicken."), + 4 => new BedrockActivities.PromptResult("This is the summary."), + _ => throw new InvalidOperationException("Should not get called"), + }; + return Task.FromResult(result); + } + } +} \ No newline at end of file diff --git a/tests/TemporalioSamples.Tests.csproj b/tests/TemporalioSamples.Tests.csproj index a8ba929..69c1b80 100644 --- a/tests/TemporalioSamples.Tests.csproj +++ b/tests/TemporalioSamples.Tests.csproj @@ -21,6 +21,7 @@ +