Skip to content

Commit

Permalink
New features: inline broker request-response, temporary queues, impro…
Browse files Browse the repository at this point in the history
…ved docs and examples and better testing (#39)

* Examples Update + Code Refactor (#25)

* Removed StorageManager

* Added examples for OpenAI, Bedrock, Anthropic, and VertexAI

* Updating old examples (1/2)

* Updating old examples (2/2)

* Added support for temporary queue + UUID queue name (#26)

* Add assembly component and auto-generated documents (#27)

* Added the assembly component

* Auto-generated documents

* Added type check

* Update the cache service expiry logic + Update the assembly component to use cache expiry for timeout

* Moved assembly to the correct place

* Added MoA Example + UUID Invoke Function (#28)

* MoA example: Broadcast to multiple agents

* Added MoA event manager, added uuid invoke_function + test, updated auto-generated docs

* Added assembly layer to MoA example

* Update documentation for new users + Refactored component_input & source_expression (#29)

* Refactored component_input to input_selection

* Updated, added, and enhanced the documentation with new users in mind

* Refactored source_expression function to evaluate_expression (backward compatible)

* Added tips and tricks section + info and examples on custom modules

* tiny format update

* tiny update

* Fixed solace disconnection issues on shutting down (#30)

* Add RAG example for AI connector + delete action for vector index (#31)

* Added a RAG example for AI connector

* Added delete option to vectordb

* Changed id to ids

* chore: Refactor make_history_start_with_user_message method (#32)

Fix the method to not trim the first entry if it is a "system" role

* Keep history depth needs to be a positive integer and test refactor (#33)

* chore: Refactor clear_history_but_keep_depth method to handle negative depth values

* chore: small change to how this is solved

* chore: one more try

* refactor: move utils_for_test_files.py to solace_ai_connector module

* refactor: removed the orginal utils_for_test_files.py

* refactor: update import statements in test files

* refactor: add sys.path.append("src") to test files

* refactor: standardize import order and sys.path.append in test files

* refactor: a bit more test infrastructure changes

* feat: allow component_module to accept module objects directly

* feat: add types module import to utils.py

* test: add static import and object config test

* refactor: update test_static_import_and_object_config to use create_test_flows

* refactor: Improve test structure and remove duplicate test case

* fix: remove duplicate import of yaml module

* refactor: Modify test config to use dict instead of YAML string

* refactor: convert config_yaml from string to dictionary

* refactor: update static import test to use pass_through component

* test: Add delay component message passing test

* feat: add test for delay component message processing

* feat: Added a new test function (test_one_component) to make it very easy to just run some quick tests on a single input -> expected output tests on a single component

* feat: added input_transforms to the test_one_component so that input transforms can be tested with it

* chore: a bit of cleanup and new tests for test_one_component

* chore: rename test_one_component because it was being picked up as a test by the pytest scanner

* fix: fixed a typo

* Fix for anthropic example (#35)

* Updating version dependency (#37)

* Fixed url and file name in getting started (#38)

* Add guide for RAG (#39)

* Added guide for RAG

* update wording

* Added link to other docs from RAG guide (#40)

* chore: added a timeout setting for running component tests so that you can test situations where you don't expect any output (#34)

* AI-124: Add a feature to provide simple blocking broker request/response ability for components (#42)

* feat: add request_response_controller.py

* feat: implement RequestResponseFlowManager and RequestResponseController classes

* style: format code with black and improve readability

* feat: implement RequestResponseController for flow-based request-response handling

* feat: implement RequestResponseController for handling request-response patterns

* fix: import SolaceAiConnector for type checking

* refactor: restructure Flow class and improve code organization

* feat: implement multiple named RequestResponseControllers per component

* refactor: initialize request-response controllers in ComponentBase

* test: add request_response_controller functionality tests

* feat: finished implementation and added some tests

* refactor: rename RequestResponseController to RequestResponseFlowController

* refactor: rename RequestResponseController to RequestResponseFlowController

* refactor: some name changes

* fix: update test function names for RequestResponseFlowController

* refactor: more name changes

* Ed/req_resp_examples_and_fixes (#41)

* feat: Added a request_response_flow example and fixed a few issues along the way

* feat: Reworked the broker_request_response built-in ability of components to be simpler. Instead of having to have a defined flow and then name that flow, it will automatically create a flow with a single broker_request_response component in it. Now there is a straightforward interating function call to allow components to issue a request and get streaming or non-streaming responses from that flow.

* chore: fix the request_response example and remove the old one

* docs: add broker request-response configuration

* docs: added advanced_component_features.md

* docs: add broker request-response configuration details

* docs: add payload encoding and format to broker config

* docs: add cache service and timer manager to advanced_component_features.md

* docs: add configuration requirement for broker request-response

* docs: update broker request-response section with configuration info

* docs: a bit more detail about do_broker_request_response

* docs: add link to advanced features page in table of contents

* docs: add link to advanced features page

* docs: reorder table of contents in index.md

* docs: add custom components documentation

* docs: Remove advanced component features from table of contents

* docs: clean up a double inclusion of the same section

* docs: small example change

* chore: remove dead code

* chore: add some extra comments to explain some test code

* docs: Update description of STDIN input component

Update the description of the STDIN input component to clarify that it waits for its output message to be acknowledged before prompting for the next input. This change is made in the `stdin_input.py` file.

* chore: add is_broker_request_response_enabled method

* chore: Some changes after review

* feat: AI-129: add ability to specify a default value for a an environment variable in a .yaml config file (#43)

* DATAGO-85484 Bump min python version

---------

Co-authored-by: Cyrus Mobini <[email protected]>
Co-authored-by: Art Morozov <[email protected]>
Co-authored-by: Art Morozov <[email protected]>
  • Loading branch information
4 people authored Sep 26, 2024
1 parent 846742d commit 7cf742d
Show file tree
Hide file tree
Showing 48 changed files with 1,822 additions and 142 deletions.
9 changes: 6 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ permissions:

jobs:
ci:
uses: SolaceDev/solace-public-workflows/.github/workflows/[email protected]
uses: SolaceDev/solace-public-workflows/.github/workflows/hatch_ci.yml@latest
with:
min-python-version: "3.9"
secrets:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
SONAR_HOST_URL: ${{ vars.SONAR_HOST_URL }}
Expand All @@ -29,8 +31,9 @@ jobs:
ssh-key: ${{ secrets.COMMIT_KEY }}

- name: Set up Hatch
uses: SolaceDev/solace-public-workflows/.github/actions/[email protected]

uses: SolaceDev/solace-public-workflows/.github/actions/hatch-setup@latest
with:
min-python-version: "3.9"
- name: Set Up Docker Buildx
id: builder
uses: docker/setup-buildx-action@v3
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ permissions:

jobs:
release:
uses: SolaceDev/solace-public-workflows/.github/workflows/hatch_release_pypi.yml@v1.0.1
uses: SolaceDev/solace-public-workflows/.github/workflows/hatch_release_pypi.yml@latest
with:
ENVIRONMENT: pypi
version: ${{ github.event.inputs.version }}
Expand Down
1 change: 0 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ log:

shared_config:
- broker_config: &broker_connection
broker_connection_share: ${SOLACE_BROKER_URL}
broker_type: solace
broker_url: ${SOLACE_BROKER_URL}
broker_username: ${SOLACE_BROKER_USERNAME}
Expand Down
120 changes: 120 additions & 0 deletions docs/advanced_component_features.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Advanced Component Features

This document describes advanced features available to custom components in the Solace AI Connector.

## Table of Contents
- [Broker Request-Response](#broker-request-response)
- [Cache Manager](#cache-manager)
- [Timer Features](#timer-features)

## Broker Request-Response

Components can perform a request and get a response from the broker using the `do_broker_request_response` method. This method supports both simple request-response and streamed responses. To use this feature, the component's configuration must include a `broker_request_response` section. For details on how to configure this section, refer to the [Broker Request-Response Configuration](configuration.md#broker-request-response-configuration) in the configuration documentation.

This feature would be used in the invoke method of a custom component. When the `do_broker_request_response` method is called, the component will send a message to the broker and then block until a response (or a series of streamed chunks) is received. This makes it very easy to call services that are available via the broker.

### Usage

```python
response = self.do_broker_request_response(message, stream=False)
```

For streamed responses:

```python
for chunk, is_last in self.do_broker_request_response(message, stream=True, streaming_complete_expression="input.payload:streaming.last_message"):
# Process each chunk
if is_last:
break
```

### Parameters

- `message`: The message to send to the broker. This must have a topic and payload.
- `stream` (optional): Boolean indicating whether to expect a streamed response. Default is False.
- `streaming_complete_expression` (optional): An expression to evaluate on each response chunk to determine if it's the last one. This is required when `stream=True`.

### Return Value

- For non-streamed responses: Returns the response message.
- For streamed responses: Returns a generator that yields tuples of (chunk, is_last). Each chunk is a fully formed message with the format of the response. `is_last` is a boolean indicating if the chunk is the last one.

## Memory Cache

The cache service provides a flexible way to store and retrieve data with optional expiration. It supports different storage backends and offers features like automatic expiry checks.

### Features

1. Multiple storage backends:
- In-memory storage
- SQLAlchemy-based storage (for persistent storage)

2. Key-value storage with metadata and expiry support
3. Automatic expiry checks in a background thread
4. Thread-safe operations

### Usage

Components can access the cache service through `self.cache_service`. Here are some common operations:

```python
# Set a value with expiry
self.cache_service.set("key", "value", expiry=300) # Expires in 300 seconds

# Get a value
value = self.cache_service.get("key")

# Delete a value
self.cache_service.delete("key")

# Get all values (including metadata and expiry)
all_data = self.cache_service.get_all()
```

### Configuration

The cache service can be configured in the main configuration file:

```yaml
cache:
backend: "memory" # or "sqlalchemy"
connection_string: "sqlite:///cache.db" # for SQLAlchemy backend
```
## Timer Features
The timer manager allows components to schedule one-time or recurring timer events. This is useful for implementing delayed actions, periodic tasks, or timeouts.
### Features
1. One-time and recurring timers
2. Customizable timer IDs for easy management
3. Optional payloads for timer events
### Usage
Components can access the timer manager through `self.timer_manager`. Here are some common operations:

```python
# Add a one-time timer
self.add_timer(delay_ms=5000, timer_id="my_timer", payload={"key": "value"})
# Add a recurring timer
self.add_timer(delay_ms=5000, timer_id="recurring_timer", interval_ms=10000, payload={"type": "recurring"})
# Cancel a timer
self.cancel_timer(timer_id="my_timer")
```

### Handling Timer Events

To handle timer events, components should implement the `handle_timer_event` method:

```python
def handle_timer_event(self, timer_data):
timer_id = timer_data["timer_id"]
payload = timer_data["payload"]
# Process the timer event
```

Timer events are automatically dispatched to the appropriate component by the timer manager.
33 changes: 32 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ Each component configuration is a dictionary with the following keys:
- `input_selection`: <dictionary> - A `source_expression` or `source_value` to use as the input to the component. Check [Expression Syntax](#expression-syntax) for more details. [Optional: If not specified, the complete previous component output will be used]
- `queue_depth`: <int> - The depth of the input queue for the component.
- `num_instances`: <int> - The number of instances of the component to run (Starts multiple threads to process messages)

- `broker_request_response`: <dictionary> - Configuration for the broker request-response functionality. [Optional]

### component_module

Expand Down Expand Up @@ -359,6 +359,37 @@ The `queue_depth` is an integer that specifies the depth of the input queue for

The `num_instances` is an integer that specifies the number of instances of the component to run. This is the number of threads that will be started to process messages from the input queue. By default, the number of instances is 1.

### Broker Request-Response Configuration

The `broker_request_response` configuration allows components to perform request-response operations with a broker. It has the following structure:

```yaml
broker_request_response:
enabled: <boolean>
broker_config:
broker_type: <string>
broker_url: <string>
broker_username: <string>
broker_password: <string>
broker_vpn: <string>
payload_encoding: <string>
payload_format: <string>
request_expiry_ms: <int>
```

- `enabled`: Set to `true` to enable broker request-response functionality for the component.
- `broker_config`: Configuration for the broker connection.
- `broker_type`: Type of the broker (e.g., "solace").
- `broker_url`: URL of the broker.
- `broker_username`: Username for broker authentication.
- `broker_password`: Password for broker authentication.
- `broker_vpn`: VPN name for the broker connection.
- `payload_encoding`: Encoding for the payload (e.g., "utf-8", "base64").
- `payload_format`: Format of the payload (e.g., "json", "text").
- `request_expiry_ms`: Expiry time for requests in milliseconds.

For more details on using this functionality, see the [Advanced Component Features](advanced_component_features.md#broker-request-response) documentation.

### Built-in components

The AI Event Connector comes with a number of built-in components that can be used to process messages. For a list of all built-in components, see the [Components](components/index.md) documentation.
Expand Down
90 changes: 90 additions & 0 deletions docs/custom_components.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Custom Components

## Purpose

Custom components provide a way to extend the functionality of the Solace AI Connector beyond what's possible with the built-in components and configuration options. Sometimes, it's easier and more efficient to add custom code than to build a complex configuration file, especially for specialized or unique processing requirements.

## Requirements of a Custom Component

To create a custom component, you need to follow these requirements:

1. **Inherit from ComponentBase**: Your custom component class should inherit from the `ComponentBase` class.

2. **Info Section**: Define an `info` dictionary with the following keys:
- `class_name`: The name of your custom component class.
- `config_parameters`: A list of dictionaries describing the configuration parameters for your component.
- `input_schema`: A dictionary describing the expected input schema.
- `output_schema`: A dictionary describing the expected output schema.

3. **Implement the `invoke` method**: This is the main method where your component's logic will be implemented.

Here's a basic template for a custom component:

```python
from solace_ai_connector.components.component_base import ComponentBase

info = {
"class_name": "MyCustomComponent",
"config_parameters": [
{
"name": "my_param",
"type": "string",
"required": True,
"description": "A custom parameter"
}
],
"input_schema": {
"type": "object",
"properties": {
"input_data": {"type": "string"}
}
},
"output_schema": {
"type": "object",
"properties": {
"output_data": {"type": "string"}
}
}
}

class MyCustomComponent(ComponentBase):
def __init__(self, **kwargs):
super().__init__(info, **kwargs)
self.my_param = self.get_config("my_param")

def invoke(self, message, data):
# Your custom logic here
result = f"{self.my_param}: {data['input_data']}"
return {"output_data": result}
```

## Overrideable Methods

While the `invoke` method is the main one you'll implement, there are several other methods you can override to customize your component's behavior:

1. `invoke(self, message, data)`: The main processing method for your component.
2. `get_next_event(self)`: Customize how your component receives events.
3. `send_message(self, message)`: Customize how your component sends messages to the next component.
4. `handle_timer_event(self, timer_data)`: Handle timer events if your component uses timers.
5. `handle_cache_expiry_event(self, timer_data)`: Handle cache expiry events if your component uses the cache service.
6. `process_pre_invoke(self, message)`: Customize preprocessing before `invoke` is called.
7. `process_post_invoke(self, result, message)`: Customize postprocessing after `invoke` is called.

## Advanced Features

Custom components can take advantage of advanced features provided by the Solace AI Connector. These include:

- Broker request-response functionality
- Cache services
- Timer management

For more information on these advanced features and how to use them in your custom components, please refer to the [Advanced Component Features](advanced_component_features.md) documentation.

By creating custom components, you can extend the Solace AI Connector to meet your specific needs while still benefiting from the framework's built-in capabilities for event processing, flow management, and integration with Solace event brokers.

## Example

See the [Tips and Tricks page](tips_and_tricks.md) for an example of creating a custom component.


[]
4 changes: 2 additions & 2 deletions docs/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ area in the Subscriber side of the "Try me!" page.
Download the OpenAI connector example configuration file:

```sh
curl https://raw.githubusercontent.com/SolaceLabs/solace-ai-connector/main/examples/llm/openai_chat.yaml > openai_chat.yaml
curl https://raw.githubusercontent.com/SolaceLabs/solace-ai-connector/refs/heads/main/examples/llm/langchain_openai_with_history_chat.yaml > langchain_openai_with_history_chat.yaml
```

For this one, you need to also define the following additional environment variables:
Expand All @@ -94,7 +94,7 @@ pip install langchain_openai openai
Run the connector:

```sh
solace-ai-connector openai_chat.yaml
solace-ai-connector langchain_openai_with_history_chat.yaml
```

Use the "Try Me!" function on the broker's browser UI (or some other means) to publish an event like this:
Expand Down
Loading

0 comments on commit 7cf742d

Please sign in to comment.