Skip to content

Commit

Permalink
Merge branch 'airtai:main' into quickstart
Browse files Browse the repository at this point in the history
  • Loading branch information
Sehat1137 authored Sep 8, 2024
2 parents 5874285 + 54223e1 commit c57c055
Show file tree
Hide file tree
Showing 19 changed files with 151 additions and 68 deletions.
32 changes: 0 additions & 32 deletions .github/workflows/pr_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ jobs:
env:
COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
- run: ls coverage
- name: Store coverage files
uses: actions/upload-artifact@v4
with:
Expand All @@ -90,36 +89,6 @@ jobs:
if-no-files-found: error
include-hidden-files: true

test-orjson:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: |
python -m pip install uv
uv pip install --system .[optionals,testing] orjson
- run: mkdir coverage
- name: Test
run: >
bash scripts/test.sh
-m "(slow and (${{env.ALL_PYTEST_MARKERS}})) or (${{env.ALL_PYTEST_MARKERS}})"
env:
COVERAGE_FILE: coverage/.coverage.orjson
CONTEXT: orjson
- name: Store coverage files
uses: actions/upload-artifact@v4
with:
name: .coverage.orjson
path: coverage
if-no-files-found: error
include-hidden-files: true

test-macos-latest:
if: github.event.pull_request.draft == false
runs-on: macos-latest
Expand Down Expand Up @@ -519,7 +488,6 @@ jobs:
- coverage-combine
- test-macos-latest
- test-windows-latest
- test-orjson

runs-on: ubuntu-latest

Expand Down
4 changes: 2 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
"filename": "docs/docs/en/release.md",
"hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
"is_verified": false,
"line_number": 1647,
"line_number": 1655,
"is_secret": false
}
],
Expand All @@ -178,5 +178,5 @@
}
]
},
"generated_at": "2024-09-05T13:48:33Z"
"generated_at": "2024-09-06T15:06:47Z"
}
8 changes: 8 additions & 0 deletions docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ hide:
---

# Release Notes
## 0.5.22

### What's Changed

