Skip to content

Commit

Permalink
Refactor Factory with a ProcessContext
Browse files Browse the repository at this point in the history
The purpose of this refactor is to make it easier to get a factory in
circumstances other than the FastAPI request handler (such as in a CLI
handler). The ProcessContext holds all clients that can persist across
requests. This design also nicely refactors the code for initializing
FastAPI dependencies out of the main module and into the
ProcessContext.create class method.

This is mimicking the structure of Gafaelfawr's Factory and
ProcessContext.
  • Loading branch information
jonathansick committed Aug 25, 2023
1 parent 8b8483f commit 75bbc6e
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 80 deletions.
61 changes: 35 additions & 26 deletions src/ook/dependencies/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,11 @@
from dataclasses import dataclass
from typing import Any

from aiokafka import AIOKafkaProducer
from algoliasearch.search_client import SearchClient
from fastapi import Depends, Request
from httpx import AsyncClient
from kafkit.fastapi.dependencies.aiokafkaproducer import (
kafka_producer_dependency,
)
from kafkit.fastapi.dependencies.pydanticschemamanager import (
pydantic_schema_manager_dependency,
)
from kafkit.registry.manager import PydanticSchemaManager
from safir.dependencies.http_client import http_client_dependency
from safir.dependencies.logger import logger_dependency
from structlog.stdlib import BoundLogger

from ook.dependencies.algoliasearch import algolia_client_dependency

from ..factory import Factory
from ..factory import Factory, ProcessContext

