Skip to content

Commit

Permalink
Add KafkaProducerDependency to the Factory
Browse files Browse the repository at this point in the history
This makes it easier to spin up a Kafka producer from either HTTP path
operations or in the handlers for the Kafka consumer.

Now the Kafka consumer gets its PydanticSchemaManager from the factory
rather than creating an entirely new one.

The mocking and Kafka producer modules are based on Squarebot's; these
need to be moved up into Kafkit.
  • Loading branch information
jonathansick committed Jul 17, 2023
1 parent 24684cc commit aee98d6
Show file tree
Hide file tree
Showing 10 changed files with 333 additions and 16 deletions.
24 changes: 24 additions & 0 deletions src/ook/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,30 @@ class Configuration(BaseSettings):
env="OOK_REGISTRY_URL", title="Schema Registry URL"
)

subject_suffix: str = Field(
"",
title="Schema subject name suffix",
env="OOK_SUBJECT_SUFFIX",
description=(
"Suffix to add to Schema Registry suffix names. This is useful "
"when deploying for testing/staging and you do not "
"want to affect the production subject and its "
"compatibility lineage."
),
)

# TODO convert to enum?
subject_compatibility: str = Field(
"FORWARD_TRANSITIVE",
title="Schema subject compatibility",
env="OOK_SUBJECT_COMPATIBILITY",
description=(
"Compatibility level to apply to Schema Registry subjects. Use "
"NONE for testing and development, but prefer FORWARD_TRANSITIVE "
"for production."
),
)