* fix: FastAPI 0.112.4+ compatibility by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1766](https://github.com/airtai/faststream/pull/1766){.external-link target="_blank"}

**Full Changelog**: [#0.5.21...0.5.22](https://github.com/airtai/faststream/compare/0.5.21...0.5.22){.external-link target="_blank"}

## 0.5.21

### What's Changed
Expand Down
2 changes: 0 additions & 2 deletions docs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ plugins:
import:
- https://docs.python.org/3/objects.inv
options:
# extensions:
# - griffe_typingdoc
preload_modules:
- httpx
- starlette
Expand Down
10 changes: 6 additions & 4 deletions faststream/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
)
from faststream.broker.core.usecase import BrokerUsecase
from faststream.types import (
AnyCallable,
AnyDict,
AnyHttpUrl,
AsyncFunc,
Expand Down Expand Up @@ -75,10 +76,11 @@ def __init__(
Union["ExternalDocs", "ExternalDocsDict", "AnyDict"]
] = None,
identifier: Optional[str] = None,
on_startup: Sequence[Callable[P_HookParams, T_HookReturn]] = (),
after_startup: Sequence[Callable[P_HookParams, T_HookReturn]] = (),
on_shutdown: Sequence[Callable[P_HookParams, T_HookReturn]] = (),
after_shutdown: Sequence[Callable[P_HookParams, T_HookReturn]] = (),
on_startup: Sequence["AnyCallable"] = (),
after_startup: Sequence["AnyCallable"] = (),
on_shutdown: Sequence["AnyCallable"] = (),
after_shutdown: Sequence["AnyCallable"] = (),
# all options should be copied to AsgiFastStream class
) -> None:
context.set_global("app", self)

Expand Down
19 changes: 18 additions & 1 deletion faststream/asgi/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import traceback
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, AsyncIterator, Optional, Sequence, Tuple, Union
from typing import (
TYPE_CHECKING,
Any,
AsyncIterator,
Optional,
Sequence,
Tuple,
Union,
)

import anyio

Expand All @@ -24,6 +32,7 @@
)
from faststream.broker.core.usecase import BrokerUsecase
from faststream.types import (
AnyCallable,
AnyDict,
AnyHttpUrl,
Lifespan,
Expand Down Expand Up @@ -53,6 +62,10 @@ def __init__(
Union["ExternalDocs", "ExternalDocsDict", "AnyDict"]
] = None,
identifier: Optional[str] = None,
on_startup: Sequence["AnyCallable"] = (),
after_startup: Sequence["AnyCallable"] = (),
on_shutdown: Sequence["AnyCallable"] = (),
after_shutdown: Sequence["AnyCallable"] = (),
) -> None:
super().__init__(
broker=broker,
Expand All @@ -67,6 +80,10 @@ def __init__(
tags=tags,
external_docs=external_docs,
identifier=identifier,
on_startup=on_startup,
after_startup=after_startup,
on_shutdown=on_shutdown,
after_shutdown=after_shutdown,
)

self.routes = list(asgi_routes)
Expand Down
4 changes: 4 additions & 0 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ async def publish( # type: ignore[override]
correlation_id: Optional[str] = None,
*,
reply_to: str = "",
no_confirm: bool = False,
# extra options to be compatible with test client
**kwargs: Any,
) -> Optional[Any]:
Expand All @@ -497,6 +498,7 @@ async def publish( # type: ignore[override]
headers=headers,
correlation_id=correlation_id,
reply_to=reply_to,
no_confirm=no_confirm,
**kwargs,
)

Expand Down Expand Up @@ -535,6 +537,7 @@ async def publish_batch(
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: bool = False,
) -> None:
assert self._producer, NOT_CONNECTED_YET # nosec B101

Expand All @@ -552,6 +555,7 @@ async def publish_batch(
headers=headers,
reply_to=reply_to,
correlation_id=correlation_id,
no_confirm=no_confirm,
)

@override
Expand Down
53 changes: 49 additions & 4 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
import logging
from contextlib import suppress
from time import time
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Expand All @@ -24,8 +27,18 @@
from faststream.utils.functions import call_or_await

if TYPE_CHECKING:
from typing_extensions import NotRequired, TypedDict

from faststream.types import AnyDict, LoggerProto

class _SendKwargs(TypedDict):
value: Optional[Union[str, bytes]]
key: Optional[Union[str, bytes]]
headers: Optional[List[Tuple[str, Union[str, bytes]]]]
partition: NotRequired[int]
timestamp: NotRequired[int]
on_delivery: NotRequired[Callable[..., None]]


class AsyncConfluentProducer:
"""An asynchronous Python Kafka client using the "confluent-kafka" package."""
Expand Down Expand Up @@ -101,9 +114,21 @@ def __init__(

self.producer = Producer(final_config, logger=self.logger)

self.__running = True
self._poll_task = asyncio.create_task(self._poll_loop())

async def _poll_loop(self) -> None:
while self.__running:
with suppress(Exception):
await call_or_await(self.producer.poll, 0.1)

async def stop(self) -> None:
"""Stop the Kafka producer and flush remaining messages."""
await call_or_await(self.producer.flush)
if self.__running:
self.__running = False
if not self._poll_task.done():
self._poll_task.cancel()
await call_or_await(self.producer.flush)

async def send(
self,
Expand All @@ -113,9 +138,10 @@ async def send(
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[List[Tuple[str, Union[str, bytes]]]] = None,
no_confirm: bool = False,
) -> None:
"""Sends a single message to a Kafka topic."""
kwargs: AnyDict = {
kwargs: _SendKwargs = {
"value": value,
"key": key,
"headers": headers,
Expand All @@ -127,16 +153,34 @@ async def send(
if timestamp_ms is not None:
kwargs["timestamp"] = timestamp_ms

if not no_confirm:
result_future: asyncio.Future[Optional[Message]] = asyncio.Future()

def ack_callback(err: Any, msg: Optional[Message]) -> None:
if err or (msg is not None and (err := msg.error())):
result_future.set_exception(KafkaException(err))
else:
result_future.set_result(msg)

kwargs["on_delivery"] = ack_callback

# should be sync to prevent segfault
self.producer.produce(topic, **kwargs)
self.producer.poll(0)

if not no_confirm:
await result_future

def create_batch(self) -> "BatchBuilder":
"""Creates a batch for sending multiple messages."""
return BatchBuilder()

async def send_batch(
self, batch: "BatchBuilder", topic: str, *, partition: Optional[int]
self,
batch: "BatchBuilder",
topic: str,
*,
partition: Optional[int],
no_confirm: bool = False,
) -> None:
"""Sends a batch of messages to a Kafka topic."""
async with anyio.create_task_group() as tg:
Expand All @@ -149,6 +193,7 @@ async def send_batch(
partition,
msg["timestamp_ms"],
msg["headers"],
no_confirm,
)

async def ping(
Expand Down
10 changes: 9 additions & 1 deletion faststream/confluent/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ async def publish( # type: ignore[override]
headers: Optional[Dict[str, str]] = None,
correlation_id: str = "",
reply_to: str = "",
no_confirm: bool = False,
) -> None:
"""Publish a message to a topic."""
message, content_type = encode_message(message)
Expand All @@ -65,6 +66,7 @@ async def publish( # type: ignore[override]
partition=partition,
timestamp_ms=timestamp_ms,
headers=[(i, (j or "").encode()) for i, j in headers_to_send.items()],
no_confirm=no_confirm,
)

async def stop(self) -> None:
Expand All @@ -79,6 +81,7 @@ async def publish_batch(
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
correlation_id: str = "",
no_confirm: bool = False,
) -> None:
"""Publish a batch of messages to a topic."""
batch = self._producer.create_batch()
Expand Down Expand Up @@ -109,7 +112,12 @@ async def publish_batch(
headers=[(i, j.encode()) for i, j in final_headers.items()],
)

await self._producer.send_batch(batch, topic, partition=partition)
await self._producer.send_batch(
batch,
topic,
partition=partition,
no_confirm=no_confirm,
)

@override
async def request(self, *args: Any, **kwargs: Any) -> Optional[Any]:
Expand Down
6 changes: 5 additions & 1 deletion faststream/confluent/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ async def publish(
headers: Optional[Dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: bool = False,
# publisher specific
_extra_middlewares: Iterable["PublisherMiddleware"] = (),
) -> Optional[Any]:
Expand All @@ -184,6 +185,7 @@ async def publish(
kwargs: AnyDict = {
"key": key or self.key,
# basic args
"no_confirm": no_confirm,
"topic": topic or self.topic,
"partition": partition or self.partition,
"timestamp_ms": timestamp_ms,
Expand Down Expand Up @@ -243,8 +245,9 @@ async def publish(
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: bool = False,
# publisher specific
_extra_middlewares: Iterable["PublisherMiddleware"] = (),
) -> None:
Expand All @@ -258,6 +261,7 @@ async def publish(

kwargs: AnyDict = {
"topic": topic or self.topic,
"no_confirm": no_confirm,
"partition": partition or self.partition,
"timestamp_ms": timestamp_ms,
"headers": headers or self.headers,
Expand Down
Loading

0 comments on commit c57c055

Please sign in to comment.