__all__ = [
"ContextDependency",
Expand Down Expand Up @@ -76,32 +63,54 @@ class ContextDependency:
"""

def __init__(self) -> None:
pass
self._process_context: ProcessContext | None = None

async def __call__(
self,
request: Request,
logger: BoundLogger = Depends(logger_dependency),
http_client: AsyncClient = Depends(http_client_dependency),
kafka_producer: AIOKafkaProducer = Depends(kafka_producer_dependency),
schema_manager: PydanticSchemaManager = Depends(
pydantic_schema_manager_dependency
),
algolia_client: SearchClient = Depends(algolia_client_dependency),
) -> RequestContext:
"""Create a per-request context and return it."""
return RequestContext(
request=request,
logger=logger,
factory=Factory(
logger=logger,
http_client=http_client,
kafka_producer=kafka_producer,
schema_manager=schema_manager,
algolia_client=algolia_client,
logger=logger, process_context=self.process_context
),
)

@property
def process_context(self) -> ProcessContext:
    """Return the shared process context held by this dependency.

    Primarily intended for use in tests.

    Raises
    ------
    RuntimeError
        Raised if no process context is currently set.
    """
    context = self._process_context
    if context:
        return context
    raise RuntimeError("ContextDependency not initialized")

async def initialize(self) -> None:
    """Initialize the process-wide shared context.

    Any process context that is already held is closed first, so this
    method may be called again to rebuild the context. The method takes no
    arguments; the new context is built by ``ProcessContext.create``.
    """
    # Close through aclose() so the old reference is cleared before the new
    # context is created. If creation then fails, ``process_context`` reports
    # "not initialized" rather than handing out an already-closed context.
    await self.aclose()
    self._process_context = await ProcessContext.create()

def create_factory(self, logger: BoundLogger) -> Factory:
    """Build a factory that can be used outside of a request handler.

    Parameters
    ----------
    logger
        Logger handed to the new factory.

    Returns
    -------
    Factory
        A factory bound to this dependency's process context.
    """
    factory_kwargs = {
        "logger": logger,
        "process_context": self.process_context,
    }
    return Factory(**factory_kwargs)

async def aclose(self) -> None:
    """Close the held process context, if any, and forget it."""
    context = self._process_context
    if not context:
        # Nothing was initialized (or it was already closed).
        return
    await context.aclose()
    self._process_context = None


context_dependency = ContextDependency()
"""The dependency that will return the per-request context."""
126 changes: 104 additions & 22 deletions src/ook/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

from __future__ import annotations

from collections.abc import AsyncIterator
from contextlib import aclosing, asynccontextmanager
from dataclasses import dataclass
from typing import Self

from aiokafka import AIOKafkaProducer
from algoliasearch.search_client import SearchClient
from httpx import AsyncClient
Expand All @@ -18,6 +23,7 @@

from .config import config
from .dependencies.algoliasearch import algolia_client_dependency
from .domain.kafka import LtdUrlIngestV1, UrlIngestKeyV1
from .services.algoliadocindex import AlgoliaDocIndexService
from .services.classification import ClassificationService
from .services.githubmetadata import GitHubMetadataService
Expand All @@ -27,35 +33,110 @@
from .services.sphinxtechnoteingest import SphinxTechnoteIngestService


@dataclass(kw_only=True, frozen=True, slots=True)
class ProcessContext:
"""Holds singletons in the context of a Ook process, which might be a
API server or a CLI command.
"""

http_client: AsyncClient
"""Shared HTTP client."""

kafka_producer: AIOKafkaProducer
"""The aiokafka producer."""

schema_manager: PydanticSchemaManager
"""Pydantic schema manager."""

algolia_client: SearchClient
"""Algolia client."""

@classmethod
async def create(cls) -> ProcessContext:
"""Create a ProcessContext."""
http_client = await http_client_dependency()

# Initialize the Pydantic Schema Manager and register models
await pydantic_schema_manager_dependency.initialize(
http_client=http_client,
registry_url=config.registry_url,
models=[
UrlIngestKeyV1,
LtdUrlIngestV1,
],
suffix=config.subject_suffix,
compatibility=config.subject_compatibility,
)

# Initialize the Kafka producer
await kafka_producer_dependency.initialize(config.kafka)

kafka_producer = await kafka_producer_dependency()
schema_manager = await pydantic_schema_manager_dependency()
algolia_client = await algolia_client_dependency()

return cls(
http_client=http_client,
kafka_producer=kafka_producer,
schema_manager=schema_manager,
algolia_client=algolia_client,
)

async def aclose(self) -> None:
"""Clean up a process context.
Called during shutdown, or before recreating the process context using
a different configuration.
"""
kafka_producer = await kafka_producer_dependency()
await kafka_producer.stop()

algolia_client = await algolia_client_dependency()
await algolia_client.close()

http_client = await http_client_dependency()
await http_client.aclose()


class Factory:
"""A factory for creating Ook services."""

def __init__(
self,
*,
logger: BoundLogger,
http_client: AsyncClient,
kafka_producer: AIOKafkaProducer,
schema_manager: PydanticSchemaManager,
algolia_client: SearchClient,
process_context: ProcessContext,
) -> None:
self._http_client = http_client
self._process_context = process_context
self._logger = logger
self._kafka_producer = kafka_producer
self._schema_manager = schema_manager
self._algolia_client = algolia_client

@classmethod
async def create(cls, *, logger: BoundLogger) -> Factory:
async def create(cls, *, logger: BoundLogger) -> Self:
"""Create a Factory (for use outside a request context)."""
context = await ProcessContext.create()
return cls(
logger=logger,
http_client=await http_client_dependency(),
kafka_producer=await kafka_producer_dependency(),
schema_manager=await pydantic_schema_manager_dependency(),
algolia_client=await algolia_client_dependency(),
process_context=context,
)

@classmethod
@asynccontextmanager
async def create_standalone(
cls, *, logger: BoundLogger
) -> AsyncIterator[Self]:
"""Create a standalone factory, outside the FastAPI process, as a
context manager.
Use this for creating a factory in CLI commands.
"""
factory = await cls.create(logger=logger)
async with aclosing(factory):
yield factory

async def aclose(self) -> None:
    """Close this factory by closing the process context it holds."""
    context = self._process_context
    await context.aclose()

def set_logger(self, logger: BoundLogger) -> None:
    """Replace the logger stored on this factory.

    Parameters
    ----------
    logger
        The logger to store in place of the current one.
    """
    self._logger = logger
Expand All @@ -64,22 +145,23 @@ def set_logger(self, logger: BoundLogger) -> None:
def kafka_producer(self) -> PydanticKafkaProducer:
"""The PydanticKafkaProducer."""
return PydanticKafkaProducer(
producer=self._kafka_producer, schema_manager=self._schema_manager
producer=self._process_context.kafka_producer,
schema_manager=self._process_context.schema_manager,
)

@property
def schema_manager(self) -> PydanticSchemaManager:
"""The PydanticSchemaManager."""
return self._schema_manager
return self._process_context.schema_manager

@property
def http_client(self) -> AsyncClient:
"""The shared HTTP client."""
return self._http_client
return self._process_context.http_client

def create_algolia_doc_index_service(self) -> AlgoliaDocIndexService:
"""Create an Algolia document indexing service."""
index = self._algolia_client.init_index(
index = self._process_context.algolia_client.init_index(
config.algolia_document_index_name
)

Expand All @@ -102,7 +184,7 @@ def create_github_metadata_service(self) -> GitHubMetadataService:
id=config.github_app_id,
key=config.github_app_private_key.get_secret_value(),
name="lsst-sqre/ook",
http_client=self._http_client,
http_client=self.http_client,
)
return GitHubMetadataService(
gh_factory=gh_factory,
Expand All @@ -112,14 +194,14 @@ def create_github_metadata_service(self) -> GitHubMetadataService:
def create_ltd_metadata_service(self) -> LtdMetadataService:
"""Create an LtdMetadataService."""
return LtdMetadataService(
http_client=self._http_client,
http_client=self.http_client,
logger=self._logger,
)

def create_classification_service(self) -> ClassificationService:
"""Create a ClassificationService."""
return ClassificationService(
http_client=self._http_client,
http_client=self.http_client,
github_service=self.create_github_metadata_service(),
ltd_service=self.create_ltd_metadata_service(),
kafka_producer=self.kafka_producer,
Expand All @@ -129,7 +211,7 @@ def create_classification_service(self) -> ClassificationService:
def create_lander_ingest_service(self) -> LtdLanderJsonLdIngestService:
"""Create a LtdLanderJsonLdIngestService."""
return LtdLanderJsonLdIngestService(
http_client=self._http_client,
http_client=self.http_client,
algolia_service=self.create_algolia_doc_index_service(),
github_service=self.create_github_metadata_service(),
logger=self._logger,
Expand All @@ -140,7 +222,7 @@ def create_sphinx_technote_ingest_service(
) -> SphinxTechnoteIngestService:
"""Create a SphinxTechnoteIngestService."""
return SphinxTechnoteIngestService(
http_client=self._http_client,
http_client=self.http_client,
algolia_service=self.create_algolia_doc_index_service(),
github_service=self.create_github_metadata_service(),
logger=self._logger,
Expand Down
5 changes: 3 additions & 2 deletions src/ook/handlers/kafka/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from structlog.stdlib import BoundLogger

from ook.config import config
from ook.dependencies.context import context_dependency
from ook.domain.kafka import LtdUrlIngestV1, UrlIngestKeyV1
from ook.factory import Factory
from ook.handlers.kafka.handlers import handle_ltd_document_ingest


Expand Down Expand Up @@ -241,7 +241,8 @@ async def add_route(
async def consume_kafka_messages() -> None:
"""Consume Kafka messages."""
logger = get_logger("ook")
factory = await Factory.create(logger=logger)
factory = context_dependency.create_factory(logger)

schema_manager = factory.schema_manager
aiokafka_consumer = AIOKafkaConsumer(
config.ingest_kafka_topic,
Expand Down
34 changes: 4 additions & 30 deletions src/ook/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,13 @@

from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from kafkit.fastapi.dependencies.aiokafkaproducer import (
kafka_producer_dependency,
)
from kafkit.fastapi.dependencies.pydanticschemamanager import (
pydantic_schema_manager_dependency,
)
from safir.dependencies.http_client import http_client_dependency
from safir.logging import configure_logging, configure_uvicorn_logging
from safir.middleware.x_forwarded import XForwardedMiddleware
from structlog import get_logger

from ook.dependencies.context import context_dependency

from .config import config
from .dependencies.algoliasearch import algolia_client_dependency
from .domain.kafka import LtdUrlIngestV1, UrlIngestKeyV1
from .handlers.external.paths import external_router
from .handlers.internal.paths import internal_router
from .handlers.kafka.router import consume_kafka_messages
Expand All @@ -44,29 +37,14 @@ async def lifespan(app: FastAPI) -> AsyncIterator:
logger = get_logger("ook")
logger.info("Ook is starting up.")

http_client = await http_client_dependency()

logger.info(
"Schema Registry configuration",
registry_url=config.registry_url,
subject_suffix=config.subject_suffix,
subject_compatibility=config.subject_compatibility,
)

# Initialize the Pydantic Schema Manager and register models
await pydantic_schema_manager_dependency.initialize(
http_client=http_client,
registry_url=config.registry_url,
models=[
UrlIngestKeyV1,
LtdUrlIngestV1,
],
suffix=config.subject_suffix,
compatibility=config.subject_compatibility,
)

# Initialize the Kafka producer
await kafka_producer_dependency.initialize(config.kafka)
await context_dependency.initialize()

if config.enable_kafka_consumer:
kafka_consumer_task = asyncio.create_task(consume_kafka_messages())
Expand All @@ -82,11 +60,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator:
kafka_consumer_task.cancel()
await kafka_consumer_task

await kafka_producer_dependency.stop()

await algolia_client_dependency.close()

await http_client_dependency.aclose()
await context_dependency.aclose()

logger.info("Ook shut down up complete.")

Expand Down

0 comments on commit 75bbc6e

Please sign in to comment.