Skip to content

Commit

Permalink
Two changes: added file_output component and a cleaner way for a comp…
Browse files Browse the repository at this point in the history
…onent to discard a message (#22)

* Added file_output component
* Fixed timer_input component issue that caused the timer interval to shrink over time
  • Loading branch information
efunneko authored Jul 25, 2024
1 parent 34b058b commit 161fcc9
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 215 deletions.
135 changes: 16 additions & 119 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,61 +9,35 @@ on:
permissions:
id-token: write
checks: write
issues: read
pull-requests: write
contents: write

jobs:
test:
ci:
uses: SolaceDev/solace-public-workflows/.github/workflows/[email protected]
secrets:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
SONAR_HOST_URL: ${{ vars.SONAR_HOST_URL }}
structure-test:
name: Test Docker Image Structure
runs-on: ubuntu-latest
env:
HATCH_CACHE_DIR: ${{ github.workspace }}/.hatch_cache
HATCH_DATA_DIR: ${{ github.workspace }}/.hatch_data

steps:
- uses: actions/checkout@v4
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
ssh-key: ${{ secrets.COMMIT_KEY }}

- name: Install Hatch
uses: pypa/hatch@install

- name: Restore Hatch Directory
uses: actions/cache/restore@v4
id: cache-restore
with:
path: |
${{ env.HATCH_CACHE_DIR }}
${{ env.HATCH_DATA_DIR }}
key: ${{ runner.os }}-hatch-${{ hashFiles('pyproject.toml','requirements.txt') }}

- name: Install Dependencies
if: steps.cache-restore.outputs.cache-hit != 'true'
run: |
hatch python install 3.8 3.12
- name: Install Dependencies
if: steps.cache-restore.outputs.cache-hit != 'true'
run: |
hatch env create test
- name: Cache Hatch Directory
uses: actions/cache/save@v4
if: steps.cache-restore.outputs.cache-hit != 'true'
id: cache-hatch
with:
path: |
${{ env.HATCH_CACHE_DIR }}
${{ env.HATCH_DATA_DIR }}
key: ${{ runner.os }}-hatch-${{ hashFiles('pyproject.toml','requirements.txt') }}
- name: Set up Hatch
uses: SolaceDev/solace-public-workflows/.github/actions/[email protected]

- name: Set up Docker Buildx
- name: Set Up Docker Buildx
id: builder
uses: docker/setup-buildx-action@v3

- name: Prepare env file
run: |
cp .env_template .env
shell: bash
- name: Build Docker Image
uses: docker/build-push-action@v6
Expand All @@ -74,84 +48,7 @@ jobs:
builder: ${{ steps.builder.outputs.name }}
load: true

- name: Run Lint
continue-on-error: true
run: |
hatch run lint:ruff check -o lint.json --output-format json
shell: bash

- name: Run Structured Tests
run: |
hatch run +py=312 test:make structure-test
- name: Run Structure Tests
shell: bash

- name: Run Unit Tests
shell: bash
run: |
hatch test --cover --all --parallel --junitxml=junit.xml
- name: Combine Coverage Reports
continue-on-error: true
run: |
hatch run +py=312 test:coverage combine
shell: bash

- name: Report coverage
run: |
hatch run +py=312 test:coverage xml
shell: bash

- name: SonarQube Scan
if: always() && github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
uses: sonarsource/[email protected]
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
SONAR_HOST_URL: ${{ vars.SONAR_HOST_URL }}
with:
args: >
-Dsonar.tests=tests/
-Dsonar.verbose=true
-Dsonar.sources=src/
-Dsonar.projectKey=${{github.repository_owner}}_${{github.event.repository.name}}
-Dsonar.python.coverage.reportPaths=coverage.xml
-Dsonar.python.ruff.reportPaths=lint.json
- name: SonarQube Quality Gate check
id: sonarqube-quality-gate-check
if: always() && github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
uses: sonarsource/sonarqube-quality-gate-action@master
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
SONAR_HOST_URL: ${{ vars.SONAR_HOST_URL }}

# Build and verify packages
- name: Build
run: hatch build

- name: Verify Packages
run: |
ls dist/*.tar.gz | hatch run +py=312 test:xargs -n1 twine check
ls dist/*.whl | hatch run +py=312 test:xargs -n1 twine check
shell: bash

- name: Surface failing tests
if: always()
uses: pmeier/pytest-results-action@main
with:
# A list of JUnit XML files, directories containing the former, and wildcard
# patterns to process.
# See @actions/glob for supported patterns.
path: junit.xml

# (Optional) Add a summary of the results at the top of the report
summary: true

# (Optional) Select which results should be included in the report.
# Follows the same syntax as `pytest -r`
display-options: fEX

# (Optional) Fail the workflow if no JUnit XML was found.
fail-on-empty: true

# (Optional) Title of the test results section in the workflow summary
title: Unit Test results
hatch run make structure-test
78 changes: 13 additions & 65 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,71 +7,19 @@ on:
required: true
description: "Version bump type"
options:
- patch
- minor
- major
- patch
- minor
- major

permissions:
id-token: write
checks: write

jobs:
release:
name: Release
timeout-minutes: 20
runs-on: ubuntu-latest
environment:
name: pypi
url: https://pypi.org/p/solace_ai_connector
permissions:
id-token: write
contents: write

steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
ssh-key: ${{ secrets.COMMIT_KEY }}

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.x'

- name: Install hatch
run: |
pip install --upgrade pip
pip install hatch
- name: Get Current Version
run: |
CURRENT_VERSION=$(hatch version)
echo "CURRENT_VERSION=${CURRENT_VERSION}" >> $GITHUB_ENV
- name: Fail if the current version doesn't exist
if: env.CURRENT_VERSION == ''
run: exit 1

- name: Build project for distribution
run: hatch build

- name: Publish package distributions to PyPI
uses: pypa/gh-action-pypi-publish@release/v1

- name: Create Release
uses: ncipollo/release-action@v1
with:
artifacts: "dist/*.whl"
makeLatest: true
generateReleaseNotes: true
tag: ${{ env.CURRENT_VERSION }}

- name: Bump Version
run: |
hatch version "${{ github.event.inputs.version }}"
NEW_VERSION=$(hatch version)
echo "NEW_VERSION=${NEW_VERSION}" >> $GITHUB_ENV
- name: Commit new version
run: |
git config --local user.email "[email protected]"
git config --local user.name "GitHub Action"
git commit -a -m "[ci skip] Bump version to $NEW_VERSION"
git push
uses: SolaceDev/solace-public-workflows/.github/workflows/[email protected]
with:
version: ${{ github.event.inputs.version }}
pypi-project: solace-ai-connector
secrets:
COMMIT_KEY: ${{ secrets.COMMIT_KEY }}
29 changes: 29 additions & 0 deletions docs/components/file_output.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# FileOutput

File output component

## Configuration Parameters

```yaml
component_name: <user-supplied-name>
component_module: file_output
component_config:
```
No configuration parameters
## Component Input Schema
```
{
content: <string>,
file_path: <string>,
mode: <string>
}
```
| Field | Required | Description |
| --- | --- | --- |
| content | True | |
| file_path | True | The path to the file to write to |
| mode | False | The mode to open the file in: w (write), a (append). Default is w. |
1 change: 1 addition & 0 deletions docs/components/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
| [broker_output](broker_output.md) | Connect to a messaging broker and send messages to it. Note that this component requires that the data is transformed into the input schema. |
| [delay](delay.md) | A simple component that simply passes the input to the output, but with a configurable delay. |
| [error_input](error_input.md) | Receive processing errors from the Solace AI Event Connector. Note that the component_input configuration is ignored. This component should be used to create a flow that handles errors from other flows. |
| [file_output](file_output.md) | File output component |
| [iterate](iterate.md) | Take a single message that is a list and output each item in that list as a separate message |
| [langchain_chat_model](langchain_chat_model.md) | Provide access to all the LangChain chat models via configuration |
| [langchain_chat_model_with_history](langchain_chat_model_with_history.md) | A chat model based on LangChain that includes keeping per-session history of the conversation. Note that this component will only take the first system message and the first human message in the messages array. |
Expand Down
2 changes: 2 additions & 0 deletions docs/components/langchain_chat_model_with_history.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ component_config:
stream_to_flow: <string>
llm_mode: <string>
stream_batch_size: <string>
set_response_uuid_in_user_properties: <boolean>
```
| Parameter | Required | Default | Description |
Expand All @@ -38,6 +39,7 @@ component_config:
| stream_to_flow | False | | Name the flow to stream the output to - this must be configured for llm_mode='stream'. |
| llm_mode | False | | The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. |
| stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. |
| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. |
## Component Input Schema
Expand Down
6 changes: 5 additions & 1 deletion docs/components/timer_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ component_config:
## Component Output Schema
```

<<<<<<< HEAD
<None>
=======
<any>
>>>>>>> origin/main
```
16 changes: 0 additions & 16 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,6 @@ packages = ["src/solace_ai_connector"]
[tool.hatch.version]
path = "src/solace_ai_connector/__init__.py"

[tool.hatch.envs.test]
dependencies = [
"pytest>=8.2.2",
"coverage>=7.5.4",
"twine>=5.1.1",
]

[tool.hatch.envs.lint]
detached = true
dependencies = [
"ruff>=0.5.0",
]

[tool.ruff]
lint.select = ["E4", "E7", "E9", "F"]
lint.ignore = ["F401", "E731"]

[[tool.hatch.envs.test.matrix]]
python = ["38", "312"]
7 changes: 7 additions & 0 deletions src/solace_ai_connector/common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(self, payload=None, topic=None, user_properties=None):
# <data_type>:<data_name>
# Where:
# <data_type> is one of the following:
# input - Object containing the payload, topic, and user_properties
# input.payload - The payload of the message
# input.topic - The topic of the message as a string
# input.topic_levels - The topic of the message as a list of each level of the topic
Expand Down Expand Up @@ -99,6 +100,12 @@ def get_data_object(
):
data_type = expression.split(":")[0]

if data_type == "input":
return {
"payload": self.payload,
"topic": self.topic,
"user_properties": self.user_properties,
}
if data_type == "input.payload":
return self.payload
if data_type == "input.topic":
Expand Down
11 changes: 10 additions & 1 deletion src/solace_ai_connector/components/component_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(self, module_info, **kwargs):
self.need_acknowledgement = False
self.stop_thread_event = threading.Event()
self.current_message = None
self.current_message_has_been_discarded = False

self.log_identifier = f"[{self.instance_name}.{self.flow_name}.{self.name}] "

Expand Down Expand Up @@ -159,9 +160,13 @@ def process_message(self, message):
self.trace_data(data)

# Invoke the component
self.current_message_has_been_discarded = False
result = self.invoke(message, data)

if result is not None:
if self.current_message_has_been_discarded:
# Call the message acknowledgements
message.call_acknowledgements()
elif result is not None:
# Do all the things we need to do after invoking the component
# Note that there are times where we don't want to
# send the message to the next component
Expand Down Expand Up @@ -193,6 +198,10 @@ def process_post_invoke(self, result, message):
self.current_message = message
self.send_message(message)

def discard_current_message(self):
# If the message is to be discarded, we need to acknowledge any previous components
self.current_message_has_been_discarded = True

def get_acknowledgement_callback(self):
# This should be overridden by the component if it needs to acknowledge messages
return None
Expand Down
Loading

0 comments on commit 161fcc9

Please sign in to comment.