
Docs: How to use confluent schema registry with Avro #1774

Open
powersemmi opened this issue Sep 9, 2024 · 3 comments
Labels
Confluent: Issues related to `faststream.confluent` module
documentation: Improvements or additions to documentation

Comments

powersemmi commented Sep 9, 2024

It seems to me that the documentation lacks an example of using Avro with the Confluent Schema Registry. I will attach a working example.

import io
import json
import logging
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Literal

import fastavro
from confluent_kafka.schema_registry import SchemaRegistryClient
from faststream import FastStream
from faststream.confluent import KafkaBroker, KafkaMessage
from faststream.types import DecodedMessage
from pydantic import BaseModel

logger = logging.getLogger(__name__)
registry = SchemaRegistryClient({'url': "http://localhost:8081"})


@dataclass
class Schema:
    schema_id: int
    schema_obj: dict


# Cache the schemas: if a schema changes in the registry before the service is
# restarted, it stays clear that the service has not yet switched to the new version
SCHEMES_CACHE: dict[str, Schema] = {}


def confluent_encoder(msg: dict, topic: str, subject_type: Literal['value', 'key'] = 'value') -> bytes:
    """
    To comply with the Confluent wire format, prepend to the message
    1 magic null byte + 4 bytes with the schema id from the registry
    https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
    """
    subject_name = f'{topic}-{subject_type}'
    if schema := SCHEMES_CACHE.get(subject_name):
        # Cache hit: reuse the schema version the service started with
        schema_id = schema.schema_id
        schema_obj = schema.schema_obj
    else:
        # Cache miss: fetch the latest version from the registry and cache it
        value_schema = registry.get_latest_version(subject_name)
        schema_id = value_schema.schema_id
        schema_obj = json.loads(value_schema.schema.schema_str)
        SCHEMES_CACHE[subject_name] = Schema(schema_id=schema_id, schema_obj=schema_obj)
    bytes_writer = io.BytesIO()
    fastavro.schemaless_writer(fo=bytes_writer, schema=schema_obj, record=msg)
    return b'\x00' + schema_id.to_bytes(4, byteorder='big') + bytes_writer.getvalue()


def confluent_decoder(msg: KafkaMessage) -> DecodedMessage:
    """
    Pull the schema id out of the message header and use it to fetch the writer schema
    """
    # Skip the magic byte (byte 0) and read the 4-byte big-endian schema id
    schema_id = int.from_bytes(msg.body[1:5], byteorder='big', signed=False)
    msg_data = msg.body[5:]
    schema = registry.get_schema(schema_id)
    if schema.schema_type == 'AVRO':
        schema_obj = json.loads(schema.schema_str)
        return fastavro.schemaless_reader(fo=io.BytesIO(msg_data), writer_schema=schema_obj)
    else:
        raise ValueError(f'Schema type {schema.schema_type} is not supported')


def publisher_encoder_factory(topic: str) -> Callable[..., Awaitable[Any]]:
    async def encode_message(call_next: Callable[..., Awaitable[Any]], msg: BaseModel | dict, **options: Any) -> Any:
        match msg:
            case BaseModel():
                msg_to_process = msg.model_dump(mode='json')
            case dict():
                msg_to_process = msg
            case _:
                raise ValueError(f'{type(msg)=} is not one of (BaseModel, dict)')
        return await call_next(confluent_encoder(msg=msg_to_process, topic=topic), **options)

    return encode_message


broker = KafkaBroker()
app = FastStream(broker)

publisher = broker.publisher(
    topic='test',
    middlewares=[publisher_encoder_factory(topic='test')],
)


class Consumer(BaseModel):
    name: str
    age: int


@broker.subscriber("test", decoder=confluent_decoder)
async def consume(body: Consumer):
    logger.info("name=%s, age=%s", body.name, body.age)


@app.after_startup
async def publish():
    msg = {"name": "John", "age": 25}

    await publisher.publish(msg)
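
For reference, here is a minimal sketch of the wire-format header that confluent_encoder produces and confluent_decoder parses (the payload bytes here are a placeholder, not real Avro output):

# Confluent wire format: 1 magic byte (0x00) + 4-byte big-endian schema id + Avro body
raw = b'\x00' + (42).to_bytes(4, byteorder='big') + b'<avro-binary-payload>'

assert raw[0] == 0                                     # magic byte
schema_id = int.from_bytes(raw[1:5], byteorder='big')  # -> 42
payload = raw[5:]                                      # schemaless Avro body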

Avro schema sample

{
    "type": "record",
    "namespace": "Person",
    "name": "Person",
    "fields": [
        {"doc": "Name", "type": "string", "name": "name"},
        {"doc": "Age", "type": "int", "name": "age"}
    ]
}
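
The example above assumes this schema is already registered under the test-value subject (the default TopicNameStrategy for the test topic). A minimal sketch of registering it, assuming the registry runs at http://localhost:8081 as above:

from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

registry = SchemaRegistryClient({'url': 'http://localhost:8081'})

person_schema_str = '''
{
    "type": "record",
    "namespace": "Person",
    "name": "Person",
    "fields": [
        {"doc": "Name", "type": "string", "name": "name"},
        {"doc": "Age", "type": "int", "name": "age"}
    ]
}
'''

# Register the value schema for the "test" topic (subject: <topic>-value)
schema_id = registry.register_schema('test-value', Schema(person_schema_str, schema_type='AVRO'))
print(schema_id)

After registering, the app can be started with the FastStream CLI, e.g. faststream run main:app (assuming the example above is saved as main.py).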
powersemmi added the enhancement (New feature or request) label on Sep 9, 2024
Lancetnik added the documentation (Improvements or additions to documentation) and Confluent (Issues related to `faststream.confluent` module) labels and removed the enhancement (New feature or request) label on Sep 9, 2024
Lancetnik (Member) commented:

I think we should place such a recipe in the Kafka How-To section.
I've scheduled this task, but we are pretty busy with the code, so any contributions are welcome.
Thank you a lot for the investigation and the code example! You did great work!

powersemmi changed the title from "Docs: How to use avro + schema registry" to "Docs: How to use confluent schema registry" on Sep 10, 2024
powersemmi changed the title from "Docs: How to use confluent schema registry" to "Docs: How to use confluent schema registry with Avro" on Sep 10, 2024
powersemmi (Author) commented:

@Lancetnik And it seems to me that the example could be an addition to serialization/examples

Lancetnik (Member) commented:

> @Lancetnik And it seems to me that the example could be an addition to serialization/examples

We can just add a link to that section, since your example is Confluent-specific.
