From 7cf742d2dcdf6fbbc9c1ff9e14fcfca9edd0b18d Mon Sep 17 00:00:00 2001 From: Edward Funnekotter Date: Thu, 26 Sep 2024 16:50:15 -0400 Subject: [PATCH] New features: inline broker request-response, temporary queues, improved docs and examples, and better testing (#39) * Examples Update + Code Refactor (#25) * Removed StorageManager * Added examples for OpenAI, Bedrock, Anthropic, and VertexAI * Updating old examples (1/2) * Updating old examples (2/2) * Added support for temporary queue + UUID queue name (#26) * Add assembly component and auto-generated documents (#27) * Added the assembly component * Auto-generated documents * Added type check * Update the cache service expiry logic + Update the assembly component to use cache expiry for timeout * Moved assembly to the correct place * Added MoA Example + UUID Invoke Function (#28) * MoA example: Broadcast to multiple agents * Added MoA event manager, added uuid invoke_function + test, updated auto-generated docs * Added assembly layer to MoA example * Update documentation for new users + Refactored component_input & source_expression (#29) * Refactored component_input to input_selection * Updated, added, and enhanced the documentation with new users in mind * Refactored source_expression function to evaluate_expression (backward compatible) * Added tips and tricks section + info and examples on custom modules * tiny format update * tiny update * Fixed solace disconnection issues on shutting down (#30) * Add RAG example for AI connector + delete action for vector index (#31) * Added a RAG example for AI connector * Added delete option to vectordb * Changed id to ids * chore: Refactor make_history_start_with_user_message method (#32) Fix the method to not trim the first entry if it is a "system" role * Keep history depth needs to be a positive integer and test refactor (#33) * chore: Refactor clear_history_but_keep_depth method to handle negative depth values * chore: small change to how this is solved * chore: one more try * refactor: move utils_for_test_files.py to solace_ai_connector module * refactor: removed the original utils_for_test_files.py * refactor: update import statements in test files * refactor: add sys.path.append("src") to test files * refactor: standardize import order and sys.path.append in test files * refactor: a bit more test infrastructure changes * feat: allow component_module to accept module objects directly * feat: add types module import to utils.py * test: add static import and object config test * refactor: update test_static_import_and_object_config to use create_test_flows * refactor: Improve test structure and remove duplicate test case * fix: remove duplicate import of yaml module * refactor: Modify test config to use dict instead of YAML string * refactor: convert config_yaml from string to dictionary * refactor: update static import test to use pass_through component * test: Add delay component message passing test * feat: add test for delay component message processing * feat: Added a new test function (test_one_component) to make it very easy to run quick single input -> expected output tests on a single component * feat: added input_transforms to the test_one_component so that input transforms can be tested with it * chore: a bit of cleanup and new tests for test_one_component * chore: rename test_one_component because it was being picked up as a test by the pytest scanner * fix: fixed a typo * Fix for anthropic example (#35) * Updating version dependency (#37) * Fixed url and file 
name in getting started (#38) * Add guide for RAG (#39) * Added guide for RAG * update wording * Added link to other docs from RAG guide (#40) * chore: added a timeout setting for running component tests so that you can test situations where you don't expect any output (#34) * AI-124: Add a feature to provide simple blocking broker request/response ability for components (#42) * feat: add request_response_controller.py * feat: implement RequestResponseFlowManager and RequestResponseController classes * style: format code with black and improve readability * feat: implement RequestResponseController for flow-based request-response handling * feat: implement RequestResponseController for handling request-response patterns * fix: import SolaceAiConnector for type checking * refactor: restructure Flow class and improve code organization * feat: implement multiple named RequestResponseControllers per component * refactor: initialize request-response controllers in ComponentBase * test: add request_response_controller functionality tests * feat: finished implementation and added some tests * refactor: rename RequestResponseController to RequestResponseFlowController * refactor: rename RequestResponseController to RequestResponseFlowController * refactor: some name changes * fix: update test function names for RequestResponseFlowController * refactor: more name changes * Ed/req_resp_examples_and_fixes (#41) * feat: Added a request_response_flow example and fixed a few issues along the way * feat: Reworked the broker_request_response built-in ability of components to be simpler. Instead of requiring a separately defined and named flow, it will automatically create a flow with a single broker_request_response component in it. Now there is a straightforward iterating function call to allow components to issue a request and get streaming or non-streaming responses from that flow. * chore: fix the request_response example and remove the old one * docs: add broker request-response configuration * docs: added advanced_component_features.md * docs: add broker request-response configuration details * docs: add payload encoding and format to broker config * docs: add cache service and timer manager to advanced_component_features.md * docs: add configuration requirement for broker request-response * docs: update broker request-response section with configuration info * docs: a bit more detail about do_broker_request_response * docs: add link to advanced features page in table of contents * docs: add link to advanced features page * docs: reorder table of contents in index.md * docs: add custom components documentation * docs: Remove advanced component features from table of contents * docs: clean up a double inclusion of the same section * docs: small example change * chore: remove dead code * chore: add some extra comments to explain some test code * docs: Update description of STDIN input component Update the description of the STDIN input component to clarify that it waits for its output message to be acknowledged before prompting for the next input. This change is made in the `stdin_input.py` file. 
* chore: add is_broker_request_response_enabled method * chore: Some changes after review * feat: AI-129: add ability to specify a default value for an environment variable in a .yaml config file (#43) * DATAGO-85484 Bump min python version --------- Co-authored-by: Cyrus Mobini <68962752+cyrus2281@users.noreply.github.com> Co-authored-by: Art Morozov Co-authored-by: Art Morozov --- .github/workflows/ci.yml | 9 +- .github/workflows/release.yaml | 2 +- config.yaml | 1 - docs/advanced_component_features.md | 120 +++++++++ docs/configuration.md | 33 ++- docs/custom_components.md | 90 +++++++ docs/getting_started.md | 4 +- docs/guides/RAG.md | 236 +++++++++++++++++ docs/guides/index.md | 4 + docs/index.md | 2 + examples/ack_test.yaml | 1 - examples/anthropic_bedrock.yaml | 16 +- examples/error_handler.yaml | 1 - examples/llm/custom_components/__init__.py | 0 .../llm_streaming_custom_component.py | 52 ++++ .../openai_component_request_response.yaml | 147 +++++++++++ examples/request_reply.yaml | 1 - pyproject.toml | 6 +- requirements.txt | 4 +- src/solace_ai_connector/common/event.py | 5 +- src/solace_ai_connector/common/utils.py | 22 +- .../components/component_base.py | 66 ++++- .../components/general/delay.py | 3 +- .../general/for_testing/handler_callback.py | 67 +++++ .../general/openai/openai_chat_model_base.py | 110 +++++++- .../openai/openai_chat_model_with_history.py | 5 + .../components/inputs_outputs/broker_base.py | 9 +- .../components/inputs_outputs/broker_input.py | 3 +- .../inputs_outputs/broker_request_response.py | 164 +++++++++--- .../components/inputs_outputs/stdin_input.py | 34 ++- .../inputs_outputs/stdout_output.py | 19 +- src/solace_ai_connector/flow/flow.py | 19 +- .../flow/request_response_flow_controller.py | 156 +++++++++++ src/solace_ai_connector/main.py | 16 +- .../solace_ai_connector.py | 18 +- .../test_utils}/utils_for_test_files.py | 103 +++++++- tests/test_acks.py | 4 +- tests/test_aggregate.py | 5 +- tests/test_config_file.py | 51 +++- tests/test_error_flows.py | 6 +- tests/test_filter.py | 6 +- tests/test_flows.py | 5 +- tests/test_invoke.py | 18 +- tests/test_iterate.py | 6 +- tests/test_message_get_set_data.py | 4 +- tests/test_request_response_controller.py | 244 ++++++++++++++++++ tests/test_timer_input.py | 5 +- tests/test_transforms.py | 62 ++++- 48 files changed, 1822 insertions(+), 142 deletions(-) create mode 100644 docs/advanced_component_features.md create mode 100644 docs/custom_components.md create mode 100644 docs/guides/RAG.md create mode 100644 docs/guides/index.md create mode 100644 examples/llm/custom_components/__init__.py create mode 100644 examples/llm/custom_components/llm_streaming_custom_component.py create mode 100644 examples/llm/openai_component_request_response.yaml create mode 100644 src/solace_ai_connector/components/general/for_testing/handler_callback.py create mode 100644 src/solace_ai_connector/flow/request_response_flow_controller.py rename {tests => src/solace_ai_connector/test_utils}/utils_for_test_files.py (58%) create mode 100644 tests/test_request_response_controller.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 21e2b8b..439f70f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,7 +14,9 @@ permissions: jobs: ci: - uses: SolaceDev/solace-public-workflows/.github/workflows/hatch_ci.yml@v1.0.0 + uses: SolaceDev/solace-public-workflows/.github/workflows/hatch_ci.yml@latest + with: + min-python-version: "3.9" secrets: SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} 
SONAR_HOST_URL: ${{ vars.SONAR_HOST_URL }} @@ -29,8 +31,9 @@ jobs: ssh-key: ${{ secrets.COMMIT_KEY }} - name: Set up Hatch - uses: SolaceDev/solace-public-workflows/.github/actions/hatch-setup@v1.0.0 - + uses: SolaceDev/solace-public-workflows/.github/actions/hatch-setup@latest + with: + min-python-version: "3.9" - name: Set Up Docker Buildx id: builder uses: docker/setup-buildx-action@v3 diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index c483f4b..0026e90 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -18,7 +18,7 @@ permissions: jobs: release: - uses: SolaceDev/solace-public-workflows/.github/workflows/hatch_release_pypi.yml@v1.0.1 + uses: SolaceDev/solace-public-workflows/.github/workflows/hatch_release_pypi.yml@latest with: ENVIRONMENT: pypi version: ${{ github.event.inputs.version }} diff --git a/config.yaml b/config.yaml index fcb4667..360e99d 100644 --- a/config.yaml +++ b/config.yaml @@ -9,7 +9,6 @@ log: shared_config: - broker_config: &broker_connection - broker_connection_share: ${SOLACE_BROKER_URL} broker_type: solace broker_url: ${SOLACE_BROKER_URL} broker_username: ${SOLACE_BROKER_USERNAME} diff --git a/docs/advanced_component_features.md b/docs/advanced_component_features.md new file mode 100644 index 0000000..7053ff6 --- /dev/null +++ b/docs/advanced_component_features.md @@ -0,0 +1,120 @@ +# Advanced Component Features + +This document describes advanced features available to custom components in the Solace AI Connector. + +## Table of Contents +- [Broker Request-Response](#broker-request-response) +- [Cache Manager](#cache-manager) +- [Timer Features](#timer-features) + +## Broker Request-Response + +Components can perform a request and get a response from the broker using the `do_broker_request_response` method. This method supports both simple request-response and streamed responses. To use this feature, the component's configuration must include a `broker_request_response` section. For details on how to configure this section, refer to the [Broker Request-Response Configuration](configuration.md#broker-request-response-configuration) in the configuration documentation. + +This feature would be used in the invoke method of a custom component. When the `do_broker_request_response` method is called, the component will send a message to the broker and then block until a response (or a series of streamed chunks) is received. This makes it very easy to call services that are available via the broker. + +### Usage + +```python +response = self.do_broker_request_response(message, stream=False) +``` + +For streamed responses: + +```python +for chunk, is_last in self.do_broker_request_response(message, stream=True, streaming_complete_expression="input.payload:streaming.last_message"): + # Process each chunk + if is_last: + break +``` + +### Parameters + +- `message`: The message to send to the broker. This must have a topic and payload. +- `stream` (optional): Boolean indicating whether to expect a streamed response. Default is False. +- `streaming_complete_expression` (optional): An expression to evaluate on each response chunk to determine if it's the last one. This is required when `stream=True`. + +### Return Value + +- For non-streamed responses: Returns the response message. +- For streamed responses: Returns a generator that yields tuples of (chunk, is_last). Each chunk is a fully formed message with the format of the response. `is_last` is a boolean indicating if the chunk is the last one. 
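
As a concrete illustration, here is a minimal sketch of an `invoke` method that issues a non-streamed request and reads a field from the response. The topic, payload shape, and extracted expression are illustrative assumptions, not part of the connector's API:

```python
from solace_ai_connector.common.message import Message

def invoke(self, message, data):
    # Hypothetical request topic and payload shape - adjust for your service
    request = Message(payload={"query": data.get("text")}, topic="example/service/request")
    response = self.do_broker_request_response(request, stream=False)
    # The response is a fully formed message; pull out whatever you need
    return {"result": response.get_data("input.payload")}
```

The same call with `stream=True` returns a generator instead, as shown in the streamed usage example above.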
+ +## Cache Manager + +The cache service provides a flexible way to store and retrieve data with optional expiration. It supports different storage backends and offers features like automatic expiry checks. + +### Features + +1. Multiple storage backends: + - In-memory storage + - SQLAlchemy-based storage (for persistent storage) + +2. Key-value storage with metadata and expiry support +3. Automatic expiry checks in a background thread +4. Thread-safe operations + +### Usage + +Components can access the cache service through `self.cache_service`. Here are some common operations: + +```python +# Set a value with expiry +self.cache_service.set("key", "value", expiry=300) # Expires in 300 seconds + +# Get a value +value = self.cache_service.get("key") + +# Delete a value +self.cache_service.delete("key") + +# Get all values (including metadata and expiry) +all_data = self.cache_service.get_all() +``` + +### Configuration + +The cache service can be configured in the main configuration file: + +```yaml +cache: + backend: "memory" # or "sqlalchemy" + connection_string: "sqlite:///cache.db" # for SQLAlchemy backend +``` + +## Timer Features + +The timer manager allows components to schedule one-time or recurring timer events. This is useful for implementing delayed actions, periodic tasks, or timeouts. + +### Features + +1. One-time and recurring timers +2. Customizable timer IDs for easy management +3. Optional payloads for timer events + +### Usage + +Components schedule and cancel timers through the `add_timer` and `cancel_timer` methods shown below, which are backed by the connector's timer manager. Here are some common operations: + +```python +# Add a one-time timer +self.add_timer(delay_ms=5000, timer_id="my_timer", payload={"key": "value"}) + +# Add a recurring timer +self.add_timer(delay_ms=5000, timer_id="recurring_timer", interval_ms=10000, payload={"type": "recurring"}) + +# Cancel a timer +self.cancel_timer(timer_id="my_timer") +``` + +### Handling Timer Events + +To handle timer events, components should implement the `handle_timer_event` method: + +```python +def handle_timer_event(self, timer_data): + timer_id = timer_data["timer_id"] + payload = timer_data["payload"] + # Process the timer event +``` + +Timer events are automatically dispatched to the appropriate component by the timer manager. diff --git a/docs/configuration.md b/docs/configuration.md index 8c2b511..9394aa8 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -238,7 +238,7 @@ Each component configuration is a dictionary with the following keys: - `input_selection`: - A `source_expression` or `source_value` to use as the input to the component. Check [Expression Syntax](#expression-syntax) for more details. [Optional: If not specified, the complete previous component output will be used] - `queue_depth`: - The depth of the input queue for the component. - `num_instances`: - The number of instances of the component to run (Starts multiple threads to process messages) - +- `broker_request_response`: - Configuration for the broker request-response functionality. [Optional] ### component_module @@ -359,6 +359,37 @@ The `queue_depth` is an integer that specifies the depth of the input queue for The `num_instances` is an integer that specifies the number of instances of the component to run. This is the number of threads that will be started to process messages from the input queue. By default, the number of instances is 1. +### Broker Request-Response Configuration + +The `broker_request_response` configuration allows components to perform request-response operations with a broker. 
It has the following structure: + +```yaml +broker_request_response: + enabled: + broker_config: + broker_type: + broker_url: + broker_username: + broker_password: + broker_vpn: + payload_encoding: + payload_format: + request_expiry_ms: +``` + +- `enabled`: Set to `true` to enable broker request-response functionality for the component. +- `broker_config`: Configuration for the broker connection. + - `broker_type`: Type of the broker (e.g., "solace"). + - `broker_url`: URL of the broker. + - `broker_username`: Username for broker authentication. + - `broker_password`: Password for broker authentication. + - `broker_vpn`: VPN name for the broker connection. + - `payload_encoding`: Encoding for the payload (e.g., "utf-8", "base64"). + - `payload_format`: Format of the payload (e.g., "json", "text"). +- `request_expiry_ms`: Expiry time for requests in milliseconds. + +For more details on using this functionality, see the [Advanced Component Features](advanced_component_features.md#broker-request-response) documentation. + ### Built-in components The AI Event Connector comes with a number of built-in components that can be used to process messages. For a list of all built-in components, see the [Components](components/index.md) documentation. diff --git a/docs/custom_components.md b/docs/custom_components.md new file mode 100644 index 0000000..f34da91 --- /dev/null +++ b/docs/custom_components.md @@ -0,0 +1,90 @@ +# Custom Components + +## Purpose + +Custom components provide a way to extend the functionality of the Solace AI Connector beyond what's possible with the built-in components and configuration options. Sometimes, it's easier and more efficient to add custom code than to build a complex configuration file, especially for specialized or unique processing requirements. + +## Requirements of a Custom Component + +To create a custom component, you need to follow these requirements: + +1. **Inherit from ComponentBase**: Your custom component class should inherit from the `ComponentBase` class. + +2. **Info Section**: Define an `info` dictionary with the following keys: + - `class_name`: The name of your custom component class. + - `config_parameters`: A list of dictionaries describing the configuration parameters for your component. + - `input_schema`: A dictionary describing the expected input schema. + - `output_schema`: A dictionary describing the expected output schema. + +3. **Implement the `invoke` method**: This is the main method where your component's logic will be implemented. 
+ +Here's a basic template for a custom component: + +```python +from solace_ai_connector.components.component_base import ComponentBase + +info = { + "class_name": "MyCustomComponent", + "config_parameters": [ + { + "name": "my_param", + "type": "string", + "required": True, + "description": "A custom parameter" + } + ], + "input_schema": { + "type": "object", + "properties": { + "input_data": {"type": "string"} + } + }, + "output_schema": { + "type": "object", + "properties": { + "output_data": {"type": "string"} + } + } +} + +class MyCustomComponent(ComponentBase): + def __init__(self, **kwargs): + super().__init__(info, **kwargs) + self.my_param = self.get_config("my_param") + + def invoke(self, message, data): + # Your custom logic here + result = f"{self.my_param}: {data['input_data']}" + return {"output_data": result} +``` + +## Overrideable Methods + +While the `invoke` method is the main one you'll implement, there are several other methods you can override to customize your component's behavior: + +1. `invoke(self, message, data)`: The main processing method for your component. +2. `get_next_event(self)`: Customize how your component receives events. +3. `send_message(self, message)`: Customize how your component sends messages to the next component. +4. `handle_timer_event(self, timer_data)`: Handle timer events if your component uses timers. +5. `handle_cache_expiry_event(self, timer_data)`: Handle cache expiry events if your component uses the cache service. +6. `process_pre_invoke(self, message)`: Customize preprocessing before `invoke` is called. +7. `process_post_invoke(self, result, message)`: Customize postprocessing after `invoke` is called. + +## Advanced Features + +Custom components can take advantage of advanced features provided by the Solace AI Connector. These include: + +- Broker request-response functionality +- Cache services +- Timer management + +For more information on these advanced features and how to use them in your custom components, please refer to the [Advanced Component Features](advanced_component_features.md) documentation. + +By creating custom components, you can extend the Solace AI Connector to meet your specific needs while still benefiting from the framework's built-in capabilities for event processing, flow management, and integration with Solace event brokers. + +## Example + +See the [Tips and Tricks page](tips_and_tricks.md) for an example of creating a custom component. \ No newline at end of file diff --git a/docs/getting_started.md b/docs/getting_started.md index 14b8845..53d847b 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -72,7 +72,7 @@ area in the Subscriber side of the "Try me!" page. Download the OpenAI connector example configuration file: ```sh -curl https://raw.githubusercontent.com/SolaceLabs/solace-ai-connector/main/examples/llm/openai_chat.yaml > openai_chat.yaml +curl https://raw.githubusercontent.com/SolaceLabs/solace-ai-connector/refs/heads/main/examples/llm/langchain_openai_with_history_chat.yaml > langchain_openai_with_history_chat.yaml ``` For this one, you need to also define the following additional environment variables: @@ -94,7 +94,7 @@ pip install langchain_openai openai Run the connector: ```sh -solace-ai-connector openai_chat.yaml +solace-ai-connector langchain_openai_with_history_chat.yaml ``` Use the "Try Me!" 
function on the broker's browser UI (or some other means) to publish an event like this: diff --git a/docs/guides/RAG.md b/docs/guides/RAG.md new file mode 100644 index 0000000..218969d --- /dev/null +++ b/docs/guides/RAG.md @@ -0,0 +1,236 @@ + +- [Building AI-Powered Applications with Solace AI Connector: A Deep Dive into RAG, LLMs, and Embeddings](#building-ai-powered-applications-with-solace-ai-connector-a-deep-dive-into-rag-llms-and-embeddings) - [What is Solace AI Connector?](#what-is-solace-ai-connector) - [Key Concepts Behind the Configuration](#key-concepts-behind-the-configuration) - [1. Large Language Models (LLMs)](#1-large-language-models-llms) - [2. Retrieval-Augmented Generation (RAG)](#2-retrieval-augmented-generation-rag) - [3. Embeddings](#3-embeddings) - [4. Solace PubSub+ Platform](#4-solace-pubsub-platform) - [Real-Time Data Consumption with Solace AI Connector](#real-time-data-consumption-with-solace-ai-connector) - [Real-Time Data Embedding and Storage Flow](#real-time-data-embedding-and-storage-flow) - [YAML Configuration Breakdown](#yaml-configuration-breakdown) - [Logging Configuration](#logging-configuration) - [Shared Configuration for Solace Broker](#shared-configuration-for-solace-broker) - [Data Ingestion to ChromaDB (Embedding Flow)](#data-ingestion-to-chromadb-embedding-flow) - [1. Solace Data Input Component](#1-solace-data-input-component) - [2. Embedding and Storage in ChromaDB](#2-embedding-and-storage-in-chromadb) - [RAG Inference Flow (Query and Response)](#rag-inference-flow-query-and-response) - [1. Query Ingestion from Solace Topic](#1-query-ingestion-from-solace-topic) - [2. ChromaDB Search for Relevant Documents](#2-chromadb-search-for-relevant-documents) - [3. Response Generation Using OpenAI](#3-response-generation-using-openai) - [4. Sending the Response Back to Solace](#4-sending-the-response-back-to-solace) - [Flexibility in Components](#flexibility-in-components) - [Conclusion](#conclusion) + # Building AI-Powered Applications with Solace AI Connector: A Deep Dive into RAG, LLMs, and Embeddings + In the fast-evolving world of AI, businesses are increasingly looking for ways to harness advanced technologies like **Retrieval-Augmented Generation (RAG)** and **Large Language Models (LLMs)** to provide smarter, more interactive applications. **Solace AI Connector** is a CLI tool that allows you to create AI-powered applications connected to Solace PubSub+ brokers. In this guide, we will explore the configuration of Solace AI Connector, how it can be used with technologies like **RAG**, **LLMs**, and **Embeddings**, and provide a deep understanding of these concepts. + ## What is Solace AI Connector? + Solace AI Connector is a tool that enables AI-powered applications to interface with Solace PubSub+ brokers. By integrating Solace’s event-driven architecture with AI services (such as OpenAI’s models), you can create applications that interact with real-time data, perform knowledge retrieval, and generate intelligent responses. + In this guide, we will walk through [a sample YAML configuration](../../examples/llm/openai_chroma_rag.yaml) that sets up two essential flows: - **Data ingestion into a vector database** using Solace topics, embedding the data into **ChromaDB**. - **Querying the ingested data** using **RAG**, where queries are sent to OpenAI for intelligent completion and response generation. 
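
If you would like to follow along, you can download the example configuration and run it with the connector. This sketch assumes the connector and the example's dependencies are installed and that the broker and OpenAI environment variables discussed later in this guide are set:

```sh
curl https://raw.githubusercontent.com/SolaceLabs/solace-ai-connector/refs/heads/main/examples/llm/openai_chroma_rag.yaml > openai_chroma_rag.yaml
solace-ai-connector openai_chroma_rag.yaml
```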
+ ## Key Concepts Behind the Configuration + Before diving into the configuration, let’s explore the key concepts that power this setup. + ### 1. Large Language Models (LLMs) LLMs are AI models trained on vast amounts of textual data, capable of generating human-like text. They are used in various tasks like text generation, summarization, translation, and answering complex questions. Models such as OpenAI's GPT-4o or Anthropic's Claude 3.5 Sonnet are examples of LLMs. These models can generate meaningful responses based on the context they are provided, but they also have limitations, such as the risk of hallucinations (generating incorrect or fabricated facts). + ### 2. Retrieval-Augmented Generation (RAG) RAG is a framework that enhances the performance of LLMs by combining them with external knowledge retrieval. Instead of relying solely on the LLM’s internal knowledge, RAG retrieves relevant documents from an external database (such as ChromaDB) before generating a response. This approach enhances the accuracy of responses, as the generation process is “grounded” in factual information retrieved at the time of the query. + ### 3. Embeddings Embeddings are numerical representations of text that capture its semantic meaning. They are crucial for many NLP tasks, as they allow models to measure similarity between different pieces of text. In the context of RAG, embedding models (such as **OpenAI Embeddings**) convert input data into vector representations that can be stored in a vector database like **ChromaDB**. When a query is issued, it is also converted into a vector, and similar documents can be retrieved based on their proximity in the vector space. + ### 4. Solace PubSub+ Platform Solace’s PubSub+ platform provides event-driven messaging and streaming services, enabling applications to publish and subscribe to topics in real time. In the context of AI applications, Solace acts as the message broker that facilitates the flow of data between different components (e.g., input data, queries, and responses). + ## Real-Time Data Consumption with Solace AI Connector + One of the standout features of the Solace AI Connector is its ability to seamlessly handle real-time data. As data passes through the Solace broker, it can be consumed, embedded, and stored for future retrieval and analysis. + ### Real-Time Data Embedding and Storage Flow + Using Solace topics, the connector can subscribe to real-time data streams. This data is processed in near real-time, where each message is embedded using an embedding model like **OpenAI Embeddings**. These embeddings are then stored in a vector database like **ChromaDB**, making them retrievable for future queries. + For example, imagine a system that ingests live customer support chat messages. As each message is published to the Solace broker, the system embeds the message, stores the vector in a database, and makes it available for retrieval during future interactions or analyses. This architecture is particularly useful for applications that need to build dynamic, up-to-date knowledge bases based on streaming data. + By leveraging the real-time messaging capabilities of Solace, the system ensures that data is continuously processed and stored in a structured, retrievable way, allowing for efficient and scalable AI-driven applications. + ## YAML Configuration Breakdown + Let’s break down the YAML configuration provided in the example, which demonstrates how to implement RAG with Solace AI Connector and ChromaDB. 
+ ### Logging Configuration ```yaml log: stdout_log_level: INFO log_file_level: INFO log_file: solace_ai_connector.log ``` This section sets the logging level for the connector, ensuring that logs are captured both to the console and to a file. + ### Shared Configuration for Solace Broker ```yaml shared_config: - broker_config: &broker_connection broker_type: solace broker_url: ${SOLACE_BROKER_URL} broker_username: ${SOLACE_BROKER_USERNAME} broker_password: ${SOLACE_BROKER_PASSWORD} broker_vpn: ${SOLACE_BROKER_VPN} ``` Here, we define the connection settings for the Solace broker. These environment variables (`SOLACE_BROKER_URL`, etc.) are essential for establishing communication with the Solace messaging infrastructure. Shared configs can be reused throughout the configuration to maintain consistency. In this example, we're using it for the input and output Solace broker connections. + ### Data Ingestion to ChromaDB (Embedding Flow) + This flow ingests data into **ChromaDB**, which is a vector database that will store embeddings of the input text. You could use any other vector database, but for this example, we are using ChromaDB. + #### 1. Solace Data Input Component ```yaml - component_name: solace_data_input component_module: broker_input component_config: <<: *broker_connection broker_queue_name: demo_rag_data broker_subscriptions: - topic: demo/rag/data qos: 1 payload_encoding: utf-8 payload_format: json ``` This component listens to the **Solace topic** `demo/rag/data` for incoming data that needs to be embedded. It subscribes to the topic and expects the payload in JSON format with UTF-8 encoding. This could be the real-time data stream that you want to process and embed. + #### 2. Embedding and Storage in ChromaDB ```yaml - component_name: chroma_embed component_module: langchain_vector_store_embedding_index component_config: vector_store_component_path: langchain_chroma vector_store_component_name: Chroma vector_store_component_config: persist_directory: ./chroma_data collection_name: rag embedding_component_path: langchain_openai embedding_component_name: OpenAIEmbeddings embedding_component_config: api_key: ${OPENAI_API_KEY} base_url: ${OPENAI_API_ENDPOINT} model: ${OPENAI_EMBEDDING_MODEL_NAME} input_transforms: - type: copy source_value: topic:demo/rag/data dest_expression: user_data.vector_input:metadatas.source - type: copy source_expression: input.payload:texts dest_expression: user_data.vector_input:texts input_selection: source_expression: user_data.vector_input ``` This component uses `langchain_vector_store_embedding_index`, a built-in component for adding and deleting embeddings in a vector database, to handle the embedding logic. It takes the input texts, converts them into embeddings using the OpenAI embedding model, and stores the embeddings in **ChromaDB**. ChromaDB is set to persist data in the `./chroma_data` directory. + The data from the Solace input broker is first transformed into the shape that we expect in `input_transforms`. The `input_selection` section then selects the transformed data to be used as input for the embedding process. + + ### RAG Inference Flow (Query and Response) + This flow handles the **Retrieval-Augmented Generation (RAG)** process where a query is sent, relevant documents are retrieved, and a response is generated using OpenAI's models. + #### 1. 
Query Ingestion from Solace Topic ```yaml - component_name: solace_completion_broker component_module: broker_input component_config: <<: *broker_connection broker_queue_name: demo_rag_query broker_subscriptions: - topic: demo/rag/query ``` This component listens for queries on the `demo/rag/query` Solace topic. The query is received as JSON data, and the Solace broker delivers it to the next step. + #### 2. ChromaDB Search for Relevant Documents ```yaml - component_name: chroma_search component_module: langchain_vector_store_embedding_search component_config: vector_store_component_path: langchain_chroma vector_store_component_name: Chroma vector_store_component_config: persist_directory: ./chroma_data collection_name: rag max_results: 5 ``` This component searches ChromaDB for documents that are most similar to the query using the built-in `langchain_vector_store_embedding_search` component. It retrieves the top 5 results based on proximity in the vector space. + #### 3. Response Generation Using OpenAI ```yaml - component_name: llm_request component_module: openai_chat_model component_config: api_key: ${OPENAI_API_KEY} base_url: ${OPENAI_API_ENDPOINT} model: ${OPENAI_MODEL_NAME} temperature: 0.01 input_transforms: # Extract and format the retrieved data - type: map source_list_expression: previous:result source_expression: | template:{{text://item:text}}\n\n dest_list_expression: user_data.retrieved_data - type: copy source_expression: | template:You are a helpful AI assistant. Using the provided context, help with the user's request below. Refrain from using any knowledge outside of the provided context. If the user's query cannot be answered using the provided context, reject the query. + + {{text://user_data.retrieved_data}} + + + + {{text://input.payload:query}} + dest_expression: user_data.llm_input:messages.0.content - type: copy source_expression: static:user dest_expression: user_data.llm_input:messages.0.role input_selection: source_expression: user_data.llm_input ``` Once relevant documents are retrieved, we build the prompt with retrieved context. To prevent the model from hallucinating, we ask it to refuse to answer if the answer is not provided in the given context. Then the **LLM (e.g., GPT-4o)** is used to generate a response. The temperature is set to `0.01`, meaning the response will be nearly deterministic and focused on factual accuracy. The retrieved documents provide context for the LLM, which then generates a response based solely on this context. + #### 4. Sending the Response Back to Solace + + ```yaml - component_name: send_response component_module: broker_output component_config: <<: *broker_connection payload_encoding: utf-8 payload_format: json ``` The final component sends the generated response back to the Solace broker, specifically to the topic `demo/rag/query/response`, where the response can be consumed by the requesting application. + ## Flexibility in Components + One of the key strengths of this architecture is its flexibility. Components like the **OpenAI connector** or **ChromaDB** can easily be swapped out for other AI service providers or vector databases. For example: - Instead of OpenAI, you can use another LLM provider like **Cohere** or **Anthropic**. - Instead of **ChromaDB**, you could use a different vector database like **Pinecone**, **Weaviate**, or **Milvus** (see the sketch after this list). 
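
As an example of such a swap, here is a hypothetical sketch of the search component pointed at Pinecone instead of Chroma. The `langchain_pinecone` module and `PineconeVectorStore` class follow LangChain's integration naming but are assumptions here, as is the `index_name` config key:

```yaml
- component_name: vector_search
  component_module: langchain_vector_store_embedding_search
  component_config:
    # Assumed LangChain Pinecone integration names - verify against your install
    vector_store_component_path: langchain_pinecone
    vector_store_component_name: PineconeVectorStore
    vector_store_component_config:
      index_name: rag
    max_results: 5
```

The rest of the flow stays unchanged, since the surrounding components depend only on the search component's output shape.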
+ This modularity allows developers to adapt the system to different business requirements, AI services, and database solutions, providing greater flexibility and scalability. + ## Conclusion + The Solace AI Connector, when combined with technologies like RAG, LLMs, and embeddings, enables a powerful AI-driven ecosystem for real-time applications. By ingesting data, embedding it into vector databases, and performing retrieval-augmented generation with LLMs, developers can build applications that provide accurate, context-aware responses to user queries quickly. + This YAML configuration serves as a template for setting up such an application; you can find the complete example in the [examples directory](../../examples/llm/openai_chroma_rag.yaml). + ## Want to Learn More About Solace AI Connector? + Check out the [Solace AI Connector Overview](../overview.md) to explore its features in depth, or dive right in by following the [Getting Started Guide](../getting_started.md) to begin working with Solace AI Connector today! \ No newline at end of file diff --git a/docs/guides/index.md b/docs/guides/index.md new file mode 100644 index 0000000..05dfdff --- /dev/null +++ b/docs/guides/index.md @@ -0,0 +1,4 @@ +In this directory you can find complete guides on various topics covering how to use the Solace AI Connector. + + +- [Building AI-Powered Applications with Solace AI Connector: A Deep Dive into RAG, LLMs, and Embeddings](./RAG.md) \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index 6450165..1b89909 100644 --- a/docs/index.md +++ b/docs/index.md @@ -9,7 +9,9 @@ This connector application makes it easy to connect your AI/ML models to Solace - [Configuration](configuration.md) - [Components](components/index.md) - [Transforms](transforms/index.md) +- [Custom Components](custom_components.md) - [Tips and Tricks](tips_and_tricks.md) +- [Guides](guides/index.md) - [Examples](../examples/) - [Contributing](../CONTRIBUTING.md) - [License](../LICENSE) diff --git a/examples/ack_test.yaml b/examples/ack_test.yaml index 08314aa..41fb2eb 100644 --- a/examples/ack_test.yaml +++ b/examples/ack_test.yaml @@ -15,7 +15,6 @@ log: shared_config: - broker_config: &broker_connection - broker_connection_share: ${SOLACE_BROKER_URL} broker_type: solace broker_url: ${SOLACE_BROKER_URL} broker_username: ${SOLACE_BROKER_USERNAME} diff --git a/examples/anthropic_bedrock.yaml b/examples/anthropic_bedrock.yaml index 7c35bb6..03a0c6c 100644 --- a/examples/anthropic_bedrock.yaml +++ b/examples/anthropic_bedrock.yaml @@ -3,7 +3,10 @@ # sends a message to an Anthropic Bedrock model, and # sends the response back to the Solace broker # It will ask the model to write a dry joke about the input -# message. It takes the entire payload of the input message +# message. +# Send a message to the Solace broker topics `my/topic1` or `my/topic2` +# with a plain text payload. The model will respond with a dry joke to the +# same topic prefixed with `response/`. (e.g. 
`response/my/topic1`) # # Dependencies: # pip install langchain_aws langchain_community @@ -28,12 +31,12 @@ log: shared_config: - broker_config: &broker_connection - broker_connection_share: ${SOLACE_BROKER_URL} broker_type: solace broker_url: ${SOLACE_BROKER_URL} broker_username: ${SOLACE_BROKER_USERNAME} broker_password: ${SOLACE_BROKER_PASSWORD} broker_vpn: ${SOLACE_BROKER_VPN} + payload_encoding: utf-8 # List of flows flows: @@ -51,7 +54,6 @@ flows: qos: 1 - topic: my/topic2 qos: 1 - payload_encoding: utf-8 payload_format: text - component_name: llm @@ -81,13 +83,7 @@ flows: - component_name: solace_sw_broker component_module: broker_output component_config: - broker_connection_share: ${SOLACE_BROKER_URL} - broker_type: solace - broker_url: ${SOLACE_BROKER_URL} - broker_username: ${SOLACE_BROKER_USERNAME} - broker_password: ${SOLACE_BROKER_PASSWORD} - broker_vpn: ${SOLACE_BROKER_VPN} - payload_encoding: utf-8 + <<: *broker_connection payload_format: text input_transforms: - type: copy diff --git a/examples/error_handler.yaml b/examples/error_handler.yaml index a8c700e..f227794 100644 --- a/examples/error_handler.yaml +++ b/examples/error_handler.yaml @@ -25,7 +25,6 @@ log: shared_config: - broker_config: &broker_connection - broker_connection_share: ${SOLACE_BROKER_URL} broker_type: solace broker_url: ${SOLACE_BROKER_URL} broker_username: ${SOLACE_BROKER_USERNAME} diff --git a/examples/llm/custom_components/__init__.py b/examples/llm/custom_components/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/llm/custom_components/llm_streaming_custom_component.py b/examples/llm/custom_components/llm_streaming_custom_component.py new file mode 100644 index 0000000..a1363d4 --- /dev/null +++ b/examples/llm/custom_components/llm_streaming_custom_component.py @@ -0,0 +1,52 @@ +# A custom component that does a blocking, streaming LLM request/response via the broker + +import sys + +sys.path.append("src") + +from solace_ai_connector.components.component_base import ComponentBase +from solace_ai_connector.common.message import Message + + +info = { + "class_name": "LlmStreamingCustomComponent", + "description": "Do a blocking LLM request/response", + "config_parameters": [ + { + "name": "llm_request_topic", + "description": "The topic to send the request to", + "type": "string", + } + ], + "input_schema": { + "type": "object", + "properties": {}, + }, + "output_schema": { + "type": "object", + "properties": {}, + }, +} + + +class LlmStreamingCustomComponent(ComponentBase): + def __init__(self, **kwargs): + super().__init__(info, **kwargs) + self.llm_request_topic = self.get_config("llm_request_topic") + + def invoke(self, message, data): + llm_message = Message(payload=data, topic=self.llm_request_topic) + for response, last_message in self.do_broker_request_response( + llm_message, + stream=True, + streaming_complete_expression="input.payload:last_chunk", + ): + text = response.get_data("input.payload:chunk") + if not text: + text = response.get_data("input.payload:content") or "no response" + if last_message: + return {"chunk": text} + self.output_streaming(response, {"chunk": text}) + + def output_streaming(self, message, data): + return self.process_post_invoke(data, message) diff --git a/examples/llm/openai_component_request_response.yaml b/examples/llm/openai_component_request_response.yaml new file mode 100644 index 0000000..bb102a6 --- /dev/null +++ b/examples/llm/openai_component_request_response.yaml @@ -0,0 +1,147 @@ +# This example demonstrates how to use the request_response_flow_controller to 
# inject another flow into an existing flow. This is commonly used when # you want to call a service that is only accessible via the broker. # # Main flow: STDIN -> llm_streaming_custom_component -> STDOUT # | ^ # v | # do_broker_request_response() # | ^ # v | # Broker Broker # # # LLM flow: Broker -> OpenAI -> Broker # # # While this looks a bit complicated, it allows you to very easily use all # the benefits of the broker to distribute service requests, such as load # balancing, failover, and scaling to LLMs. # # Type a question on STDIN. The custom component sends it via the broker to # the LLM flow (topic `example/llm/best`) and streams the response back to # STDOUT. # # The message sent to the LLM flow has the following schema: # { # "text": "" # } # # Dependencies: # pip install -U langchain_openai openai # # required ENV variables: # - OPENAI_API_KEY # - OPENAI_API_ENDPOINT # - OPENAI_MODEL_NAME # - SOLACE_BROKER_URL # - SOLACE_BROKER_USERNAME # - SOLACE_BROKER_PASSWORD # - SOLACE_BROKER_VPN --- log: stdout_log_level: INFO log_file_level: DEBUG log_file: solace_ai_connector.log shared_config: - broker_config: &broker_connection broker_type: solace broker_url: ${SOLACE_BROKER_URL} broker_username: ${SOLACE_BROKER_USERNAME} broker_password: ${SOLACE_BROKER_PASSWORD} broker_vpn: ${SOLACE_BROKER_VPN} # Two flows: a STDIN-driven main flow and a broker-accessible LLM flow flows: # Main flow: STDIN request processing - name: main_flow components: # Input from STDIN - component_name: input component_module: stdin_input # Our custom component - component_name: llm_streaming_custom_component component_module: llm_streaming_custom_component # Relative path to the component component_base_path: examples/llm/custom_components component_config: llm_request_topic: example/llm/best broker_request_response: enabled: true broker_config: *broker_connection request_expiry_ms: 60000 payload_encoding: utf-8 payload_format: json input_transforms: - type: copy source_expression: | template:You are a helpful AI assistant. 
Please help with the user's request below: + + {{text://input.payload:text}} + + dest_expression: user_data.llm_input:messages.0.content - type: copy source_expression: static:user dest_expression: user_data.llm_input:messages.0.role input_selection: source_expression: user_data.llm_input + # Send response to stdout - component_name: send_response component_module: stdout_output component_config: add_new_line_between_messages: false input_selection: source_expression: previous:chunk + + + # The LLM flow that is accessible via the broker - name: llm_flow components: # Input from a Solace broker - component_name: solace_sw_broker component_module: broker_input component_config: <<: *broker_connection broker_queue_name: example_flow_streaming broker_subscriptions: - topic: example/llm/best qos: 1 payload_encoding: utf-8 payload_format: json + # Do an LLM request - component_name: llm_request component_module: openai_chat_model component_config: api_key: ${OPENAI_API_KEY} base_url: ${OPENAI_API_ENDPOINT} model: ${OPENAI_MODEL_NAME} temperature: 0.01 llm_mode: stream stream_to_next_component: true stream_batch_size: 20 input_selection: source_expression: input.payload + # Send response back to broker - component_name: send_response component_module: broker_output component_config: <<: *broker_connection payload_encoding: utf-8 payload_format: json copy_user_properties: true input_transforms: - type: copy source_expression: previous dest_expression: user_data.output:payload - type: copy source_expression: input.user_properties:__solace_ai_connector_broker_request_reply_topic__ dest_expression: user_data.output:topic input_selection: source_expression: user_data.output \ No newline at end of file diff --git a/examples/request_reply.yaml b/examples/request_reply.yaml index 3cdae47..69e5834 100644 --- a/examples/request_reply.yaml +++ b/examples/request_reply.yaml @@ -16,7 +16,6 @@ log: shared_config: - broker_config: &broker_connection - broker_connection_share: ${SOLACE_BROKER_URL} broker_type: solace broker_url: ${SOLACE_BROKER_URL} broker_username: ${SOLACE_BROKER_USERNAME} diff --git a/pyproject.toml b/pyproject.toml index 7c1f63a..5fbd262 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ authors = [ ] description = "Solace AI Connector - make it easy to connect Solace PubSub+ Event Brokers to AI/ML frameworks" readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.9" classifiers = [ "Programming Language :: Python :: 3", "License :: OSI Approved :: MIT License", @@ -19,8 +19,8 @@ classifiers = [ ] dependencies = [ "boto3~=1.34.122", - "langchain_core~=0.2.5", - "langchain~=0.2.3", + "langchain-core~=0.3.0", + "langchain~=0.3.0", "PyYAML~=6.0.1", "Requests~=2.32.3", "solace_pubsubplus>=1.8.0", diff --git a/requirements.txt b/requirements.txt index 702aa66..66ec399 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ boto3~=1.34.122 -langchain_core~=0.2.5 -langchain~=0.2.3 +langchain-core~=0.3.0 +langchain~=0.3.0 PyYAML~=6.0.1 Requests~=2.32.3 solace_pubsubplus~=1.8.0 diff --git a/src/solace_ai_connector/common/event.py b/src/solace_ai_connector/common/event.py index cc302c2..98f4217 100644 --- a/src/solace_ai_connector/common/event.py +++ b/src/solace_ai_connector/common/event.py @@ -6,7 +6,10 @@ class EventType(Enum): MESSAGE = "message" TIMER = "timer" CACHE_EXPIRY = "cache_expiry" - # Add more event 
types as needed + + def __eq__(self, other): + return self.value == other.value class Event: diff --git a/src/solace_ai_connector/common/utils.py b/src/solace_ai_connector/common/utils.py index 003e2ff..4996c3b 100644 --- a/src/solace_ai_connector/common/utils.py +++ b/src/solace_ai_connector/common/utils.py @@ -6,6 +6,7 @@ import re import builtins import subprocess +import types from .log import log @@ -94,8 +95,11 @@ def resolve_config_values(config, allow_source_expression=False): return config -def import_module(name, base_path=None, component_package=None): - """Import a module by name""" +def import_module(module, base_path=None, component_package=None): + """Import a module by name or return the module object if it's already imported""" + + if isinstance(module, types.ModuleType): + return module if component_package: install_package(component_package) @@ -104,14 +108,13 @@ if base_path not in sys.path: sys.path.append(base_path) try: - module = importlib.import_module(name) - return module + return importlib.import_module(module) except ModuleNotFoundError as exc: # If the module does not have a path associated with it, try # importing it from the known prefixes - annoying that this # is necessary. It seems you can't dynamically import a module # that is listed in an __init__.py file :( - if "." not in name: + if "." not in module: for prefix_prefix in ["solace_ai_connector", "."]: for prefix in [ ".components", @@ -123,22 +126,21 @@ ".transforms", ".common", ]: - full_name = f"{prefix_prefix}{prefix}.{name}" + full_name = f"{prefix_prefix}{prefix}.{module}" try: if full_name.startswith("."): - module = importlib.import_module( + return importlib.import_module( full_name, package=__package__ ) else: - module = importlib.import_module(full_name) - return module + return importlib.import_module(full_name) except ModuleNotFoundError: pass except Exception as e: raise ImportError( f"Module load error for {full_name}: {e}" ) from e - raise ModuleNotFoundError(f"Module '{name}' not found") from exc + raise ModuleNotFoundError(f"Module '{module}' not found") from exc def invoke_config(config, allow_source_expression=False): diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index bd4c52c..f059c06 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -10,6 +10,7 @@ from ..common.message import Message from ..common.trace_message import TraceMessage from ..common.event import Event, EventType +from ..flow.request_response_flow_controller import RequestResponseFlowController DEFAULT_QUEUE_TIMEOUT_MS = 200 DEFAULT_QUEUE_MAX_DEPTH = 5 @@ -34,6 +35,9 @@ def __init__(self, module_info, **kwargs): self.cache_service = kwargs.pop("cache_service", None) self.component_config = self.config.get("component_config") or {} + self.broker_request_response_config = self.config.get( + "broker_request_response", None + ) self.name = self.config.get("component_name", "") resolve_config_values(self.component_config) @@ -51,6 +55,7 @@ def __init__(self, module_info, **kwargs): self.validate_config() self.setup_transforms() self.setup_communications() + self.setup_broker_request_response() def create_thread_and_run(self): self.thread = threading.Thread(target=self.run) @@ -66,6 +71,8 @@ def run(self): if self.trace_queue: 
self.trace_event(event) self.process_event(event) + except AssertionError: + raise except Exception as e: log.error( "%sComponent has crashed: %s\n%s", @@ -214,7 +221,15 @@ def get_config(self, key=None, default=None): val = self.component_config.get(key, None) if val is None: val = self.config.get(key, default) - if callable(val): + + # We reserve a few callable function names for internal use + # They are used for the handler_callback component which is used + # in testing (search the tests directory for example uses) + if callable(val) and key not in [ + "invoke_handler", + "get_next_event_handler", + "send_message_handler", + ]: if self.current_message is None: raise ValueError( f"Component {self.log_identifier} is trying to use an `invoke` config " @@ -262,6 +277,32 @@ def setup_communications(self): else: self.input_queue = queue.Queue(maxsize=self.queue_max_depth) + def setup_broker_request_response(self): + if ( + not self.broker_request_response_config + or not self.broker_request_response_config.get("enabled", False) + ): + self.broker_request_response_controller = None + return + broker_config = self.broker_request_response_config.get("broker_config", {}) + request_expiry_ms = self.broker_request_response_config.get( + "request_expiry_ms", 30000 + ) + if not broker_config: + raise ValueError( + f"Broker request response config not found for component {self.name}" + ) + rrc_config = { + "broker_config": broker_config, + "request_expiry_ms": request_expiry_ms, + } + self.broker_request_response_controller = RequestResponseFlowController( + config=rrc_config, connector=self.connector + ) + + def is_broker_request_response_enabled(self): + return self.broker_request_response_controller is not None + def setup_transforms(self): self.transforms = Transforms( self.config.get("input_transforms", []), log_identifier=self.log_identifier @@ -365,3 +406,26 @@ def cleanup(self): self.input_queue.get_nowait() except queue.Empty: break + + # This should be used to do an on-the-fly broker request response + def do_broker_request_response( + self, message, stream=False, streaming_complete_expression=None + ): + if self.broker_request_response_controller: + if stream: + return ( + self.broker_request_response_controller.do_broker_request_response( + message, stream, streaming_complete_expression + ) + ) + else: + generator = ( + self.broker_request_response_controller.do_broker_request_response( + message ) + ) + next_message, _last = next(generator, (None, None)) # tolerate an empty generator + return next_message + raise ValueError( + f"Broker request response controller not found for component {self.name}" + ) diff --git a/src/solace_ai_connector/components/general/delay.py b/src/solace_ai_connector/components/general/delay.py index d4a05d0..8d8aaf0 100644 --- a/src/solace_ai_connector/components/general/delay.py +++ b/src/solace_ai_connector/components/general/delay.py @@ -38,5 +38,6 @@ def __init__(self, **kwargs): super().__init__(info, **kwargs) def invoke(self, message, data): - sleep(self.get_config("delay")) + delay = self.get_config("delay") + sleep(delay) return deepcopy(data) diff --git a/src/solace_ai_connector/components/general/for_testing/handler_callback.py b/src/solace_ai_connector/components/general/for_testing/handler_callback.py new file mode 100644 index 0000000..12d0ea7 --- /dev/null +++ b/src/solace_ai_connector/components/general/for_testing/handler_callback.py @@ -0,0 +1,67 @@ +"""This test component allows a tester to configure callback handlers for + get_next_event, send_message and invoke 
methods""" + +from ...component_base import ComponentBase + + +info = { + "class_name": "HandlerCallback", + "description": ( + "This test component allows a tester to configure callback handlers for " + "get_next_event, send_message and invoke methods" + ), + "config_parameters": [ + { + "name": "get_next_event_handler", + "required": False, + "description": "The callback handler for the get_next_event method", + "type": "function", + }, + { + "name": "send_message_handler", + "required": False, + "description": "The callback handler for the send_message method", + "type": "function", + }, + { + "name": "invoke_handler", + "required": False, + "description": "The callback handler for the invoke method", + "type": "function", + }, + ], + "input_schema": { + "type": "object", + "properties": {}, + }, + "output_schema": { + "type": "object", + "properties": {}, + }, +} + + +class HandlerCallback(ComponentBase): + def __init__(self, **kwargs): + super().__init__(info, **kwargs) + self.get_next_event_handler = self.get_config("get_next_event_handler") + self.send_message_handler = self.get_config("send_message_handler") + self.invoke_handler = self.get_config("invoke_handler") + + def get_next_event(self): + if self.get_next_event_handler: + return self.get_next_event_handler(self) + else: + return super().get_next_event() + + def send_message(self, message): + if self.send_message_handler: + return self.send_message_handler(self, message) + else: + return super().send_message(message) + + def invoke(self, message, data): + if self.invoke_handler: + return self.invoke_handler(self, message, data) + else: + return super().invoke(message, data) diff --git a/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py b/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py index 6a0884b..0ce6c90 100644 --- a/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py +++ b/src/solace_ai_connector/components/general/openai/openai_chat_model_base.py @@ -1,9 +1,10 @@ """Base class for OpenAI chat models""" +import uuid + from openai import OpenAI from ...component_base import ComponentBase from ....common.message import Message -import uuid openai_info_base = { "class_name": "OpenAIChatModelBase", @@ -34,13 +35,29 @@ { "name": "stream_to_flow", "required": False, - "description": "Name the flow to stream the output to - this must be configured for llm_mode='stream'.", + "description": ( + "Name the flow to stream the output to - this must be configured for " + "llm_mode='stream'. This is mutually exclusive with stream_to_next_component." + ), "default": "", }, + { + "name": "stream_to_next_component", + "required": False, + "description": ( + "Whether to stream the output to the next component in the flow. " + "This is mutually exclusive with stream_to_flow." + ), + "default": False, + }, { "name": "llm_mode", "required": False, - "description": "The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response.", + "description": ( + "The mode for streaming results: 'sync' or 'stream'. 'stream' " + "will just stream the results to the named flow. 'none' will " + "wait for the full response." + ), "default": "none", }, { @@ -52,7 +69,11 @@ { "name": "set_response_uuid_in_user_properties", "required": False, - "description": "Whether to set the response_uuid in the user_properties of the input_message. 
This will allow other components to correlate streaming chunks with the full response.", + "description": ( + "Whether to set the response_uuid in the user_properties of the " + "input_message. This will allow other components to correlate " + "streaming chunks with the full response." + ), "default": False, "type": "boolean", }, @@ -90,8 +111,6 @@ } -import uuid - class OpenAIChatModelBase(ComponentBase): def __init__(self, module_info, **kwargs): super().__init__(module_info, **kwargs) @@ -101,16 +120,22 @@ def init(self): self.model = self.get_config("model") self.temperature = self.get_config("temperature") self.stream_to_flow = self.get_config("stream_to_flow") + self.stream_to_next_component = self.get_config("stream_to_next_component") self.llm_mode = self.get_config("llm_mode") self.stream_batch_size = self.get_config("stream_batch_size") - self.set_response_uuid_in_user_properties = self.get_config("set_response_uuid_in_user_properties") + self.set_response_uuid_in_user_properties = self.get_config( + "set_response_uuid_in_user_properties" + ) + if self.stream_to_flow and self.stream_to_next_component: + raise ValueError( + "stream_to_flow and stream_to_next_component are mutually exclusive" + ) def invoke(self, message, data): messages = data.get("messages", []) client = OpenAI( - api_key=self.get_config("api_key"), - base_url=self.get_config("base_url") + api_key=self.get_config("api_key"), base_url=self.get_config("base_url") ) if self.llm_mode == "stream": @@ -134,7 +159,7 @@ def invoke_stream(self, client, message, messages): messages=messages, model=self.model, temperature=self.temperature, - stream=True + stream=True, ): if chunk.choices[0].delta.content is not None: content = chunk.choices[0].delta.content @@ -148,11 +173,31 @@ def invoke_stream(self, client, message, messages): aggregate_result, response_uuid, first_chunk, - False + False, + ) + elif self.stream_to_next_component: + self.send_to_next_component( + message, + current_batch, + aggregate_result, + response_uuid, + first_chunk, + False, ) current_batch = "" first_chunk = False + if self.stream_to_next_component: + # Just return the last chunk + return { + "content": aggregate_result, + "chunk": current_batch, + "uuid": response_uuid, + "first_chunk": first_chunk, + "last_chunk": True, + "streaming": True, + } + if self.stream_to_flow: self.send_streaming_message( message, @@ -160,12 +205,20 @@ def invoke_stream(self, client, message, messages): aggregate_result, response_uuid, first_chunk, - True + True, ) return {"content": aggregate_result, "uuid": response_uuid} - def send_streaming_message(self, input_message, chunk, aggregate_result, response_uuid, first_chunk=False, last_chunk=False): + def send_streaming_message( + self, + input_message, + chunk, + aggregate_result, + response_uuid, + first_chunk=False, + last_chunk=False, + ): message = Message( payload={ "chunk": chunk, @@ -177,3 +230,34 @@ def send_streaming_message(self, input_message, chunk, aggregate_result, respons user_properties=input_message.get_user_properties(), ) self.send_to_flow(self.stream_to_flow, message) + + def send_to_next_component( + self, + input_message, + chunk, + aggregate_result, + response_uuid, + first_chunk=False, + last_chunk=False, + ): + message = Message( + payload={ + "chunk": chunk, + "aggregate_result": aggregate_result, + "response_uuid": response_uuid, + "first_chunk": first_chunk, + "last_chunk": last_chunk, + }, + user_properties=input_message.get_user_properties(), + ) + + result = { + "content": 
aggregate_result, + "chunk": chunk, + "uuid": response_uuid, + "first_chunk": first_chunk, + "last_chunk": last_chunk, + "streaming": True, + } + + self.process_post_invoke(result, message) diff --git a/src/solace_ai_connector/components/general/openai/openai_chat_model_with_history.py b/src/solace_ai_connector/components/general/openai/openai_chat_model_with_history.py index 5fde36f..ba7fe64 100644 --- a/src/solace_ai_connector/components/general/openai/openai_chat_model_with_history.py +++ b/src/solace_ai_connector/components/general/openai/openai_chat_model_with_history.py @@ -46,6 +46,11 @@ def __init__(self, **kwargs): def invoke(self, message, data): session_id = data.get("session_id") clear_history_but_keep_depth = data.get("clear_history_but_keep_depth") + try: + if clear_history_but_keep_depth is not None: + clear_history_but_keep_depth = max(0, int(clear_history_but_keep_depth)) + except (TypeError, ValueError): + clear_history_but_keep_depth = 0 messages = data.get("messages", []) with self.get_lock(self.history_key): diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_base.py b/src/solace_ai_connector/components/inputs_outputs/broker_base.py index b6fd113..fac4207 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_base.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_base.py @@ -37,9 +37,12 @@ class BrokerBase(ComponentBase): def __init__(self, module_info, **kwargs): super().__init__(module_info, **kwargs) self.broker_properties = self.get_broker_properties() - self.messaging_service = ( - MessagingServiceBuilder().from_properties(self.broker_properties).build() - ) + if self.broker_properties["broker_type"] not in ["test", "test_streaming"]: + self.messaging_service = ( + MessagingServiceBuilder() + .from_properties(self.broker_properties) + .build() + ) self.current_broker_message = None self.messages_to_ack = [] self.connected = False diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_input.py b/src/solace_ai_connector/components/inputs_outputs/broker_input.py index 841c8ee..3aabd8d 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_input.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_input.py @@ -44,7 +44,8 @@ { "name": "temporary_queue", "required": False, - "description": "Whether to create a temporary queue that will be deleted after disconnection, defaulted to True if broker_queue_name is not provided", + "description": "Whether to create a temporary queue that will be deleted " + "after disconnection, defaulted to True if broker_queue_name is not provided", "default": False, }, { diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py index 3e4a2cd..b363776 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_request_response.py @@ -3,6 +3,8 @@ import threading import uuid import json +import queue +from copy import deepcopy # from typing import Dict, Any @@ -63,6 +65,19 @@ "description": "Expiry time for cached requests in milliseconds", "default": 60000, }, + { + "name": "streaming", + "required": False, + "description": "The response will arrive in multiple pieces. 
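Since `stream_to_next_component` above forwards every chunk through `process_post_invoke()`, the next component's `invoke()` receives the chunk dictionaries directly. A rough sketch of a consumer (class name hypothetical; the keys match those built in `send_to_next_component()`):

```python
# Sketch of a downstream component consuming streamed chunk dicts
# (keys: content, chunk, uuid, first_chunk, last_chunk, streaming).
from solace_ai_connector.components.component_base import ComponentBase

info = {
    "class_name": "ChunkCollector",
    "description": "Example chunk consumer (hypothetical)",
    "config_parameters": [],
    "input_schema": {"type": "object", "properties": {}},
    "output_schema": {"type": "object", "properties": {}},
}


class ChunkCollector(ComponentBase):
    def __init__(self, **kwargs):
        super().__init__(info, **kwargs)

    def invoke(self, message, data):
        # Intermediate chunks carry the newest text in data["chunk"]
        if data.get("streaming") and not data.get("last_chunk"):
            return None
        # The final chunk's "content" holds the fully aggregated response
        return {"text": data.get("content")}
```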
If True, " + "the streaming_complete_expression must be set and will be used to " + "determine when the last piece has arrived.", + }, + { + "name": "streaming_complete_expression", + "required": False, + "description": "The source expression to determine when the last piece of a " + "streaming response has arrived.", + }, ], "input_schema": { "type": "object", @@ -79,6 +94,16 @@ "type": "object", "description": "User properties to send with the request message", }, + "stream": { + "type": "boolean", + "description": "Whether this will have a streaming response", + "default": False, + }, + "streaming_complete_expression": { + "type": "string", + "description": "Expression to determine when the last piece of a " + "streaming response has arrived. Required if stream is True.", + }, }, "required": ["payload", "topic"], }, @@ -115,6 +140,11 @@ def __init__(self, **kwargs): self.reply_queue_name = f"reply-queue-{uuid.uuid4()}" self.reply_topic = f"reply/{uuid.uuid4()}" self.response_thread = None + self.streaming = self.get_config("streaming") + self.streaming_complete_expression = self.get_config( + "streaming_complete_expression" + ) + self.broker_type = self.broker_properties.get("broker_type", "solace") self.broker_properties["temporary_queue"] = True self.broker_properties["queue_name"] = self.reply_queue_name self.broker_properties["subscriptions"] = [ @@ -123,7 +153,13 @@ def __init__(self, **kwargs): "qos": 1, } ] - self.connect() + self.test_mode = False + + if self.broker_type == "solace": + self.connect() + elif self.broker_type == "test" or self.broker_type == "test_streaming": + self.test_mode = True + self.setup_test_pass_through() self.start() def start(self): @@ -135,8 +171,16 @@ def setup_reply_queue(self): self.reply_queue_name, [self.reply_topic], temporary=True ) + def setup_test_pass_through(self): + self.pass_through_queue = queue.Queue() + def start_response_thread(self): - self.response_thread = threading.Thread(target=self.handle_responses) + if self.test_mode: + self.response_thread = threading.Thread( + target=self.handle_test_pass_through + ) + else: + self.response_thread = threading.Thread(target=self.handle_responses) self.response_thread.start() def handle_responses(self): @@ -148,61 +192,76 @@ def handle_responses(self): except Exception as e: log.error("Error handling response: %s", e) + def handle_test_pass_through(self): + while not self.stop_signal.is_set(): + try: + message = self.pass_through_queue.get(timeout=1) + decoded_payload = self.decode_payload(message.get_payload()) + message.set_payload(decoded_payload) + self.process_response(message) + except queue.Empty: + continue + except Exception as e: + log.error("Error handling test passthrough: %s", e) + def process_response(self, broker_message): - payload = broker_message.get_payload_as_string() - if payload is None: - payload = broker_message.get_payload_as_bytes() - payload = self.decode_payload(payload) - topic = broker_message.get_destination_name() - user_properties = broker_message.get_properties() + if self.test_mode: + payload = broker_message.get_payload() + topic = broker_message.get_topic() + user_properties = broker_message.get_user_properties() + else: + payload = broker_message.get_payload_as_string() + if payload is None: + payload = broker_message.get_payload_as_bytes() + payload = self.decode_payload(payload) + topic = broker_message.get_destination_name() + user_properties = broker_message.get_properties() metadata_json = user_properties.get( 
"__solace_ai_connector_broker_request_reply_metadata__" ) if not metadata_json: - log.warning("Received response without metadata: %s", payload) + log.error("Received response without metadata: %s", payload) return try: metadata_stack = json.loads(metadata_json) except json.JSONDecodeError: - log.warning( - "Received response with invalid metadata JSON: %s", metadata_json - ) + log.error("Received response with invalid metadata JSON: %s", metadata_json) return if not metadata_stack: - log.warning("Received response with empty metadata stack: %s", payload) + log.error("Received response with empty metadata stack: %s", payload) return try: current_metadata = metadata_stack.pop() except IndexError: - log.warning( + log.error( "Received response with invalid metadata stack: %s", metadata_stack ) return request_id = current_metadata.get("request_id") if not request_id: - log.warning("Received response without request_id in metadata: %s", payload) + log.error("Received response without request_id in metadata: %s", payload) return cached_request = self.cache_service.get_data(request_id) if not cached_request: - log.warning("Received response for unknown request_id: %s", request_id) + log.error("Received response for unknown request_id: %s", request_id) return + stream = cached_request.get("stream", False) + streaming_complete_expression = cached_request.get( + "streaming_complete_expression" + ) + response = { "payload": payload, "topic": topic, "user_properties": user_properties, } - result = { - "request": cached_request, - "response": response, - } - # Update the metadata in the response if metadata_stack: response["user_properties"][ @@ -221,8 +280,23 @@ def process_response(self, broker_message): "__solace_ai_connector_broker_request_reply_topic__", None ) - self.process_post_invoke(result, Message(payload=result)) - self.cache_service.remove_data(request_id) + message = Message( + payload=payload, + user_properties=user_properties, + topic=topic, + ) + self.process_post_invoke(response, message) + + # Only remove the cache entry if this isn't a streaming response or + # if it is the last piece of a streaming response + last_piece = True + if stream and streaming_complete_expression: + is_last = message.get_data(streaming_complete_expression) + if not is_last: + last_piece = False + + if last_piece: + self.cache_service.remove_data(request_id) def invoke(self, message, data): request_id = str(uuid.uuid4()) @@ -230,6 +304,12 @@ def invoke(self, message, data): if "user_properties" not in data: data["user_properties"] = {} + stream = False + if "stream" in data: + stream = data["stream"] + if "streaming_complete_expression" in data: + streaming_complete_expression = data["streaming_complete_expression"] + metadata = {"request_id": request_id, "reply_topic": self.reply_topic} if ( @@ -266,13 +346,39 @@ def invoke(self, message, data): "__solace_ai_connector_broker_request_reply_topic__" ] = self.reply_topic - encoded_payload = self.encode_payload(data["payload"]) + if self.test_mode: + if self.broker_type == "test_streaming": + # The payload should be an array. 
Send one message per item in the array + if not isinstance(data["payload"], list): + raise ValueError("Payload must be a list for test_streaming broker") + for item in data["payload"]: + encoded_payload = self.encode_payload(item) + self.pass_through_queue.put( + Message( + payload=encoded_payload, + user_properties=deepcopy(data["user_properties"]), + topic=data["topic"], + ) + ) + else: + encoded_payload = self.encode_payload(data["payload"]) + self.pass_through_queue.put( + Message( + payload=encoded_payload, + user_properties=data["user_properties"], + topic=data["topic"], + ) + ) + else: + encoded_payload = self.encode_payload(data["payload"]) + self.messaging_service.send_message( + destination_name=data["topic"], + payload=encoded_payload, + user_properties=data["user_properties"], + ) - self.messaging_service.send_message( - destination_name=data["topic"], - payload=encoded_payload, - user_properties=data["user_properties"], - ) + data["stream"] = stream + data["streaming_complete_expression"] = streaming_complete_expression self.cache_service.add_data( key=request_id, diff --git a/src/solace_ai_connector/components/inputs_outputs/stdin_input.py b/src/solace_ai_connector/components/inputs_outputs/stdin_input.py index a4fb83e..568333c 100644 --- a/src/solace_ai_connector/components/inputs_outputs/stdin_input.py +++ b/src/solace_ai_connector/components/inputs_outputs/stdin_input.py @@ -1,5 +1,7 @@ # An input component that reads from STDIN +import threading + from copy import deepcopy from ..component_base import ComponentBase from ...common.message import Message @@ -9,9 +11,17 @@ "class_name": "Stdin", "description": ( "STDIN input component. The component will prompt for " - "input, which will then be placed in the message payload using the output schema below." + "input, which will then be placed in the message payload using the output schema below. " + "The component will wait for its output message to be acknowledged before prompting for " + "the next input." 
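For reference, the request dictionary a component hands to the `broker_request_response` component's `invoke()` follows the input schema earlier in that file; with the new streaming keys it looks like the following (topic, payload, and expression values are hypothetical):

```python
request_data = {
    "payload": {"question": "What is the capital of France?"},
    "topic": "demo/llm/request",
    "user_properties": {},
    # New in this patch: opt in to a multi-part response and say how to
    # recognize the final piece
    "stream": True,
    "streaming_complete_expression": "input.payload:last_chunk",
}
```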
), - "config_parameters": [], + "config_parameters": [ + { + "name": "prompt", + "required": False, + "description": "The prompt to display when asking for input", + } + ], "output_schema": { "type": "object", "properties": { @@ -27,16 +37,28 @@ class Stdin(ComponentBase): def __init__(self, **kwargs): super().__init__(info, **kwargs) + self.need_acknowledgement = True + self.next_input_signal = threading.Event() + self.next_input_signal.set() def get_next_message(self): + # Wait for the next input signal + self.next_input_signal.wait() + + # Reset the event for the next use + self.next_input_signal.clear() + # Get the next message from STDIN - obj = {"text": input(self.config.get("prompt", "Enter text: "))} + obj = {"text": input(self.get_config("prompt", "\nEnter text: "))} # Create and return a message object return Message(payload=obj) - # def get_input_data(self, message): - # return message.payload - def invoke(self, message, data): return deepcopy(message.get_payload()) + + def acknowledge_message(self): + self.next_input_signal.set() + + def get_acknowledgement_callback(self): + return self.acknowledge_message diff --git a/src/solace_ai_connector/components/inputs_outputs/stdout_output.py b/src/solace_ai_connector/components/inputs_outputs/stdout_output.py index 0309f1f..edba4aa 100644 --- a/src/solace_ai_connector/components/inputs_outputs/stdout_output.py +++ b/src/solace_ai_connector/components/inputs_outputs/stdout_output.py @@ -6,7 +6,15 @@ info = { "class_name": "Stdout", "description": "STDOUT output component", - "config_parameters": [], + "config_parameters": [ + { + "name": "add_new_line_between_messages", + "required": False, + "description": "Add a new line between messages", + "type": "boolean", + "default": True, + } + ], "input_schema": { "type": "object", "properties": { @@ -22,8 +30,15 @@ class Stdout(ComponentBase): def __init__(self, **kwargs): super().__init__(info, **kwargs) + self.add_newline = self.get_config("add_new_line_between_messages") def invoke(self, message, data): # Print the message to STDOUT - print(yaml.dump(data)) + if isinstance(data, (dict, list)): + print(yaml.dump(data)) + else: + print(data, end="") + if self.add_newline: + print() + return data diff --git a/src/solace_ai_connector/flow/flow.py b/src/solace_ai_connector/flow/flow.py index 9adfb27..ea5091c 100644 --- a/src/solace_ai_connector/flow/flow.py +++ b/src/solace_ai_connector/flow/flow.py @@ -66,6 +66,9 @@ def __init__( self.cache_service = connector.cache_service if connector else None self.create_components() + def get_input_queue(self): + return self.flow_input_queue + def create_components(self): # Loop through the components and create them for index, component in enumerate(self.flow_config.get("components", [])): @@ -77,23 +80,21 @@ for component in component_group: component.set_next_component(self.component_groups[index + 1][0]) + self.flow_input_queue = self.component_groups[0][0].get_input_queue() + + def run(self): # Now one more time to create threads and run them - for index, component_group in enumerate(self.component_groups): + for _index, component_group in enumerate(self.component_groups): for component in component_group: thread = component.create_thread_and_run() self.threads.append(thread) - self.flow_input_queue = self.component_groups[0][0].get_input_queue() - def create_component_group(self, component, index): component_module = component.get("component_module", "") base_path = component.get("component_base_path",
None) component_package = component.get("component_package", None) num_instances = component.get("num_instances", 1) - # component_config = component.get("component_config", {}) - # component_name = component.get("component_name", "") - # imported_module = import_from_directories(component_module) imported_module = import_module(component_module, base_path, component_package) try: @@ -136,6 +137,12 @@ def create_component_group(self, component, index): def get_flow_input_queue(self): return self.flow_input_queue + # This will set the next component in all the components in the + # last component group + def set_next_component(self, component): + for comp in self.component_groups[-1]: + comp.set_next_component(component) + def wait_for_threads(self): for thread in self.threads: thread.join() diff --git a/src/solace_ai_connector/flow/request_response_flow_controller.py b/src/solace_ai_connector/flow/request_response_flow_controller.py new file mode 100644 index 0000000..36dda71 --- /dev/null +++ b/src/solace_ai_connector/flow/request_response_flow_controller.py @@ -0,0 +1,156 @@ +""" +This file handles sending a request message to the broker and then +receiving the response message for it. It also supports the result +message being a streamed message that comes in multiple parts. + +A component gets one of these automatically when its configuration enables +broker request/response: + +```yaml +- name: example_flow + components: + - component_name: example_component + component_module: custom_component + broker_request_response: + enabled: true + broker_config: + broker_type: solace + broker_url: ${BROKER_URL} + broker_username: ${BROKER_USERNAME} + broker_password: ${BROKER_PASSWORD} + broker_vpn: ${BROKER_VPN} + payload_encoding: utf-8 + payload_format: json + request_expiry_ms: 300000 +``` + +""" + +import queue +import time +from typing import Dict, Any + +from ..common.message import Message +from ..common.event import Event, EventType + + +# This is a very basic component which will be stitched onto the final component in the flow +class RequestResponseControllerOutputComponent: + def __init__(self, controller): + self.controller = controller + + def enqueue(self, event): + self.controller.enqueue_response(event) + + +# This is the main class that will be used to send messages to a flow and receive the response +class RequestResponseFlowController: + def __init__(self, config: Dict[str, Any], connector): + self.config = config + self.connector = connector + self.broker_config = config.get("broker_config") + self.request_expiry_ms = config.get("request_expiry_ms", 300000) + self.request_expiry_s = self.request_expiry_ms / 1000 + self.input_queue = None + self.response_queue = None + self.enqueue_time = None + self.request_outstanding = False + + self.flow = self.create_broker_request_response_flow() + self.setup_queues(self.flow) + self.flow.run() + + def create_broker_request_response_flow(self): + self.broker_config["request_expiry_ms"] = self.request_expiry_ms + config = { + "name": "_internal_broker_request_response_flow", + "components": [ + { + "component_name": "_internal_broker_request_response", + "component_module": "broker_request_response", + "component_config": self.broker_config, + } + ], + } + return self.connector.create_flow(flow=config, index=0, flow_instance_index=0) + + def setup_queues(self, flow): + # Input queue to send the message to the flow + self.input_queue = flow.get_input_queue() + + # Response queue to receive the response from the flow + self.response_queue = queue.Queue() + rrcComponent = RequestResponseControllerOutputComponent(self) +
flow.set_next_component(rrcComponent) + + def do_broker_request_response( + self, request_message, stream=False, streaming_complete_expression=None + ): + # Send the message to the broker + self.send_message(request_message, stream, streaming_complete_expression) + + # Now we will wait for the response + now = time.time() + elapsed_time = now - self.enqueue_time + remaining_timeout = self.request_expiry_s - elapsed_time + if stream: + # If we are in streaming mode, we will return individual messages + # until we receive the last message. Use the expression to determine + # if this is the last message + while True: + try: + event = self.response_queue.get(timeout=remaining_timeout) + if event.event_type == EventType.MESSAGE: + message = event.data + last_message = message.get_data(streaming_complete_expression) + yield message, last_message + if last_message: + return + except queue.Empty: + if (time.time() - self.enqueue_time) > self.request_expiry_s: + raise TimeoutError( # pylint: disable=raise-missing-from + "Timeout waiting for response" + ) + except Exception as e: + raise e + + now = time.time() + elapsed_time = now - self.enqueue_time + remaining_timeout = self.request_expiry_s - elapsed_time + + # If we are not in streaming mode, we will return a single message + # and then stop the iterator + try: + event = self.response_queue.get(timeout=remaining_timeout) + if event.event_type == EventType.MESSAGE: + message = event.data + yield message, True + return + except queue.Empty: + if (time.time() - self.enqueue_time) > self.request_expiry_s: + raise TimeoutError( # pylint: disable=raise-missing-from + "Timeout waiting for response" + ) + except Exception as e: + raise e + + def send_message( + self, message: Message, stream=False, streaming_complete_expression=None + ): + # Make a new message, but copy the data from the original message + if not self.input_queue: + raise ValueError(f"Input queue for flow {self.flow.name} not found") + + # Need to set the previous object to the required input for the + # broker_request_response component + message.set_previous( + { + "payload": message.get_payload(), + "user_properties": message.get_user_properties(), + "topic": message.get_topic(), + "stream": stream, + "streaming_complete_expression": streaming_complete_expression, + }, + ) + + event = Event(EventType.MESSAGE, message) + self.enqueue_time = time.time() + self.request_outstanding = True + self.input_queue.put(event) + + def enqueue_response(self, event): + self.response_queue.put(event) diff --git a/src/solace_ai_connector/main.py b/src/solace_ai_connector/main.py index 0adb414..a62cf00 100644 --- a/src/solace_ai_connector/main.py +++ b/src/solace_ai_connector/main.py @@ -1,5 +1,6 @@ import os import sys +import re import yaml from .solace_ai_connector import SolaceAiConnector @@ -12,7 +13,7 @@ def load_config(file): yaml_str = f.read() # Substitute the environment variables using os.environ - yaml_str = os.path.expandvars(yaml_str) + yaml_str = expandvars_with_defaults(yaml_str) # Load the YAML string using yaml.safe_load return yaml.safe_load(yaml_str) @@ -22,6 +23,19 @@ def load_config(file): sys.exit(1) +def expandvars_with_defaults(text): + """Expand environment variables with support for default values. 
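Components normally get a controller like this automatically via `setup_broker_request_response()` in `component_base.py`, but a direct-use sketch makes the generator contract clear — assuming an already-running `connector`, a `broker_config` dict like the ones in the tests below, and a `request_message` to send:

```python
# Hypothetical direct use of RequestResponseFlowController (components
# normally call ComponentBase.do_broker_request_response instead).
controller = RequestResponseFlowController(
    config={"broker_config": broker_config, "request_expiry_ms": 30000},
    connector=connector,
)

# The generator yields (message, is_last) pairs until the completion
# expression flags the final piece, or raises TimeoutError on expiry.
for response, is_last in controller.do_broker_request_response(
    request_message,
    stream=True,
    streaming_complete_expression="input.payload:last",
):
    print(response.get_data("input.payload"))
```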
+ Supported syntax: ${VAR_NAME} or ${VAR_NAME, default_value}""" + pattern = re.compile(r"\$\{([^}:\s]+)(?:\s*,\s*([^}]*))?\}") + + def replacer(match): + var_name = match.group(1) + default_value = match.group(2) if match.group(2) is not None else "" + return os.environ.get(var_name, default_value) + + return pattern.sub(replacer, text) + + def merge_config(dict1, dict2): """Merge a new configuration into an existing configuration.""" merged = {} diff --git a/src/solace_ai_connector/solace_ai_connector.py b/src/solace_ai_connector/solace_ai_connector.py index f41f50f..40240cc 100644 --- a/src/solace_ai_connector/solace_ai_connector.py +++ b/src/solace_ai_connector/solace_ai_connector.py @@ -62,6 +62,8 @@ def create_flows(self): flow_input_queue = flow_instance.get_flow_input_queue() self.flow_input_queues[flow.get("name")] = flow_input_queue self.flows.append(flow_instance) + for flow in self.flows: + flow.run() def create_flow(self, flow: dict, index: int, flow_instance_index: int): """Create a single flow""" @@ -98,15 +100,6 @@ def wait_for_flows(self): self.stop() self.cleanup() - def stop(self): - """Stop the Solace AI Event Connector""" - log.info("Stopping Solace AI Event Connector") - self.stop_signal.set() - self.timer_manager.stop() # Stop the timer manager first - self.wait_for_flows() - if self.trace_thread: - self.trace_thread.join() - def cleanup(self): """Clean up resources and ensure all threads are properly joined""" log.info("Cleaning up Solace AI Event Connector") @@ -202,6 +195,13 @@ def get_flows(self): """Return the flows""" return self.flows + def get_flow(self, flow_name): + """Return a specific flow by name""" + for flow in self.flows: + if flow.name == flow_name: + return flow + return None + def setup_cache_service(self): """Setup the cache service""" cache_config = self.config.get("cache", {}) diff --git a/tests/utils_for_test_files.py b/src/solace_ai_connector/test_utils/utils_for_test_files.py similarity index 58% rename from tests/utils_for_test_files.py rename to src/solace_ai_connector/test_utils/utils_for_test_files.py index 2bb9dfc..2b3421b 100644 --- a/tests/utils_for_test_files.py +++ b/src/solace_ai_connector/test_utils/utils_for_test_files.py @@ -1,8 +1,6 @@ -"""Collection of functions to be used in test files""" - +import os import queue import sys -import os import yaml sys.path.insert(0, os.path.abspath("src")) @@ -10,6 +8,7 @@ from solace_ai_connector.solace_ai_connector import SolaceAiConnector from solace_ai_connector.common.log import log from solace_ai_connector.common.event import Event, EventType +from solace_ai_connector.common.message import Message # from solace_ai_connector.common.message import Message @@ -61,12 +60,16 @@ def enqueue(self, message): self.next_component_queue.put(event) -def create_connector(config_yaml, event_handlers=None, error_queue=None): - """Create a connector from a config""" +def create_connector(config_or_yaml, event_handlers=None, error_queue=None): + """Create a connector from a config that can be an object or a yaml string""" + + config = config_or_yaml + if isinstance(config_or_yaml, str): + config = yaml.safe_load(config_or_yaml) # Create the connector connector = SolaceAiConnector( - yaml.safe_load(config_yaml), + config, event_handlers=event_handlers, error_queue=error_queue, ) @@ -76,15 +79,97 @@ def create_connector(config_yaml, event_handlers=None, error_queue=None): return connector -def create_test_flows(config_yaml, queue_timeout=None, error_queue=None, queue_size=0): +def run_component_test( + 
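The new default-value substitution in `main.py` behaves as follows (variable name and URLs are illustrative):

```python
import os

from solace_ai_connector.main import expandvars_with_defaults

yaml_str = "broker_url: ${BROKER_URL, tcp://localhost:55555}"

os.environ.pop("BROKER_URL", None)
print(expandvars_with_defaults(yaml_str))
# broker_url: tcp://localhost:55555   <- default applied

os.environ["BROKER_URL"] = "tcp://broker.example.com:55555"
print(expandvars_with_defaults(yaml_str))
# broker_url: tcp://broker.example.com:55555
```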
module_or_name, + validation_func, + component_config=None, + input_data=None, + input_messages=None, + input_selection=None, + input_transforms=None, + max_response_timeout=None, +): + if not input_data and not input_messages: + raise ValueError("Either input_data or input_messages must be provided") + + if input_data and input_messages: + raise ValueError("Only one of input_data or input_messages can be provided") + + if input_data and not isinstance(input_data, list): + input_data = [input_data] + + if input_messages and not isinstance(input_messages, list): + input_messages = [input_messages] + + if not input_messages: + input_messages = [] + + if input_selection: + if isinstance(input_selection, str): + input_selection = {"source_expression": input_selection} + + connector = None + try: + connector, flows = create_test_flows( + { + "flows": [ + { + "name": "test_flow", + "components": [ + { + "component_name": "test_component", + "component_module": module_or_name, + "component_config": component_config or {}, + "input_selection": input_selection, + "input_transforms": input_transforms, + } + ], + } + ] + }, + queue_timeout=max_response_timeout, + ) + + if input_data: + for data in input_data: + message = Message(payload=data) + message.set_previous(data) + input_messages.append(message) + + # Send each message through, one at a time + output_data_list = [] + output_message_list = [] + for message in input_messages: + send_message_to_flow(flows[0], message) + output_message = get_message_from_flow(flows[0]) + if not output_message: + # This only happens if the max_response_timeout is reached + output_message_list.append(None) + output_data_list.append(None) + continue + output_data_list.append(output_message.get_data("previous")) + output_message_list.append(output_message) + + validation_func(output_data_list, output_message_list, message) + + finally: + if connector: + dispose_connector(connector) + + +def create_test_flows( + config_or_yaml, queue_timeout=None, error_queue=None, queue_size=0 +): # Create the connector - connector = create_connector(config_yaml, error_queue=error_queue) + connector = create_connector(config_or_yaml, error_queue=error_queue) flows = connector.get_flows() # For each of the flows, add the input and output components flow_info = [] for flow in flows: + if flow.flow_config.get("test_ignore", False): + continue input_component = TestInputComponent( flow.component_groups[0][0].get_input_queue() ) @@ -123,6 +208,8 @@ def send_message_to_flow(flow_info, message): def get_message_from_flow(flow_info): output_component = flow_info["output_component"] event = output_component.get_output() + if not event: + return event if event.event_type != EventType.MESSAGE: raise ValueError("Expected a message event") return event.data diff --git a/tests/test_acks.py b/tests/test_acks.py index c067fb5..bf0b1ea 100644 --- a/tests/test_acks.py +++ b/tests/test_acks.py @@ -1,11 +1,11 @@ """This file tests acks in a flow""" import sys -import queue sys.path.append("src") +import queue -from utils_for_test_files import ( # pylint: disable=wrong-import-position +from solace_ai_connector.test_utils.utils_for_test_files import ( # pylint: disable=wrong-import-position # create_connector, # create_and_run_component, dispose_connector, diff --git a/tests/test_aggregate.py b/tests/test_aggregate.py index a288410..8826f6a 100644 --- a/tests/test_aggregate.py +++ b/tests/test_aggregate.py @@ -1,8 +1,11 @@ """Some tests to verify the aggregate component works as expected""" +import sys + 
+sys.path.append("src") import time -from utils_for_test_files import ( +from solace_ai_connector.test_utils.utils_for_test_files import ( create_test_flows, dispose_connector, send_message_to_flow, diff --git a/tests/test_config_file.py b/tests/test_config_file.py index 5bd34f7..593bc0e 100644 --- a/tests/test_config_file.py +++ b/tests/test_config_file.py @@ -1,19 +1,26 @@ """Test various things related to the configuration file""" import sys -import yaml import pytest +import yaml sys.path.append("src") -from utils_for_test_files import ( # pylint: disable=wrong-import-position +from solace_ai_connector.test_utils.utils_for_test_files import ( # pylint: disable=wrong-import-position create_connector, + create_test_flows, + dispose_connector, + send_message_to_flow, + get_message_from_flow, ) from solace_ai_connector.solace_ai_connector import ( # pylint: disable=wrong-import-position SolaceAiConnector, ) +from solace_ai_connector.common.message import Message +import solace_ai_connector.components.general.pass_through + # from solace_ai_connector.common.log import log @@ -143,6 +150,46 @@ def test_no_component_module(): assert str(e) == "component_module not provided in flow 0, component 0" +def test_static_import_and_object_config(): + """Test that we can statically import a module and pass an object for the config""" + + config = { + "log": {"log_file_level": "DEBUG", "log_file": "solace_ai_connector.log"}, + "flows": [ + { + "name": "test_flow", + "components": [ + { + "component_name": "delay1", + "component_module": solace_ai_connector.components.general.pass_through, + "component_config": {"delay": 0.1}, + "input_selection": {"source_expression": "input.payload"}, + } + ], + } + ], + } + connector = None + try: + connector, flows = create_test_flows(config) + + # Test pushing a simple message through the delay component + message = Message(payload={"text": "Hello, World!"}) + send_message_to_flow(flows[0], message) + + # Get the output message + output_message = get_message_from_flow(flows[0]) + + # Check that the output is correct + assert output_message.get_data("previous") == {"text": "Hello, World!"} + + except Exception as e: + pytest.fail(f"Test failed with exception: {e}") + finally: + if "connector" in locals(): + dispose_connector(connector) + + def test_bad_module(): """Test that the program exits if the component module is not found""" try: diff --git a/tests/test_error_flows.py b/tests/test_error_flows.py index b8ff4d7..8e7edfe 100644 --- a/tests/test_error_flows.py +++ b/tests/test_error_flows.py @@ -2,11 +2,11 @@ import sys -# import queue - sys.path.append("src") -from utils_for_test_files import ( # pylint: disable=wrong-import-position +# import queue + +from solace_ai_connector.test_utils.utils_for_test_files import ( # pylint: disable=wrong-import-position create_test_flows, # create_and_run_component, dispose_connector, diff --git a/tests/test_filter.py b/tests/test_filter.py index b43b72f..478cc94 100644 --- a/tests/test_filter.py +++ b/tests/test_filter.py @@ -1,8 +1,12 @@ """Some tests to verify the filter component works as expected""" +import sys + +sys.path.append("src") + # import pytest -from utils_for_test_files import ( +from solace_ai_connector.test_utils.utils_for_test_files import ( create_test_flows, # create_connector, dispose_connector, diff --git a/tests/test_flows.py b/tests/test_flows.py index 6196ae1..4687fda 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -1,9 +1,12 @@ """This test file tests all things to do with the flows 
and the components that make up the flows""" +import sys + +sys.path.append("src") import pytest import time -from utils_for_test_files import ( +from solace_ai_connector.test_utils.utils_for_test_files import ( create_test_flows, create_connector, dispose_connector, diff --git a/tests/test_invoke.py b/tests/test_invoke.py index 58d0b77..fa0de0f 100644 --- a/tests/test_invoke.py +++ b/tests/test_invoke.py @@ -5,13 +5,14 @@ sys.path.append("src") -from utils_for_test_files import ( # pylint: disable=wrong-import-position + +from solace_ai_connector.test_utils.utils_for_test_files import ( create_and_run_component, ) -from solace_ai_connector.common.utils import ( # pylint: disable=wrong-import-position +from solace_ai_connector.common.utils import ( resolve_config_values, ) -from solace_ai_connector.common.message import ( # pylint: disable=wrong-import-position +from solace_ai_connector.common.message import ( Message, ) @@ -1083,16 +1084,13 @@ def test_invoke_with_uuid_generator(): response = resolve_config_values( { "a": { - "invoke": { - "module": "invoke_functions", - "function": "uuid" - }, + "invoke": {"module": "invoke_functions", "function": "uuid"}, }, } - ) - + ) + # Check if the output is of type string assert type(response["a"]) == str # Check if the output is a valid UUID - assert len(response["a"]) == 36 \ No newline at end of file + assert len(response["a"]) == 36 diff --git a/tests/test_iterate.py b/tests/test_iterate.py index a33bacc..cffb763 100644 --- a/tests/test_iterate.py +++ b/tests/test_iterate.py @@ -1,8 +1,12 @@ """Some tests to verify the iterate component works as expected""" +import sys + +sys.path.append("src") + # import pytest -from utils_for_test_files import ( +from solace_ai_connector.test_utils.utils_for_test_files import ( create_test_flows, # create_connector, dispose_connector, diff --git a/tests/test_message_get_set_data.py b/tests/test_message_get_set_data.py index f2258d1..33622ed 100644 --- a/tests/test_message_get_set_data.py +++ b/tests/test_message_get_set_data.py @@ -1,11 +1,11 @@ """This test fixture will test the get_data and set_data methods of the Message class""" +import sys +sys.path.append("src") import json import base64 -import sys import pytest -sys.path.append("src") from solace_ai_connector.common.message import Message # Create a few different messages to test with diff --git a/tests/test_request_response_controller.py b/tests/test_request_response_controller.py new file mode 100644 index 0000000..6ca6073 --- /dev/null +++ b/tests/test_request_response_controller.py @@ -0,0 +1,244 @@ +import sys + +sys.path.append("src") + +from solace_ai_connector.test_utils.utils_for_test_files import ( + create_test_flows, + dispose_connector, + send_message_to_flow, + get_message_from_flow, +) +from solace_ai_connector.common.message import Message + + +def test_request_response_flow_controller_basic(): + """Test basic functionality of the RequestResponseFlowController""" + + def test_invoke_handler(component, message, _data): + # Call the request_response + message = component.do_broker_request_response(message) + try: + assert message.get_data("previous") == { + "payload": {"text": "Hello, World!"}, + "topic": None, + "user_properties": {}, + } + except AssertionError as e: + return e + return "Pass" + + config = { + "flows": [ + { + "name": "test_flow", + "components": [ + { + "component_name": "requester", + "component_module": "handler_callback", + "component_config": { + "invoke_handler": test_invoke_handler, + }, + 
"broker_request_response": { + "enabled": True, + "broker_config": { + "broker_type": "test", + "broker_url": "test", + "broker_username": "test", + "broker_password": "test", + "broker_vpn": "test", + "payload_encoding": "utf-8", + "payload_format": "json", + }, + "request_expiry_ms": 500000, + }, + } + ], + }, + ] + } + connector, flows = create_test_flows(config) + + test_flow = flows[0] + + try: + + # Send a message to the input flow + send_message_to_flow(test_flow, Message(payload={"text": "Hello, World!"})) + + # Get the output message + output_message = get_message_from_flow(test_flow) + + result = output_message.get_data("previous") + + # if the result is an AssertionError, then raise it + if isinstance(result, AssertionError): + raise result + + assert result == "Pass" + + except Exception as e: + print(e) + assert False + + finally: + dispose_connector(connector) + + +# Test simple streaming request response +# Use the iterate component to break a single message into multiple messages +def test_request_response_flow_controller_streaming(): + """Test streaming functionality of the RequestResponseFlowController""" + + def test_invoke_handler(component, message, data): + result = [] + for message, last_message in component.do_broker_request_response( + message, stream=True, streaming_complete_expression="input.payload:last" + ): + payload = message.get_data("input.payload") + result.append(payload) + if last_message: + assert payload == {"text": "Chunk3", "last": True} + + assert result == [ + {"text": "Chunk1", "last": False}, + {"text": "Chunk2", "last": False}, + {"text": "Chunk3", "last": True}, + ] + + return "Pass" + + config = { + "flows": [ + { + "name": "test_flow", + "components": [ + { + "component_name": "requester", + "component_module": "handler_callback", + "component_config": { + "invoke_handler": test_invoke_handler, + }, + "broker_request_response": { + "enabled": True, + "broker_config": { + "broker_type": "test_streaming", + "broker_url": "test", + "broker_username": "test", + "broker_password": "test", + "broker_vpn": "test", + "payload_encoding": "utf-8", + "payload_format": "json", + }, + "request_expiry_ms": 500000, + }, + } + ], + }, + ] + } + connector, flows = create_test_flows(config) + + test_flow = flows[0] + + try: + + # Send a message to the input flow + send_message_to_flow( + test_flow, + Message( + payload=[ + {"text": "Chunk1", "last": False}, + {"text": "Chunk2", "last": False}, + {"text": "Chunk3", "last": True}, + ] + ), + ) + + # Get the output message + output_message = get_message_from_flow(test_flow) + + assert output_message.get_data("previous") == "Pass" + + except Exception as e: + print(e) + assert False + + finally: + dispose_connector(connector) + + +# Test the timeout functionality +def test_request_response_flow_controller_timeout(): + """Test timeout functionality of the RequestResponseFlowController""" + + def test_invoke_handler(component, message, data): + # # Call the request_response_flow + # data_iter = component.send_request_response_flow_message( + # "test_controller", message, {"test": "data"} + # ) + + # # This will timeout + # try: + # for message, data, _last_message in data_iter(): + # assert message.get_data("previous") == {"test": "data"} + # assert message.get_data("input.payload") == {"text": "Hello, World!"} + # except TimeoutError: + # return "timeout" + # return "done" + + # Do it the new way + try: + for message, _last_message in component.do_broker_request_response( + message, stream=True, 
streaming_complete_expression="input.payload:last" + ): + pass + except TimeoutError: + return "Timeout" + return "Fail" + + config = { + "flows": [ + { + "name": "test_flow", + "components": [ + { + "component_name": "requester", + "component_module": "handler_callback", + "component_config": { + "invoke_handler": test_invoke_handler, + }, + "broker_request_response": { + "enabled": True, + "broker_config": { + "broker_type": "test_streaming", + "broker_url": "test", + "broker_username": "test", + "broker_password": "test", + "broker_vpn": "test", + "payload_encoding": "utf-8", + "payload_format": "json", + }, + "request_expiry_ms": 2000, + }, + } + ], + }, + ] + } + connector, flows = create_test_flows(config) + + test_flow = flows[0] + + try: + + # Send a message with an empty list in the payload to the test_streaming broker type + # This will not send any chunks and should timeout + send_message_to_flow(test_flow, Message(payload=[])) + + # Get the output message + output_message = get_message_from_flow(test_flow) + + assert output_message.get_data("previous") == "Timeout" + + finally: + dispose_connector(connector) diff --git a/tests/test_timer_input.py b/tests/test_timer_input.py index 343a7d8..b8897e2 100644 --- a/tests/test_timer_input.py +++ b/tests/test_timer_input.py @@ -1,9 +1,12 @@ """Test the timer input component""" +import sys + +sys.path.append("src") import time import pytest -from utils_for_test_files import ( +from solace_ai_connector.test_utils.utils_for_test_files import ( create_test_flows, create_connector, dispose_connector, diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 9b00150..43e0fed 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -4,14 +4,16 @@ sys.path.append("src") -from utils_for_test_files import ( # pylint: disable=wrong-import-position +from solace_ai_connector.test_utils.utils_for_test_files import ( # pylint: disable=wrong-import-position create_connector, create_and_run_component, + run_component_test, # dispose_connector, ) from solace_ai_connector.common.message import ( # pylint: disable=wrong-import-position Message, ) +import solace_ai_connector.components.general.pass_through def test_basic_copy_transform(): @@ -44,6 +46,64 @@ def test_basic_copy_transform(): assert output_message.get_data("previous") == "Hello, World!" +def test_transform_with_run_component_test(): + """This test is actually testing the test infrastructure method: run_component_test""" + + def validation_func(output_data, output_message, _input_message): + assert output_data[0] == "Hello, World!" 
+ assert output_message[0].get_data("user_data.temp") == { + "payload": {"text": "Hello, World!", "greeting": "Static Greeting!"} + } + + run_component_test( + "pass_through", + validation_func, + input_data={"text": "Hello, World!"}, + input_transforms=[ + { + "type": "copy", + "source_expression": "input.payload", + "dest_expression": "user_data.temp:payload", + }, + { + "type": "copy", + "source_value": "Static Greeting!", + "dest_expression": "user_data.temp:payload.greeting", + }, + ], + input_selection={"source_expression": "user_data.temp:payload.text"}, + ) + + +def test_transform_with_run_component_test_with_static_import(): + """This test is actually testing the test infrastructure method: run_component_test""" + + def validation_func(output_data, output_message, _input_message): + assert output_data == ["Hello, World!"] + assert output_message[0].get_data("user_data.temp") == { + "payload": {"text": "Hello, World!", "greeting": "Static Greeting!"} + } + + run_component_test( + solace_ai_connector.components.general.pass_through, + validation_func, + input_data={"text": "Hello, World!"}, + input_transforms=[ + { + "type": "copy", + "source_expression": "input.payload", + "dest_expression": "user_data.temp:payload", + }, + { + "type": "copy", + "source_value": "Static Greeting!", + "dest_expression": "user_data.temp:payload.greeting", + }, + ], + input_selection={"source_expression": "user_data.temp:payload.text"}, + ) + + def test_basic_map_transform(): """Test the basic map transform""" # Create a simple configuration