enable_kafka_consumer: bool = Field(
True,
env="OOK_ENABLE_CONSUMER",
Expand Down
19 changes: 18 additions & 1 deletion src/ook/dependencies/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@
from dataclasses import dataclass
from typing import Any

from aiokafka import AIOKafkaProducer
from fastapi import Depends, Request
from httpx import AsyncClient
from kafkit.fastapi.dependencies.aiokafkaproducer import (
kafka_producer_dependency,
)
from kafkit.fastapi.dependencies.pydanticschemamanager import (
pydantic_schema_manager_dependency,
)
from kafkit.registry.manager import PydanticSchemaManager
from safir.dependencies.http_client import http_client_dependency
from safir.dependencies.logger import logger_dependency
from structlog.stdlib import BoundLogger
Expand Down Expand Up @@ -72,12 +80,21 @@ async def __call__(
request: Request,
logger: BoundLogger = Depends(logger_dependency),
http_client: AsyncClient = Depends(http_client_dependency),
kafka_producer: AIOKafkaProducer = Depends(kafka_producer_dependency),
schema_manager: PydanticSchemaManager = Depends(
pydantic_schema_manager_dependency
),
) -> RequestContext:
"""Create a per-request context and return it."""
return RequestContext(
request=request,
logger=logger,
factory=Factory(logger=logger, http_client=http_client),
factory=Factory(
logger=logger,
http_client=http_client,
kafka_producer=kafka_producer,
schema_manager=schema_manager,
),
)


Expand Down
14 changes: 7 additions & 7 deletions src/ook/handlers/kafka/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@

from aiokafka import AIOKafkaConsumer, ConsumerRecord
from dataclasses_avroschema.avrodantic import AvroBaseModel
from httpx import AsyncClient
from kafkit.registry import UnmanagedSchemaError
from kafkit.registry.httpx import RegistryApi
from kafkit.registry.manager import PydanticSchemaManager
from structlog import get_logger

from ook.config import config
from ook.domain.kafka import LtdUrlIngestV1, UrlIngestKeyV1
from ook.handlers.kafka.handlers import handle_ltd_document_ingest
from ook.services.factory import Factory


class HandlerProtocol(Protocol):
Expand Down Expand Up @@ -194,13 +194,13 @@ async def add_route(
)


async def consume_kafka_messages(http_client: AsyncClient) -> None:
async def consume_kafka_messages() -> None:
"""Consume Kafka messages."""
# Set up the schema manager
registry = RegistryApi(http_client=http_client, url=config.registry_url)
schema_manager = PydanticSchemaManager(registry=registry)
logger = get_logger("ook")
factory = await Factory.create(logger=logger)
schema_manager = factory.schema_manager
aiokafka_consumer = AIOKafkaConsumer(
[config.ingest_kafka_topic], # TODO add topics
[config.ingest_kafka_topic],
bootstrap_servers=config.kafka.bootstrap_servers,
group_id=config.kafka_consumer_group_id,
security_protocol=config.kafka.security_protocol,
Expand Down
39 changes: 35 additions & 4 deletions src/ook/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@

from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from kafkit.fastapi.dependencies.aiokafkaproducer import (
kafka_producer_dependency,
)
from kafkit.fastapi.dependencies.pydanticschemamanager import (
pydantic_schema_manager_dependency,
)
from safir.dependencies.http_client import http_client_dependency
from safir.logging import configure_logging, configure_uvicorn_logging
from safir.middleware.x_forwarded import XForwardedMiddleware
from structlog import get_logger

from .config import config
from .domain.kafka import LtdUrlIngestV1, UrlIngestKeyV1
from .handlers.external.paths import external_router
from .handlers.internal.paths import internal_router
from .handlers.kafka.router import consume_kafka_messages
Expand All @@ -33,13 +40,35 @@
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator:
"""Context manager for the application lifespan."""
logger = get_logger()
logger = get_logger("ook")
logger.info("Ook is starting up.")

http_client = await http_client_dependency()

logger.info(
"Schema Registry configuration",
registry_url=config.registry_url,
subject_suffix=config.subject_suffix,
subject_compatibility=config.subject_compatibility,
)

# Initialize the Pydantic Schema Manager and register models
await pydantic_schema_manager_dependency.initialize(
http_client=http_client,
registry_url=config.registry_url,
models=[
UrlIngestKeyV1,
LtdUrlIngestV1,
],
suffix=config.subject_suffix,
compatibility=config.subject_compatibility,
)

# Initialize the Kafka producer
await kafka_producer_dependency.initialize(config.kafka)

if config.enable_kafka_consumer:
kafka_consumer_task = asyncio.create_task(
consume_kafka_messages(await http_client_dependency())
)
kafka_consumer_task = asyncio.create_task(consume_kafka_messages())

logger.info("Ook start up complete.")

Expand All @@ -52,6 +81,8 @@ async def lifespan(app: FastAPI) -> AsyncIterator:
kafka_consumer_task.cancel()
await kafka_consumer_task

await kafka_producer_dependency.stop()

await http_client_dependency.aclose()

logger.info("Ook shut down up complete.")
Expand Down
37 changes: 35 additions & 2 deletions src/ook/services/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,16 @@

from __future__ import annotations

from aiokafka import AIOKafkaProducer
from algoliasearch.search_client import SearchClient
from httpx import AsyncClient
from kafkit.fastapi.dependencies.aiokafkaproducer import (
kafka_producer_dependency,
)
from kafkit.fastapi.dependencies.pydanticschemamanager import (
pydantic_schema_manager_dependency,
)
from kafkit.registry.manager import PydanticSchemaManager
from safir.dependencies.http_client import http_client_dependency
from safir.github import GitHubAppClientFactory
from structlog.stdlib import BoundLogger
Expand All @@ -14,27 +22,52 @@
from .algoliadocindex import AlgoliaDocIndexService
from .classification import ClassificationService
from .githubmetadata import GitHubMetadataService
from .kafkaproducer import PydanticKafkaProducer
from .landerjsonldingest import LtdLanderJsonLdIngestService


class Factory:
"""A factory for creating Ook services."""

def __init__(
self, *, logger: BoundLogger, http_client: AsyncClient
self,
*,
logger: BoundLogger,
http_client: AsyncClient,
kafka_producer: AIOKafkaProducer,
schema_manager: PydanticSchemaManager,
) -> None:
self._http_client = http_client
self._logger = logger
self._kafka_producer = kafka_producer
self._schema_manager = schema_manager

@classmethod
async def create(cls, *, logger: BoundLogger) -> Factory:
"""Create a Factory (for use outside a request context)."""
return cls(logger=logger, http_client=await http_client_dependency())
return cls(
logger=logger,
http_client=await http_client_dependency(),
kafka_producer=await kafka_producer_dependency(),
schema_manager=await pydantic_schema_manager_dependency(),
)

def set_logger(self, logger: BoundLogger) -> None:
"""Set the logger for the factory."""
self._logger = logger

@property
def kafka_producer(self) -> PydanticKafkaProducer:
"""The PydanticKafkaProducer."""
return PydanticKafkaProducer(
producer=self._kafka_producer, schema_manager=self._schema_manager
)

@property
def schema_manager(self) -> PydanticSchemaManager:
"""The PydanticSchemaManager."""
return self._schema_manager

@property
def http_client(self) -> AsyncClient:
"""The shared HTTP client."""
Expand Down
89 changes: 89 additions & 0 deletions src/ook/services/kafkaproducer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Kafka producer service that accepts with Pydantic models."""

from __future__ import annotations

from asyncio import Future

import aiokafka
from dataclasses_avroschema.avrodantic import AvroBaseModel
from kafkit.registry import manager


class PydanticKafkaProducer:
    """Kafka producer that sends Pydantic models for message values and keys,
    built around aiokafka.

    Parameters
    ----------
    producer
        The aiokafka producer.
    schema_manager
        The Pydantic schema manager used by the Pydantic Kafka producer.
    """

    def __init__(
        self,
        producer: aiokafka.AIOKafkaProducer,
        schema_manager: manager.PydanticSchemaManager,
    ) -> None:
        self._producer = producer
        self._schema_manager = schema_manager

    @property
    def aiokafka_producer(self) -> aiokafka.AIOKafkaProducer:
        """The aiokafka producer (access-only)."""
        return self._producer

    @property
    def schema_manager(self) -> manager.PydanticSchemaManager:
        """The Pydantic schema manager used by the Pydantic Kafka
        producer (access-only).
        """
        return self._schema_manager

    async def send(
        self,
        *,
        topic: str,
        value: AvroBaseModel,
        key: AvroBaseModel | None = None,
        partition: int | None = None,
        timestamp_ms: int | None = None,
        headers: dict[str, bytes] | None = None,
    ) -> Future:
        """Send a message to a Kafka topic.

        The value (and key, if given) are serialized through the schema
        manager before being handed to the underlying aiokafka producer.

        Parameters
        ----------
        topic
            The topic to send the message to.
        value
            The message value.
        key
            The message key, or `None` for a keyless message.
        partition
            The partition to send the message to, or `None` to let the
            producer's partitioner choose one.
        timestamp_ms
            The timestamp of the message (milliseconds since the epoch),
            or `None` to use the current time.
        headers
            The headers of the message.

        Returns
        -------
        asyncio.Future
            A future that resolves when the message is sent.
        """
        serialized_value = await self._schema_manager.serialize(value)
        # Keys are optional in Kafka; serialize only when one is provided.
        # Use an explicit None check so a "falsy" model is still serialized.
        if key is not None:
            serialized_key = await self._schema_manager.serialize(key)
        else:
            serialized_key = None

        return await self._producer.send(
            topic,
            value=serialized_value,
            key=serialized_key,
            partition=partition,
            timestamp_ms=timestamp_ms,
            headers=headers,
        )
27 changes: 25 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,35 @@

from __future__ import annotations

from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Iterator
from unittest.mock import Mock

import pytest
import pytest_asyncio
from asgi_lifespan import LifespanManager
from fastapi import FastAPI
from httpx import AsyncClient

from ook import main

from .support.kafkaproducer import patch_aiokafkaproducer
from .support.schemamanager import (
MockPydanticSchemaManager,
patch_schema_manager,
)


@pytest.fixture
def mock_schema_manager() -> Iterator[MockPydanticSchemaManager]:
    """Patch PydanticSchemaManager and yield the mock for the test."""
    for mock in patch_schema_manager():
        yield mock


@pytest.fixture
def mock_kafka_producer() -> Iterator[Mock]:
    """Patch AIOKafkaProducer and yield the mock for the test."""
    for mock in patch_aiokafkaproducer():
        yield mock


@pytest_asyncio.fixture
async def http_client() -> AsyncIterator[AsyncClient]:
Expand All @@ -19,7 +39,10 @@ async def http_client() -> AsyncIterator[AsyncClient]:


@pytest_asyncio.fixture
async def app() -> AsyncIterator[FastAPI]:
async def app(
mock_kafka_producer: Mock,
mock_schema_manager: MockPydanticSchemaManager,
) -> AsyncIterator[FastAPI]:
"""Return a configured test application.
Wraps the application in a lifespan manager so that startup and shutdown
Expand Down
1 change: 1 addition & 0 deletions tests/support/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Helpers and mocks for the tests."""
Loading

0 comments on commit aee98d6

Please sign in to comment.