diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 3ade07b..ddaef9f 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,5 +1,8 @@ name: CI +env: + PYTHON_VERSION: "3.12" + "on": merge_group: {} pull_request: {} @@ -21,25 +24,22 @@ jobs: lint: runs-on: ubuntu-latest + timeout-minutes: 15 steps: - uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: - python-version: "3.11" + python-version: ${{ env.PYTHON_VERSION }} - name: Run pre-commit - uses: pre-commit/action@v3.0.0 + uses: pre-commit/action@v3.0.1 test: runs-on: ubuntu-latest - - strategy: - matrix: - python: - - "3.11" + timeout-minutes: 15 steps: - uses: actions/checkout@v4 @@ -47,15 +47,16 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.11" - - - name: Install nox - run: | - pip install --upgrade pip - pip install --upgrade nox + python-version: ${{ env.PYTHON_VERSION }} - name: Run nox - run: "nox -s typing test" + uses: lsst-sqre/run-nox@v1 + with: + cache-dependency: "requirements/*.txt" + cache-key-prefix: test + nox-sessions: "typing test" + python-version: ${{ env.PYTHON_VERSION }} + nox-package: "nox[uv] testcontainers[kafka]" docs: runs-on: ubuntu-latest @@ -76,15 +77,16 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.11" - - - name: Install nox - run: | - pip install --upgrade pip - pip install --upgrade nox + python-version: ${{ env.PYTHON_VERSION }} - name: Run nox - run: "nox -s docs" + uses: lsst-sqre/run-nox@v1 + with: + cache-dependency: "requirements/*.txt" + cache-key-prefix: docs + nox-sessions: "docs" + python-version: ${{ env.PYTHON_VERSION }} + nox-package: "nox[uv] testcontainers[kafka]" # Only attempt documentation uploads for long-lived branches, tagged # releases, and pull requests from ticket branches. This avoids version diff --git a/.github/workflows/dependencies.yaml b/.github/workflows/dependencies.yaml deleted file mode 100644 index 3ea8e2e..0000000 --- a/.github/workflows/dependencies.yaml +++ /dev/null @@ -1,33 +0,0 @@ -name: Dependency Update - -"on": - schedule: - - cron: "0 12 * * 1" - workflow_dispatch: {} - -jobs: - update: - runs-on: ubuntu-latest - timeout-minutes: 10 - - steps: - - uses: actions/checkout@v4 - - - name: Run neophile - uses: lsst-sqre/run-neophile@v1 - with: - python-version: "3.11" - mode: pr - types: pre-commit - app-id: ${{ secrets.NEOPHILE_APP_ID }} - app-secret: ${{ secrets.NEOPHILE_PRIVATE_KEY }} - - - name: Report status - if: always() - uses: ravsamhq/notify-slack-action@v2 - with: - status: ${{ job.status }} - notify_when: "failure" - notification_title: "Periodic dependency update for {repo} failed" - env: - SLACK_WEBHOOK_URL: ${{ secrets.SLACK_ALERT_WEBHOOK }} diff --git a/.github/workflows/periodic-ci.yaml b/.github/workflows/periodic-ci.yaml index aa88b4a..00bf3fb 100644 --- a/.github/workflows/periodic-ci.yaml +++ b/.github/workflows/periodic-ci.yaml @@ -5,6 +5,9 @@ name: Periodic CI +env: + PYTHON_VERSION: "3.12" + "on": schedule: - cron: "0 12 * * 1" @@ -15,30 +18,22 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 10 - strategy: - matrix: - python: - - "3.11" - steps: - uses: actions/checkout@v4 - # Use the oldest supported version of Python to update dependencies, - # not the matrixed Python version, since this accurately reflects - # how dependencies should later be updated. 
- - name: Run neophile - uses: lsst-sqre/run-neophile@v1 + - name: Update dependencies + uses: lsst-sqre/run-nox@v1 with: - python-version: "3.11" - mode: update - - - name: Install nox - run: | - pip install --upgrade pip - pip install --upgrade nox + nox-sessions: "update-deps" + python-version: ${{ env.PYTHON_VERSION }} - name: Run nox - run: "nox -s typing test docs" + uses: lsst-sqre/run-nox@v1 + with: + nox-sessions: "typing test docs" + python-version: ${{ env.PYTHON_VERSION }} + use-cache: false + nox-package: "nox[uv] testcontainers[kafka]" - name: Report status if: always() diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b9a8d97..42a3dce 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.5.0 + rev: v4.6.0 hooks: - id: trailing-whitespace - id: check-yaml @@ -11,12 +11,12 @@ repos: args: [--autofix, --indent=2, '--top-keys=name,doc,type'] - repo: https://github.com/charliermarsh/ruff-pre-commit - rev: v0.0.290 + rev: v0.5.7 hooks: - id: ruff args: [--fix, --exit-non-zero-on-fix] - repo: https://github.com/psf/black - rev: 23.11.0 + rev: 24.8.0 hooks: - id: black diff --git a/CHANGELOG.md b/CHANGELOG.md index ee67dd2..f8cd2d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,21 @@ + +## 0.10.0 (2024-08-14) + +### New features + +- Ook now uses [faststream](https://faststream.airt.ai/latest/) for managing its Kafka consumer and producer. This is also how the Squarebot ecosystem operates. With this change, Ook no longer uses the Confluent Schema Registry. Schemas are instead developed as Pydantic models. + +### Other changes + +- Use `uv` for installing and compiling dependencies in `noxfile.py`. +- Update GitHub Actions workflows to use the [lsst-sqre/run-nox](https://github.com/lsst-sqre/run-nox) GitHub Action. +- Adopt `ruff-shared.toml` for shared Ruff configuration (from https://github.com/lsst/templates). +- Update Docker base to Python 3.12.5-slim-bookworm. +- Switch to [testcontainers](https://testcontainers.com) for running Kafka during test sessions. The Kafka broker is automatically started by the `nox` sessions. + ## 0.9.1 (2024-01-29) diff --git a/Dockerfile b/Dockerfile index 0f3b55f..65f6245 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ # - Runs a non-root user. # - Sets up the entrypoint and port. -FROM python:3.11.5-slim-bullseye as base-image +FROM python:3.12.5-slim-bookworm as base-image # Update system packages COPY scripts/install-base-packages.sh .
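The changelog above replaces the docker-compose Kafka stack (removed next) with brokers that testcontainers starts from inside the nox sessions. As a hedged illustration, a minimal sketch of that pattern, using only the `KafkaContainer` calls the updated `noxfile.py` itself relies on; the install arguments here are simplified:

```python
# Sketch: run pytest against a throwaway single-node KRaft Kafka broker,
# mirroring the pattern the updated noxfile.py adopts. Not code from this PR.
import nox
from testcontainers.kafka import KafkaContainer


@nox.session
def test(session: nox.Session) -> None:
    """Run pytest with a disposable Kafka broker."""
    session.install("-r", "requirements/dev.txt", "-e", ".")
    with KafkaContainer().with_kraft() as kafka:
        # Point the app at the container's randomly published host port via
        # the same environment variable the Ook settings read.
        session.run(
            "pytest",
            *session.posargs,
            env={"KAFKA_BOOTSTRAP_SERVERS": kafka.get_bootstrap_server()},
        )
```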
diff --git a/docker-compose.yaml b/docker-compose.yaml deleted file mode 100644 index 9bea249..0000000 --- a/docker-compose.yaml +++ /dev/null @@ -1,47 +0,0 @@ ---- -version: "2" -services: - zookeeper: - image: confluentinc/cp-zookeeper:7.4.0 - hostname: zookeeper - container_name: zookeeper - ports: - - "2181:2181" - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - - broker: - image: confluentinc/cp-kafka:7.4.0 - hostname: broker - container_name: broker - depends_on: - - zookeeper - ports: - - "29092:29092" - - "9092:9092" - - "9101:9101" - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_JMX_PORT: 9101 - KAFKA_JMX_HOSTNAME: localhost - - schema-registry: - image: confluentinc/cp-schema-registry:7.4.0 - hostname: schema-registry - container_name: schema-registry - depends_on: - - broker - ports: - - "8081:8081" - environment: - SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:29092" - SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 diff --git a/docs/documenteer.toml b/docs/documenteer.toml index 8778b34..6f3ac04 100644 --- a/docs/documenteer.toml +++ b/docs/documenteer.toml @@ -50,6 +50,7 @@ nitpick_ignore = [ ["py:class", "starlette.routing.Route"], ["py:class", "starlette.routing.BaseRoute"], ["py:exc", "starlette.exceptions.HTTPException"], + ["py:class", "pydantic_settings.sources.CliSettingsSource"], ] [sphinx.intersphinx.projects] diff --git a/docs/user-guide/configuration-reference.rst b/docs/user-guide/configuration-reference.rst index e1a96c0..3ef4b50 100644 --- a/docs/user-guide/configuration-reference.rst +++ b/docs/user-guide/configuration-reference.rst @@ -6,6 +6,3 @@ Configuration reference .. autopydantic_settings:: ook.config.KafkaConnectionSettings -.. autoclass:: kafkit.settings.KafkaSecurityProtocol - -.. autoclass:: kafkit.settings.KafkaSaslMechanism diff --git a/noxfile.py b/noxfile.py index 2e61600..29f1ce7 100644 --- a/noxfile.py +++ b/noxfile.py @@ -4,13 +4,12 @@ nox.options.sessions = ["lint", "typing", "test", "docs"] # Other nox defaults -nox.options.default_venv_backend = "venv" +nox.options.default_venv_backend = "uv" nox.options.reuse_existing_virtualenvs = True # Pip installable dependencies PIP_DEPENDENCIES = [ - ("--upgrade", "pip", "setuptools", "wheel"), ("-r", "requirements/main.txt"), ("-r", "requirements/dev.txt"), ("-e", "."), @@ -19,26 +18,29 @@ def _install(session: nox.Session) -> None: """Install the application and all dependencies into the session.""" + session.install("--upgrade", "uv") for deps in PIP_DEPENDENCIES: session.install(*deps) -def _make_env_vars() -> dict[str, str]: +def _make_env_vars(overrides: dict[str, str] | None = None) -> dict[str, str]: """Create a environment variable dictionary for test sessions that enables the app to start up. 
""" - return { + env_vars = { "SAFIR_PROFILE": "development", "SAFIR_LOG_LEVEL": "DEBUG", "SAFIR_ENVIRONMENT_URL": "http://example.com/", "KAFKA_BOOTSTRAP_SERVERS": "localhost:9092", - "OOK_REGISTRY_URL": "http://localhost:8081", "OOK_ENABLE_CONSUMER": "false", "ALGOLIA_APP_ID": "test", "ALGOLIA_API_KEY": "test", "OOK_GITHUB_APP_ID": "1234", "OOK_GITHUB_APP_PRIVATE_KEY": "test", } + if overrides: + env_vars.update(overrides) + return env_vars def _install_dev(session: nox.Session, bin_prefix: str = "") -> None: @@ -49,10 +51,18 @@ def _install_dev(session: nox.Session, bin_prefix: str = "") -> None: precommit = f"{bin_prefix}pre-commit" # Install dev dependencies + session.run(python, "-m", "pip", "install", "uv", external=True) for deps in PIP_DEPENDENCIES: - session.run(python, "-m", "pip", "install", *deps, external=True) + session.run(python, "-m", "uv", "pip", "install", *deps, external=True) session.run( - python, "-m", "pip", "install", "nox", "pre-commit", external=True + python, + "-m", + "uv", + "pip", + "install", + "nox", + "pre-commit", + external=True, ) # Install pre-commit hooks session.run(precommit, "install", external=True) @@ -94,58 +104,77 @@ def typing(session: nox.Session) -> None: @nox.session def test(session: nox.Session) -> None: """Run pytest.""" + from testcontainers.kafka import KafkaContainer + _install(session) - session.run( - "pytest", - "--cov=ook", - "--cov-branch", - *session.posargs, - env=_make_env_vars(), - ) + + with KafkaContainer().with_kraft() as kafka: + session.run( + "pytest", + "--cov=ook", + "--cov-branch", + *session.posargs, + env=_make_env_vars( + {"KAFKA_BOOTSTRAP_SERVERS": kafka.get_bootstrap_server()} + ), + ) @nox.session def docs(session: nox.Session) -> None: """Build the docs.""" + from testcontainers.kafka import KafkaContainer + _install(session) + session.install("setuptools") # for sphinxcontrib-redoc (pkg_resources) doctree_dir = (session.cache_dir / "doctrees").absolute() - with session.chdir("docs"): - session.run( - "sphinx-build", - "-W", - "--keep-going", - "-n", - "-T", - "-b", - "html", - "-d", - str(doctree_dir), - ".", - "./_build/html", - env=_make_env_vars(), - ) + + with KafkaContainer().with_kraft() as kafka: + with session.chdir("docs"): + session.run( + "sphinx-build", + # "-W", # Disable warnings-as-errors for now + "--keep-going", + "-n", + "-T", + "-b", + "html", + "-d", + str(doctree_dir), + ".", + "./_build/html", + env=_make_env_vars( + {"KAFKA_BOOTSTRAP_SERVERS": kafka.get_bootstrap_server()} + ), + ) @nox.session(name="docs-linkcheck") def docs_linkcheck(session: nox.Session) -> None: """Linkcheck the docs.""" + from testcontainers.kafka import KafkaContainer + _install(session) + session.install("setuptools") # for sphinxcontrib-redoc (pkg_resources) doctree_dir = (session.cache_dir / "doctrees").absolute() - with session.chdir("docs"): - session.run( - "sphinx-build", - "-W", - "--keep-going", - "-n", - "-T", - "-b", - "linkcheck", - "-d", - str(doctree_dir), - ".", - "./_build/html", - env=_make_env_vars(), - ) + with KafkaContainer().with_kraft() as kafka: + with session.chdir("docs"): + session.run( + "sphinx-build", + # "-W", # Disable warnings-as-errors for now + "--keep-going", + "-n", + "-T", + "-b", + "linkcheck", + "-d", + str(doctree_dir), + ".", + "./_build/html", + env=_make_env_vars( + {"KAFKA_BOOTSTRAP_SERVERS": kafka.get_bootstrap_server()} + ), + ) @nox.session(name="scriv-create") @@ -165,30 +194,28 @@ def scriv_collect(session: nox.Session) -> None: @nox.session(name="update-deps") 
def update_deps(session: nox.Session) -> None: """Update pinned server dependencies and pre-commit hooks.""" - session.install( - "--upgrade", "pip-tools", "pip", "setuptools", "wheel", "pre-commit" - ) + session.install("--upgrade", "uv", "wheel", "pre-commit") session.run("pre-commit", "autoupdate") # Dependencies are unpinned for compatibility with the unpinned client # dependency. session.run( - "pip-compile", + "uv", + "pip", + "compile", "--upgrade", "--build-isolation", - "--allow-unsafe", - "--resolver=backtracking", "--output-file", "requirements/main.txt", "requirements/main.in", ) session.run( - "pip-compile", + "uv", + "pip", + "compile", "--upgrade", "--build-isolation", - "--allow-unsafe", - "--resolver=backtracking", "--output-file", "requirements/dev.txt", "requirements/dev.in", @@ -200,11 +227,16 @@ def update_deps(session: nox.Session) -> None: @nox.session(name="run") def run(session: nox.Session) -> None: """Run the application in development mode.""" - # Note this doesn't work right now because Kafka is needed for the app. + from testcontainers.kafka import KafkaContainer + _install(session) - session.run( - "uvicorn", - "ook.main:app", - "--reload", - env=_make_env_vars(), - ) + + with KafkaContainer().with_kraft() as kafka: + session.run( + "uvicorn", + "ook.main:app", + "--reload", + env=_make_env_vars( + {"KAFKA_BOOTSTRAP_SERVERS": kafka.get_bootstrap_server()} + ), + ) diff --git a/pyproject.toml b/pyproject.toml index d9d7132..102a0b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -113,51 +113,16 @@ warn_untyped_fields = true # Reference for settings: https://beta.ruff.rs/docs/settings/ # Reference for rules: https://beta.ruff.rs/docs/rules/ [tool.ruff] -exclude = [ - "docs/**", -] -line-length = 79 -ignore = [ - "ANN101", # self should not have a type annotation - "ANN102", # cls should not have a type annotation - "ANN401", # sometimes Any is the right type - "ARG001", # unused function arguments are often legitimate - "ARG002", # unused method arguments are often legitimate - "ARG005", # unused lambda arguments are often legitimate - "BLE001", # we want to catch and report Exception in background tasks - "C414", # nested sorted is how you sort by multiple keys with reverse - "COM812", # omitting trailing commas allows black autoreformatting - "D102", # sometimes we use docstring inheritence - "D104", # don't see the point of documenting every package - "D105", # our style doesn't require docstrings for magic methods - "D106", # Pydantic uses a nested Config class that doesn't warrant docs - "D205", # Summary-only docs are fine for some methods - "EM101", # justification (duplicate string in traceback) is silly - "EM102", # justification (duplicate string in traceback) is silly - "FIX002", # TODOs are okay - "FBT003", # positional booleans are normal for Pydantic field defaults - "G004", # forbidding logging f-strings is appealing, but not our style - "RET505", # disagree that omitting else always makes code more readable - "PLR0913", # factory pattern uses constructors with many arguments - "PLR2004", # too aggressive about magic values - "S105", # good idea but too many false positives on non-passwords - "S106", # good idea but too many false positives on non-passwords - "SIM102", # sometimes the formatting of nested if statements is clearer - "SIM117", # sometimes nested with contexts are clearer - "T201", # allow print - "TCH001", # we decided to not maintain separate TYPE_CHECKING blocks - "TCH002", # we decided to not maintain separate TYPE_CHECKING 
blocks - "TCH003", # we decided to not maintain separate TYPE_CHECKING blocks - "TD002", # skip checking TODOs - "TD003", # skip checking TODOs - "TD004", # skip requiring TODOs to have issue links - "TID252", # if we're going to use relative imports, use them always - "TRY003", # good general advice but lint is way too aggressive +extend = "ruff-shared.toml" + +[tool.ruff.lint] +# Skip unused variable rules (`F841`). +extend-ignore = [ + "TD002", # don't require authors for TODOs + "TD004", # don't require colons for TODOs ] -select = ["ALL"] -target-version = "py311" -[tool.ruff.per-file-ignores] +[tool.ruff.lint.extend-per-file-ignores] "src/ook/handlers/**" = [ "D103", # FastAPI handlers should not have docstrings ] @@ -173,6 +138,7 @@ target-version = "py311" "PT012", # way too aggressive about limiting pytest.raises blocks "S101", # tests should use assert "SLF001", # tests are allowed to access private members + "T201", # tests can print ] "noxfile.py" = [ "D100", # noxfile doesn't need docstrings @@ -189,43 +155,16 @@ target-version = "py311" "TRY301", # Bad exception raising ] -[tool.ruff.isort] +[tool.ruff.lint.isort] known-first-party = ["ook", "tests"] split-on-trailing-comma = false -[tool.ruff.flake8-bugbear] -extend-immutable-calls = [ - "fastapi.Form", - "fastapi.Header", - "fastapi.Depends", - "fastapi.Path", - "fastapi.Query", -] - -# These are too useful as attributes or methods to allow the conflict with the -# built-in to rule out their use. -[tool.ruff.flake8-builtins] -builtins-ignorelist = [ - "all", - "any", - "help", - "id", - "list", - "type", -] - -[tool.ruff.flake8-pytest-style] -fixture-parentheses = false -mark-parentheses = false - -[tool.ruff.pep8-naming] +[tool.ruff.lint.pep8-naming] classmethod-decorators = [ - "pydantic.root_validator", - "pydantic.validator", + "pydantic.root_validator", # for pydantic v1 + "pydantic.validator", # for pydantic v1 ] -[tool.ruff.pydocstyle] -convention = "numpy" [tool.scriv] categories = [ diff --git a/requirements/dev.in b/requirements/dev.in index 309b688..32b3da5 100644 --- a/requirements/dev.in +++ b/requirements/dev.in @@ -21,5 +21,6 @@ pytest-cov respx types-dateparser types-PyYAML -documenteer[guide] == 1.0.0a6 # before pydantic 2 migration +documenteer[guide] autodoc_pydantic +testcontainers[kafka] diff --git a/requirements/dev.txt b/requirements/dev.txt index 064eea3..985b6ee 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -1,266 +1,360 @@ -# -# This file is autogenerated by pip-compile with Python 3.11 -# by the following command: -# -# pip-compile --allow-unsafe --output-file=requirements/dev.txt requirements/dev.in -# -alabaster==0.7.13 +# This file was autogenerated by uv via the following command: +# uv pip compile --build-isolation --output-file requirements/dev.txt requirements/dev.in +alabaster==1.0.0 # via sphinx -anyio==3.7.1 +annotated-types==0.7.0 # via # -c requirements/main.txt - # httpcore + # pydantic +anyio==4.4.0 + # via + # -c requirements/main.txt + # httpx +appnope==0.1.4 + # via ipykernel asgi-lifespan==2.1.0 # via -r requirements/dev.in -attrs==23.1.0 +asttokens==2.4.1 + # via stack-data +attrs==24.2.0 # via # -c requirements/main.txt # jsonschema + # jupyter-cache # referencing -autodoc-pydantic==1.9.0 +autodoc-pydantic==2.2.0 # via -r requirements/dev.in -babel==2.12.1 +babel==2.16.0 # via sphinx -beautifulsoup4==4.12.2 +beautifulsoup4==4.12.3 # via pydata-sphinx-theme -certifi==2023.7.22 +certifi==2024.7.4 # via # -c requirements/main.txt # httpcore # httpx # requests + # 
sphinx-prompt cfgv==3.4.0 # via pre-commit -charset-normalizer==3.2.0 +charset-normalizer==3.3.2 # via # -c requirements/main.txt # requests -click==8.1.3 +click==8.1.7 # via # -c requirements/main.txt # documenteer -contourpy==1.1.1 - # via matplotlib -coverage[toml]==7.3.1 + # jupyter-cache +comm==0.2.2 + # via ipykernel +coverage==7.6.1 # via # -r requirements/dev.in # pytest-cov -cycler==0.11.0 - # via matplotlib -distlib==0.3.7 +debugpy==1.8.5 + # via ipykernel +decorator==5.1.1 + # via ipython +distlib==0.3.8 # via virtualenv -documenteer[guide]==1.0.0a6 +docker==7.1.0 + # via testcontainers +documenteer==1.4.0 # via -r requirements/dev.in -docutils==0.20.1 +docutils==0.21.2 # via + # documenteer # myst-parser # pybtex-docutils # pydata-sphinx-theme # sphinx + # sphinx-jinja # sphinx-prompt # sphinxcontrib-bibtex -filelock==3.12.4 +executing==2.0.1 + # via stack-data +fastjsonschema==2.20.0 + # via nbformat +filelock==3.15.4 # via virtualenv -fonttools==4.42.1 - # via matplotlib -gitdb==4.0.10 +gitdb==4.0.11 # via gitpython -gitpython==3.1.37 +gitpython==3.1.43 # via documenteer h11==0.14.0 # via # -c requirements/main.txt # httpcore -httpcore==0.18.0 +httpcore==1.0.5 # via # -c requirements/main.txt # httpx -httpx==0.25.0 +httpx==0.27.0 # via # -c requirements/main.txt # -r requirements/dev.in # respx -identify==2.5.29 +identify==2.6.0 # via pre-commit -idna==3.4 +idna==3.7 # via # -c requirements/main.txt # anyio # httpx # requests + # sphinx-prompt imagesize==1.4.1 # via sphinx +importlib-metadata==8.2.0 + # via + # jupyter-cache + # myst-nb iniconfig==2.0.0 # via pytest -jinja2==3.1.2 +ipykernel==6.29.5 + # via myst-nb +ipython==8.26.0 + # via + # ipykernel + # myst-nb +jedi==0.19.1 + # via ipython +jinja2==3.1.4 # via # myst-parser # sphinx + # sphinx-jinja + # sphinxcontrib-redoc +jsonschema==4.23.0 + # via + # nbformat # sphinxcontrib-redoc -jsonschema==4.19.1 - # via sphinxcontrib-redoc -jsonschema-specifications==2023.7.1 +jsonschema-specifications==2023.12.1 # via jsonschema -kiwisolver==1.4.5 - # via matplotlib -latexcodec==2.0.1 +jupyter-cache==1.0.0 + # via myst-nb +jupyter-client==8.6.2 + # via + # ipykernel + # nbclient +jupyter-core==5.7.2 + # via + # ipykernel + # jupyter-client + # nbclient + # nbformat +latexcodec==3.0.0 # via pybtex -linkify-it-py==2.0.2 +linkify-it-py==2.0.3 # via markdown-it-py -markdown-it-py[linkify]==3.0.0 +markdown-it-py==3.0.0 # via + # -c requirements/main.txt # documenteer # mdit-py-plugins # myst-parser -markupsafe==2.1.3 +markupsafe==2.1.5 # via jinja2 -matplotlib==3.8.0 - # via sphinxext-opengraph -mdit-py-plugins==0.4.0 +matplotlib-inline==0.1.7 + # via + # ipykernel + # ipython +mdit-py-plugins==0.4.1 # via myst-parser mdurl==0.1.2 - # via markdown-it-py -mypy==1.5.1 + # via + # -c requirements/main.txt + # markdown-it-py +mypy==1.11.1 # via -r requirements/dev.in mypy-extensions==1.0.0 # via mypy -myst-parser==2.0.0 +myst-nb==1.1.1 # via documenteer -nodeenv==1.8.0 - # via pre-commit -numpy==1.26.0 +myst-parser==4.0.0 + # via + # documenteer + # myst-nb +nbclient==0.10.0 + # via + # jupyter-cache + # myst-nb +nbformat==5.10.4 # via - # contourpy - # matplotlib -packaging==23.1 + # jupyter-cache + # myst-nb + # nbclient +nest-asyncio==1.6.0 + # via ipykernel +nodeenv==1.9.1 + # via pre-commit +packaging==24.1 # via # -c requirements/main.txt - # matplotlib + # ipykernel # pydata-sphinx-theme # pytest # sphinx -pillow==10.0.1 - # via matplotlib -platformdirs==3.10.0 - # via virtualenv -pluggy==1.3.0 +parso==0.8.4 + # via jedi 
+pexpect==4.9.0 + # via ipython +platformdirs==4.2.2 + # via + # jupyter-core + # virtualenv +pluggy==1.5.0 # via pytest -pre-commit==3.4.0 +pre-commit==3.8.0 # via -r requirements/dev.in +prompt-toolkit==3.0.47 + # via ipython +psutil==6.0.0 + # via ipykernel +ptyprocess==0.7.0 + # via pexpect +pure-eval==0.2.3 + # via stack-data pybtex==0.24.0 # via # pybtex-docutils # sphinxcontrib-bibtex pybtex-docutils==1.0.3 # via sphinxcontrib-bibtex -pydantic==1.10.12 +pydantic==2.8.2 # via # -c requirements/main.txt # -r requirements/dev.in # autodoc-pydantic # documenteer + # pydantic-settings +pydantic-core==2.20.1 + # via + # -c requirements/main.txt + # pydantic +pydantic-settings==2.4.0 + # via + # -c requirements/main.txt + # autodoc-pydantic pydata-sphinx-theme==0.12.0 # via documenteer -pygments==2.16.1 +pygments==2.18.0 # via + # -c requirements/main.txt + # ipython # pydata-sphinx-theme # sphinx # sphinx-prompt -pyparsing==3.1.1 - # via matplotlib -pytest==7.4.2 +pylatexenc==2.10 + # via documenteer +pytest==8.3.2 # via # -r requirements/dev.in # pytest-asyncio # pytest-cov -pytest-asyncio==0.21.1 +pytest-asyncio==0.23.8 # via -r requirements/dev.in -pytest-cov==4.1.0 +pytest-cov==5.0.0 # via -r requirements/dev.in -python-dateutil==2.8.2 +python-dateutil==2.9.0.post0 + # via + # -c requirements/main.txt + # jupyter-client +python-dotenv==1.0.1 # via # -c requirements/main.txt - # matplotlib -pyyaml==6.0.1 + # pydantic-settings +pyyaml==6.0.2 # via # -c requirements/main.txt # documenteer + # jupyter-cache + # myst-nb # myst-parser # pre-commit # pybtex # sphinxcontrib-redoc -referencing==0.30.2 +pyzmq==26.1.0 + # via + # ipykernel + # jupyter-client +referencing==0.35.1 # via # jsonschema # jsonschema-specifications -requests==2.31.0 +requests==2.32.3 # via # -c requirements/main.txt + # docker # documenteer # sphinx -respx==0.20.2 + # sphinxcontrib-youtube +respx==0.21.1 # via -r requirements/dev.in -rpds-py==0.10.3 +rpds-py==0.20.0 # via # jsonschema # referencing +setuptools==72.1.0 + # via documenteer six==1.16.0 # via # -c requirements/main.txt - # latexcodec + # asttokens # pybtex # python-dateutil # sphinxcontrib-redoc smmap==5.0.1 # via gitdb -sniffio==1.3.0 +sniffio==1.3.1 # via # -c requirements/main.txt # anyio # asgi-lifespan - # httpcore # httpx snowballstemmer==2.2.0 # via sphinx soupsieve==2.5 # via beautifulsoup4 -sphinx==7.2.6 +sphinx==8.0.2 # via # autodoc-pydantic # documenteer + # myst-nb # myst-parser # pydata-sphinx-theme # sphinx-autodoc-typehints # sphinx-automodapi # sphinx-copybutton # sphinx-design + # sphinx-jinja # sphinx-prompt - # sphinxcontrib-applehelp # sphinxcontrib-bibtex - # sphinxcontrib-devhelp - # sphinxcontrib-htmlhelp # sphinxcontrib-jquery - # sphinxcontrib-qthelp # sphinxcontrib-redoc - # sphinxcontrib-serializinghtml + # sphinxcontrib-youtube # sphinxext-opengraph -sphinx-autodoc-typehints==1.22 + # sphinxext-rediraffe +sphinx-autodoc-typehints==2.2.3 # via documenteer -sphinx-automodapi==0.16.0 +sphinx-automodapi==0.17.0 # via documenteer sphinx-copybutton==0.5.2 # via documenteer -sphinx-design==0.5.0 +sphinx-design==0.6.1 # via documenteer -sphinx-prompt==1.8.0 +sphinx-jinja==2.0.2 # via documenteer -sphinxcontrib-applehelp==1.0.7 +sphinx-prompt==1.9.0 + # via documenteer +sphinxcontrib-applehelp==2.0.0 # via sphinx -sphinxcontrib-bibtex==2.6.1 +sphinxcontrib-bibtex==2.6.2 # via documenteer -sphinxcontrib-devhelp==1.0.5 +sphinxcontrib-devhelp==2.0.0 # via sphinx -sphinxcontrib-htmlhelp==2.0.4 +sphinxcontrib-htmlhelp==2.1.0 # via sphinx 
sphinxcontrib-jquery==4.1 # via documenteer @@ -268,32 +362,70 @@ sphinxcontrib-jsmath==1.0.1 # via sphinx sphinxcontrib-mermaid==0.9.2 # via documenteer -sphinxcontrib-qthelp==1.0.6 +sphinxcontrib-qthelp==2.0.0 # via sphinx sphinxcontrib-redoc==1.6.0 # via documenteer -sphinxcontrib-serializinghtml==1.1.9 +sphinxcontrib-serializinghtml==2.0.0 # via sphinx -sphinxext-opengraph==0.8.2 +sphinxcontrib-youtube==1.4.1 + # via documenteer +sphinxext-opengraph==0.9.1 + # via documenteer +sphinxext-rediraffe==0.2.7 # via documenteer -types-dateparser==1.1.4.10 +sqlalchemy==2.0.32 + # via jupyter-cache +stack-data==0.6.3 + # via ipython +tabulate==0.9.0 + # via jupyter-cache +testcontainers==4.7.2 # via -r requirements/dev.in -types-pyyaml==6.0.12.11 +tomlkit==0.13.0 + # via documenteer +tornado==6.4.1 + # via + # ipykernel + # jupyter-client +traitlets==5.14.3 + # via + # comm + # ipykernel + # ipython + # jupyter-client + # jupyter-core + # matplotlib-inline + # nbclient + # nbformat +types-dateparser==1.2.0.20240420 # via -r requirements/dev.in -typing-extensions==4.8.0 +types-pyyaml==6.0.12.20240808 + # via -r requirements/dev.in +typing-extensions==4.12.2 # via # -c requirements/main.txt # mypy + # myst-nb # pydantic -uc-micro-py==1.0.2 + # pydantic-core + # sqlalchemy + # testcontainers +uc-micro-py==1.0.3 # via linkify-it-py -urllib3==2.0.5 +urllib3==2.2.2 # via # -c requirements/main.txt + # docker + # documenteer # requests -virtualenv==20.24.5 + # sphinx-prompt + # testcontainers +virtualenv==20.26.3 # via pre-commit - -# The following packages are considered to be unsafe in a requirements file: -setuptools==68.2.2 - # via nodeenv +wcwidth==0.2.13 + # via prompt-toolkit +wrapt==1.16.0 + # via testcontainers +zipp==3.20.0 + # via importlib-metadata diff --git a/requirements/main.in b/requirements/main.in index a61118f..e83808b 100644 --- a/requirements/main.in +++ b/requirements/main.in @@ -13,14 +13,13 @@ starlette uvicorn[standard] # Other dependencies. 
-aiokafka -pydantic < 2.0.0 -safir>=4.3.0,<5.0.0 +pydantic >= 2.0.0 +pydantic_settings +safir +faststream[kafka]<0.5.0 algoliasearch>=3.0,<4.0 -# kafkit[pydantic,httpx,aiokafka] @ git+https://github.com/lsst-sqre/kafkit.git@tickets/DM-39646 -kafkit[pydantic,httpx,aiokafka]>=1.0.0a1 -dataclasses-avroschema<0.51.0 # before Pydantic 2 migration -click<8.1.4 # see https://github.com/pallets/click/issues/2558 +dataclasses-avroschema +click dateparser lxml cssselect # for lxml diff --git a/requirements/main.txt b/requirements/main.txt index 5ed93db..b83c0fc 100644 --- a/requirements/main.txt +++ b/requirements/main.txt @@ -1,51 +1,48 @@ -# -# This file is autogenerated by pip-compile with Python 3.11 -# by the following command: -# -# pip-compile --allow-unsafe --output-file=requirements/main.txt requirements/main.in -# -aiohttp==3.7.4.post0 +# This file was autogenerated by uv via the following command: +# uv pip compile --build-isolation --output-file requirements/main.txt requirements/main.in +aiohappyeyeballs==2.3.5 + # via aiohttp +aiohttp==3.10.3 # via -r requirements/main.in -aiokafka==0.8.1 - # via - # -r requirements/main.in - # kafkit +aiokafka==0.10.0 + # via faststream +aiosignal==1.3.1 + # via aiohttp algoliasearch==3.0.0 # via -r requirements/main.in -anyio==3.7.1 +annotated-types==0.7.0 + # via pydantic +anyio==4.4.0 # via - # fastapi - # httpcore + # fast-depends + # faststream + # httpx # starlette # watchfiles async-timeout==3.0.1 # via # -r requirements/main.in - # aiohttp # aiokafka -attrs==23.1.0 +attrs==24.2.0 # via aiohttp casefy==0.1.7 # via dataclasses-avroschema -certifi==2023.7.22 +certifi==2024.7.4 # via # httpcore # httpx # requests -cffi==1.15.1 +cffi==1.17.0 # via cryptography -chardet==4.0.0 - # via aiohttp -charset-normalizer==3.2.0 - # via - # aiohttp - # requests -click==8.1.3 +charset-normalizer==3.3.2 + # via requests +click==8.1.7 # via # -r requirements/main.in # safir + # typer # uvicorn -cryptography==41.0.4 +cryptography==43.0.0 # via # pyjwt # safir @@ -53,118 +50,133 @@ cssselect==1.2.0 # via -r requirements/main.in dacite==1.8.1 # via dataclasses-avroschema -dataclasses-avroschema[pydantic]==0.50.2 - # via - # -r requirements/main.in - # kafkit -dateparser==1.1.8 +dataclasses-avroschema==0.62.4 # via -r requirements/main.in -dnspython==2.4.2 - # via email-validator -email-validator==2.0.0.post2 - # via pydantic -fastapi==0.103.1 +dateparser==1.2.0 + # via -r requirements/main.in +fast-depends==2.4.8 + # via faststream +fastapi==0.112.0 # via # -r requirements/main.in # safir -fastavro==1.8.3 +fastavro==1.9.5 + # via dataclasses-avroschema +faststream==0.4.7 + # via -r requirements/main.in +frozenlist==1.4.1 # via - # dataclasses-avroschema - # kafkit + # aiohttp + # aiosignal gidgethub==5.3.0 # via safir h11==0.14.0 # via # httpcore # uvicorn -httpcore==0.18.0 +httpcore==1.0.5 # via httpx -httptools==0.6.0 +httptools==0.6.1 # via uvicorn -httpx==0.25.0 - # via - # kafkit - # safir -idna==3.4 +httpx==0.27.0 + # via safir +idna==3.7 # via # anyio - # email-validator # httpx # requests # yarl -inflector==3.1.0 +inflector==3.1.1 # via dataclasses-avroschema -kafka-python==2.0.2 - # via aiokafka -kafkit[aiokafka,httpx,pydantic]==1.0.0a1 - # via -r requirements/main.in -lxml==4.9.3 +lxml==5.3.0 # via -r requirements/main.in -multidict==6.0.4 +markdown-it-py==3.0.0 + # via rich +mdurl==0.1.2 + # via markdown-it-py +multidict==6.0.5 # via # aiohttp # yarl -packaging==23.1 +packaging==24.1 # via aiokafka -pycparser==2.21 +pycparser==2.22 # via cffi 
-pydantic[email]==1.10.12 +pydantic==2.8.2 # via # -r requirements/main.in - # dataclasses-avroschema + # fast-depends # fastapi - # kafkit + # pydantic-settings # safir -pyjwt[crypto]==2.8.0 +pydantic-core==2.20.1 + # via + # pydantic + # safir +pydantic-settings==2.4.0 + # via -r requirements/main.in +pygments==2.18.0 + # via rich +pyjwt==2.9.0 # via gidgethub -python-dateutil==2.8.2 - # via dateparser -python-dotenv==1.0.0 - # via uvicorn -pytz==2023.3.post1 +python-dateutil==2.9.0.post0 + # via + # dataclasses-avroschema + # dateparser +python-dotenv==1.0.1 + # via + # pydantic-settings + # uvicorn +pytz==2024.1 # via dateparser -pyyaml==6.0.1 +pyyaml==6.0.2 # via uvicorn -regex==2023.8.8 +regex==2024.7.24 # via dateparser -requests==2.31.0 +requests==2.32.3 # via algoliasearch -safir==4.5.0 +rich==13.7.1 + # via typer +safir==6.2.0 # via -r requirements/main.in +shellingham==1.5.4 + # via typer six==1.16.0 # via python-dateutil -sniffio==1.3.0 +sniffio==1.3.1 # via # anyio - # httpcore # httpx -starlette==0.27.0 +starlette==0.37.2 # via # -r requirements/main.in # fastapi # safir -structlog==23.1.0 +structlog==24.4.0 # via safir -typing-extensions==4.8.0 +typer==0.12.3 + # via faststream +typing-extensions==4.12.2 # via - # aiohttp + # dataclasses-avroschema # fastapi + # faststream # pydantic -tzlocal==5.0.1 + # pydantic-core + # typer +tzlocal==5.2 # via dateparser uritemplate==4.1.1 - # via - # gidgethub - # kafkit -urllib3==2.0.5 + # via gidgethub +urllib3==2.2.2 # via requests -uvicorn[standard]==0.23.2 +uvicorn==0.30.5 # via -r requirements/main.in -uvloop==0.17.0 +uvloop==0.19.0 # via uvicorn -watchfiles==0.20.0 +watchfiles==0.23.0 # via uvicorn -websockets==11.0.3 +websockets==12.0 # via uvicorn -yarl==1.9.2 +yarl==1.9.4 # via aiohttp diff --git a/ruff-shared.toml b/ruff-shared.toml new file mode 100644 index 0000000..823693a --- /dev/null +++ b/ruff-shared.toml @@ -0,0 +1,125 @@ +# Generic shared Ruff configuration file. It should be possible to use this +# file unmodified in different packages provided that one likes the style that +# it enforces. +# +# This file should be used from pyproject.toml as follows: +# +# [tool.ruff] +# extend = "ruff-shared.toml" +# +# It can then be extended with project-specific rules. A common additional +# setting in pyproject.toml is tool.ruff.lint.extend-per-file-ignores, to add +# additional project-specific ignore rules for specific paths. +# +# The rule used with Ruff configuration is to disable every non-deprecated +# lint rule that has legitimate exceptions that are not dodgy code, rather +# than cluttering code with noqa markers. This is therefore a reiatively +# relaxed configuration that errs on the side of disabling legitimate rules. 
+# +# Reference for settings: https://docs.astral.sh/ruff/settings/ +# Reference for rules: https://docs.astral.sh/ruff/rules/ +exclude = ["docs/**"] +line-length = 79 +target-version = "py312" + +[format] +docstring-code-format = true + +[lint] +ignore = [ + "ANN401", # sometimes Any is the right type + "ARG001", # unused function arguments are often legitimate + "ARG002", # unused method arguments are often legitimate + "ARG005", # unused lambda arguments are often legitimate + "BLE001", # we want to catch and report Exception in background tasks + "C414", # nested sorted is how you sort by multiple keys with reverse + "D102", # sometimes we use docstring inheritence + "D104", # don't see the point of documenting every package + "D105", # our style doesn't require docstrings for magic methods + "D106", # Pydantic uses a nested Config class that doesn't warrant docs + "D205", # our documentation style allows a folded first line + "EM101", # justification (duplicate string in traceback) is silly + "EM102", # justification (duplicate string in traceback) is silly + "FBT003", # positional booleans are normal for Pydantic field defaults + "FIX002", # point of a TODO comment is that we're not ready to fix it + "G004", # forbidding logging f-strings is appealing, but not our style + "RET505", # disagree that omitting else always makes code more readable + "PLR0911", # often many returns is clearer and simpler style + "PLR0913", # factory pattern uses constructors with many arguments + "PLR2004", # too aggressive about magic values + "PLW0603", # yes global is discouraged but if needed, it's needed + "S105", # good idea but too many false positives on non-passwords + "S106", # good idea but too many false positives on non-passwords + "S107", # good idea but too many false positives on non-passwords + "S603", # not going to manually mark every subprocess call as reviewed + "S607", # using PATH is not a security vulnerability + "SIM102", # sometimes the formatting of nested if statements is clearer + "SIM117", # sometimes nested with contexts are clearer + "TCH001", # we decided to not maintain separate TYPE_CHECKING blocks + "TCH002", # we decided to not maintain separate TYPE_CHECKING blocks + "TCH003", # we decided to not maintain separate TYPE_CHECKING blocks + "TD003", # we don't require issues be created for TODOs + "TID252", # if we're going to use relative imports, use them always + "TRY003", # good general advice but lint is way too aggressive + "TRY301", # sometimes raising exceptions inside try is the best flow + "UP040", # PEP 695 type aliases not yet supported by mypy + + # The following settings should be disabled when using ruff format + # per https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules + "W191", + "E111", + "E114", + "E117", + "D206", + "D300", + "Q000", + "Q001", + "Q002", + "Q003", + "COM812", + "COM819", + "ISC001", + "ISC002", + + # Temporary bug workarounds. 
+ "S113", # https://github.com/astral-sh/ruff/issues/12210 +] +select = ["ALL"] + +[lint.per-file-ignores] +"src/*/handlers/**" = [ + "D103", # FastAPI handlers should not have docstrings +] +"tests/**" = [ + "C901", # tests are allowed to be complex, sometimes that's convenient + "D101", # tests don't need docstrings + "D103", # tests don't need docstrings + "PLR0915", # tests are allowed to be long, sometimes that's convenient + "PT012", # way too aggressive about limiting pytest.raises blocks + "S101", # tests should use assert + "S106", # tests are allowed to hard-code dummy passwords + "S301", # allow tests for whether code can be pickled + "SLF001", # tests are allowed to access private members +] + +# These are too useful as attributes or methods to allow the conflict with the +# built-in to rule out their use. +[lint.flake8-builtins] +builtins-ignorelist = [ + "all", + "any", + "help", + "id", + "list", + "type", +] + +[lint.flake8-pytest-style] +fixture-parentheses = false +mark-parentheses = false + +[lint.mccabe] +max-complexity = 11 + +[lint.pydocstyle] +convention = "numpy" diff --git a/src/ook/config.py b/src/ook/config.py index 376aff9..d087cef 100644 --- a/src/ook/config.py +++ b/src/ook/config.py @@ -2,11 +2,207 @@ from __future__ import annotations -from kafkit.settings import KafkaConnectionSettings -from pydantic import AnyHttpUrl, BaseSettings, Field, SecretStr, validator +import ssl +from enum import Enum +from pathlib import Path + +from pydantic import ( + AnyHttpUrl, + DirectoryPath, + Field, + FilePath, + SecretStr, + field_validator, +) +from pydantic_settings import BaseSettings, SettingsConfigDict from safir.logging import LogLevel, Profile -__all__ = ["Configuration", "config"] +__all__ = [ + "Configuration", + "config", + "KafkaConnectionSettings", + "KafkaSecurityProtocol", + "KafkaSaslMechanism", + "KafkaConnectionSettings", +] + + +class KafkaSecurityProtocol(str, Enum): + """Kafka security protocols understood by aiokafka.""" + + PLAINTEXT = "PLAINTEXT" + """Plain-text connection.""" + + SSL = "SSL" + """TLS-encrypted connection.""" + + +class KafkaSaslMechanism(str, Enum): + """Kafka SASL mechanisms understood by aiokafka.""" + + PLAIN = "PLAIN" + """Plain-text SASL mechanism.""" + + SCRAM_SHA_256 = "SCRAM-SHA-256" + """SCRAM-SHA-256 SASL mechanism.""" + + SCRAM_SHA_512 = "SCRAM-SHA-512" + """SCRAM-SHA-512 SASL mechanism.""" + + +class KafkaConnectionSettings(BaseSettings): + """Settings for connecting to Kafka.""" + + bootstrap_servers: str = Field( + ..., + title="Kafka bootstrap servers", + description=( + "A comma-separated list of Kafka brokers to connect to. " + "This should be a list of hostnames or IP addresses, " + "each optionally followed by a port number, separated by " + "commas. " + "For example: ``kafka-1:9092,kafka-2:9092,kafka-3:9092``." + ), + ) + + security_protocol: KafkaSecurityProtocol = Field( + KafkaSecurityProtocol.PLAINTEXT, + description="The security protocol to use when connecting to Kafka.", + ) + + cert_temp_dir: DirectoryPath | None = Field( + None, + description=( + "Temporary writable directory for concatenating certificates." + ), + ) + + cluster_ca_path: FilePath | None = Field( + None, + title="Path to CA certificate file", + description=( + "The path to the CA certificate file to use for verifying the " + "broker's certificate. " + "This is only needed if the broker's certificate is not signed " + "by a CA trusted by the operating system." 
+ ), + + client_ca_path: FilePath | None = Field( + None, + title="Path to client CA certificate file", + description=( + "The path to the client CA certificate file to use for " + "authentication. " + "This is only needed when the client certificate needs to be " + "concatenated with the client CA certificate, which is common " + "for Strimzi installations." + ), + ) + + client_cert_path: FilePath | None = Field( + None, + title="Path to client certificate file", + description=( + "The path to the client certificate file to use for " + "authentication. " + "This is only needed if the broker is configured to require " + "SSL client authentication." + ), + ) + + client_key_path: FilePath | None = Field( + None, + title="Path to client key file", + description=( + "The path to the client key file to use for authentication. " + "This is only needed if the broker is configured to require " + "SSL client authentication." + ), + ) + + client_key_password: SecretStr | None = Field( + None, + title="Password for client key file", + description=( + "The password to use for decrypting the client key file. " + "This is only needed if the client key file is encrypted." + ), + ) + + sasl_mechanism: KafkaSaslMechanism | None = Field( + KafkaSaslMechanism.PLAIN, + title="SASL mechanism", + description=( + "The SASL mechanism to use for authentication. " + "This is only needed if SASL authentication is enabled." + ), + ) + + sasl_username: str | None = Field( + None, + title="SASL username", + description=( + "The username to use for SASL authentication. " + "This is only needed if SASL authentication is enabled." + ), + ) + + sasl_password: SecretStr | None = Field( + None, + title="SASL password", + description=( + "The password to use for SASL authentication. " + "This is only needed if SASL authentication is enabled." + ), + ) + + model_config = SettingsConfigDict( + env_prefix="KAFKA_", case_sensitive=False + ) + + @property + def ssl_context(self) -> ssl.SSLContext | None: + """An SSL context for connecting to Kafka with aiokafka, if the + Kafka connection is configured to use SSL. + """ + if ( + self.security_protocol != KafkaSecurityProtocol.SSL + or self.cluster_ca_path is None + or self.client_cert_path is None + or self.client_key_path is None + ): + return None + + client_cert_path = Path(self.client_cert_path) + + if self.client_ca_path is not None: + # Need to concatenate the client cert and CA certificates. This is + # typical for Strimzi-based Kafka clusters. + if self.cert_temp_dir is None: + raise RuntimeError( + "KAFKA_CERT_TEMP_DIR must be set when " + "a client CA certificate is provided." + ) + client_ca = Path(self.client_ca_path).read_text() + client_cert = Path(self.client_cert_path).read_text() + sep = "" if client_ca.endswith("\n") else "\n" + new_client_cert = sep.join([client_cert, client_ca]) + new_client_cert_path = Path(self.cert_temp_dir) / "client.crt" + new_client_cert_path.write_text(new_client_cert) + client_cert_path = Path(new_client_cert_path) + + # Create an SSL context on the basis that we're the client + # authenticating the server (the Kafka broker). + ssl_context = ssl.create_default_context( + purpose=ssl.Purpose.SERVER_AUTH, cafile=str(self.cluster_ca_path) + ) + # Add the certificates that the Kafka broker uses to authenticate us.
+ ssl_context.load_cert_chain( + certfile=str(client_cert_path), keyfile=str(self.client_key_path) + ) + + return ssl_context class Configuration(BaseSettings): @@ -14,13 +210,13 @@ class Configuration(BaseSettings): name: str = Field( "ook", - env="SAFIR_NAME", + validation_alias="SAFIR_NAME", description="The application's name", ) profile: Profile = Field( Profile.production, - env="SAFIR_PROFILE", + validation_alias="SAFIR_PROFILE", description=( "Application logging profile: 'development' or 'production'." ), @@ -29,13 +225,13 @@ class Configuration(BaseSettings): log_level: LogLevel = Field( LogLevel.INFO, title="Log level of the application's logger", - env="SAFIR_LOG_LEVEL", + validation_alias="SAFIR_LOG_LEVEL", ) path_prefix: str = Field( "/ook", title="API URL path prefix", - env="SAFIR_PATH_PREFIX", + validation_alias="SAFIR_PATH_PREFIX", description=( "The URL prefix where the application's externally-accessible " "endpoints are hosted." @@ -45,7 +241,7 @@ class Configuration(BaseSettings): environment_url: AnyHttpUrl = Field( ..., title="Base URL of the environment", - env="SAFIR_ENVIRONMENT_URL", + validation_alias="SAFIR_ENVIRONMENT_URL", description=( "The base URL of the environment where the application is hosted." ), @@ -56,77 +252,54 @@ class Configuration(BaseSettings): description="Kafka connection configuration.", ) - registry_url: AnyHttpUrl = Field( - env="OOK_REGISTRY_URL", title="Schema Registry URL" - ) - - subject_suffix: str = Field( - "", - title="Schema subject name suffix", - env="OOK_SUBJECT_SUFFIX", - description=( - "Suffix to add to Schema Registry suffix names. This is useful " - "when deploying for testing/staging and you do not " - "want to affect the production subject and its " - "compatibility lineage." - ), - ) - - # TODO convert to enum? - subject_compatibility: str = Field( - "FORWARD_TRANSITIVE", - title="Schema subject compatibility", - env="OOK_SUBJECT_COMPATIBILITY", - description=( - "Compatibility level to apply to Schema Registry subjects. Use " - "NONE for testing and development, but prefer FORWARD_TRANSITIVE " - "for production." - ), - ) - enable_kafka_consumer: bool = Field( True, - env="OOK_ENABLE_CONSUMER", + validation_alias="OOK_ENABLE_CONSUMER", description="Enable Kafka consumer.", ) ingest_kafka_topic: str = Field( "ook.ingest", - env="OOK_INGEST_KAFKA_TOPIC", + validation_alias="OOK_INGEST_KAFKA_TOPIC", description="The name of the Kafka topic for the ingest queue.", ) kafka_consumer_group_id: str = Field( - "ook", env="OOK_GROUP_ID", description="Kafka consumer group ID." + "ook", + validation_alias="OOK_GROUP_ID", + description="Kafka consumer group ID.", ) algolia_app_id: str = Field( - env="ALGOLIA_APP_ID", description="The Algolia app ID" + validation_alias="ALGOLIA_APP_ID", description="The Algolia app ID" ) algolia_api_key: SecretStr = Field( - env="ALGOLIA_API_KEY", description="The Algolia API key" + validation_alias="ALGOLIA_API_KEY", description="The Algolia API key" ) algolia_document_index_name: str = Field( "document_dev", - env="ALGOLIA_DOCUMENT_INDEX", + validation_alias="ALGOLIA_DOCUMENT_INDEX", description="Name of the Algolia document index", ) - github_app_id: str | None = Field(None, env="OOK_GITHUB_APP_ID") + github_app_id: int | None = Field( + None, validation_alias="OOK_GITHUB_APP_ID" + ) """The GitHub App ID, as determined by GitHub when setting up a GitHub App. 
""" github_app_private_key: SecretStr | None = Field( - None, env="OOK_GITHUB_APP_PRIVATE_KEY" + None, validation_alias="OOK_GITHUB_APP_PRIVATE_KEY" ) """The GitHub app private key. See https://docs.github.com/en/apps/creating-github-apps/authenticating-with-a-github-app/managing-private-keys-for-github-apps """ - @validator("github_app_private_key", pre=True) + @field_validator("github_app_private_key", mode="before") + @classmethod def validate_none_secret(cls, v: SecretStr | None) -> SecretStr | None: """Validate a SecretStr setting which may be "None" that is intended to be `None`. diff --git a/src/ook/dependencies/consumercontext.py b/src/ook/dependencies/consumercontext.py new file mode 100644 index 0000000..bb13e50 --- /dev/null +++ b/src/ook/dependencies/consumercontext.py @@ -0,0 +1,105 @@ +"""A dependency for providing context to consumers.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from aiokafka import ConsumerRecord +from faststream import context +from faststream.kafka.fastapi import KafkaMessage +from structlog import get_logger +from structlog.stdlib import BoundLogger + +from ..factory import Factory, ProcessContext + + +@dataclass(slots=True, kw_only=True) +class ConsumerContext: + """Context for consumers.""" + + logger: BoundLogger + """Logger for the consumer.""" + + factory: Factory + """Factory for creating services.""" + + record: ConsumerRecord | None = None + """The Kafka record being processed.""" + + def rebind_logger(self, **values: Any) -> None: + """Add the given values to the logging context. + + Parameters + ---------- + **values + Additional values that should be added to the logging context. + """ + self.logger = self.logger.bind(**values) + self.factory.set_logger(self.logger) + + +class ConsumerContextDependency: + """Provide a per-message context as a dependency for a FastStream consumer. + + Each message handler class gets a `ConsumerContext`. To save overhead, the + portions of the context that are shared by all requests are collected into + the single process-global `~unfurlbot.factory.ProcessContext` and reused + with each request. 
+ """ + + def __init__(self) -> None: + self._process_context: ProcessContext | None = None + + async def __call__(self) -> ConsumerContext: + """Create a per-request context.""" + # Get the message from the FastStream context + message: KafkaMessage = context.get_local("message") + record = message.raw_message + + # Add the Kafka context to the logger + logger = get_logger(__name__) # eventually use a logger dependency + kafka_context = { + "topic": record.topic, + "offset": record.offset, + "partition": record.partition, + } + logger = logger.bind(kafka=kafka_context) + + return ConsumerContext( + logger=logger, + factory=Factory( + logger=logger, + process_context=self.process_context, + ), + ) + + @property + def process_context(self) -> ProcessContext: + """The underlying process context, primarily for use in tests.""" + if not self._process_context: + raise RuntimeError("ConsumerContextDependency not initialized") + return self._process_context + + async def initialize(self) -> None: + """Initialize the process-wide shared context.""" + if self._process_context: + await self._process_context.aclose() + self._process_context = await ProcessContext.create() + + def create_factory(self, logger: BoundLogger) -> Factory: + """Create a factory for use outside a request context.""" + return Factory( + logger=logger, + process_context=self.process_context, + ) + + async def aclose(self) -> None: + """Clean up the per-process configuration.""" + if self._process_context: + await self._process_context.aclose() + self._process_context = None + + +consumer_context_dependency = ConsumerContextDependency() +"""The dependency that will return the per-request context.""" diff --git a/src/ook/dependencies/context.py b/src/ook/dependencies/context.py index 7429793..60852cf 100644 --- a/src/ook/dependencies/context.py +++ b/src/ook/dependencies/context.py @@ -7,7 +7,7 @@ """ from dataclasses import dataclass -from typing import Any +from typing import Annotated, Any from fastapi import Depends, Request from safir.dependencies.logger import logger_dependency @@ -68,7 +68,7 @@ def __init__(self) -> None: async def __call__( self, request: Request, - logger: BoundLogger = Depends(logger_dependency), + logger: Annotated[BoundLogger, Depends(logger_dependency)], ) -> RequestContext: """Create a per-request context and return it.""" return RequestContext( diff --git a/src/ook/domain/algoliarecord.py b/src/ook/domain/algoliarecord.py index 3b25367..fd1fabe 100644 --- a/src/ook/domain/algoliarecord.py +++ b/src/ook/domain/algoliarecord.py @@ -9,7 +9,7 @@ from enum import Enum from typing import Any, Self -from pydantic import BaseModel, Field, HttpUrl +from pydantic import BaseModel, ConfigDict, Field, HttpUrl __all__ = ["DocumentRecord", "MinimalDocumentModel", "DocumentSourceType"] @@ -87,7 +87,7 @@ class DocumentRecord(BaseModel): "The URL of the record. For subsection, this URL can end with an " "anchor target." 
), - example="https://sqr-027.lsst.io/#What-is-observability?", + examples=["https://sqr-027.lsst.io/#What-is-observability?"], ) base_url: HttpUrl = Field( @@ -170,13 +170,7 @@ class DocumentRecord(BaseModel): description="URL of the source repository.", alias="githubRepoURL", ) - - class Config: - allow_population_by_field_name = True - """Enables use of Python name for constructing the record.""" - - extra = "forbid" - """Disable attributes that aren't part of the schema.""" + model_config = ConfigDict(populate_by_name=True, extra="forbid") @staticmethod def generate_object_id( @@ -279,7 +273,7 @@ def make_algolia_record(self) -> DocumentRecord: return DocumentRecord( object_id=object_id, base_url=self.url, - url=self.url, + url=str(self.url), surrogate_key=surrogate_key, source_update_time=format_utc_datetime(now), source_update_timestamp=format_timestamp(now), diff --git a/src/ook/domain/kafka.py b/src/ook/domain/kafka.py index f70efce..bd143ed 100644 --- a/src/ook/domain/kafka.py +++ b/src/ook/domain/kafka.py @@ -4,8 +4,7 @@ from datetime import datetime -from dataclasses_avroschema.avrodantic import AvroBaseModel -from pydantic import Field +from pydantic import BaseModel, Field from ook.domain.algoliarecord import DocumentSourceType @@ -17,7 +16,7 @@ ] -class UrlIngestKeyV1(AvroBaseModel): +class UrlIngestKeyV1(BaseModel): """Kafka message key model for Slack messages sent by Squarebot.""" url: str = Field(..., description="The root URL to ingest.") @@ -29,7 +28,7 @@ class Meta: schema_name = "url_ingest_key_v1" -class LtdEditionV1(AvroBaseModel): +class LtdEditionV1(BaseModel): """Information an LTD edition (a sub-model).""" url: str = Field(..., description="The API URL of the edition resource.") @@ -45,7 +44,7 @@ class LtdEditionV1(AvroBaseModel): ) -class LtdProjectV1(AvroBaseModel): +class LtdProjectV1(BaseModel): """Information about an LTD project (a sub-model).""" url: str = Field(..., description="The API URL of the project resource.") @@ -61,7 +60,7 @@ class LtdProjectV1(AvroBaseModel): slug: str = Field(..., description="The slug of the project.") -class LtdUrlIngestV2(AvroBaseModel): +class LtdUrlIngestV2(BaseModel): """Kafka message value model for a request to ingest a URL hosted on LSST the Docs. 
diff --git a/src/ook/factory.py b/src/ook/factory.py index c5ca216..38e78c6 100644 --- a/src/ook/factory.py +++ b/src/ook/factory.py @@ -7,27 +7,20 @@ from dataclasses import dataclass from typing import Self -from aiokafka import AIOKafkaProducer from algoliasearch.search_client import SearchClient +from faststream.kafka import KafkaBroker +from faststream.kafka.asyncapi import Publisher from httpx import AsyncClient -from kafkit.fastapi.dependencies.aiokafkaproducer import ( - kafka_producer_dependency, -) -from kafkit.fastapi.dependencies.pydanticschemamanager import ( - pydantic_schema_manager_dependency, -) -from kafkit.registry.manager import PydanticSchemaManager from safir.github import GitHubAppClientFactory from structlog.stdlib import BoundLogger from .config import config from .dependencies.algoliasearch import algolia_client_dependency -from .domain.kafka import LtdUrlIngestV2, UrlIngestKeyV1 +from .kafkarouter import kafka_router from .services.algoliaaudit import AlgoliaAuditService from .services.algoliadocindex import AlgoliaDocIndexService from .services.classification import ClassificationService from .services.githubmetadata import GitHubMetadataService -from .services.kafkaproducer import PydanticKafkaProducer from .services.landerjsonldingest import LtdLanderJsonLdIngestService from .services.ltdmetadataservice import LtdMetadataService from .services.sphinxtechnoteingest import SphinxTechnoteIngestService @@ -43,17 +36,18 @@ class ProcessContext: http_client: AsyncClient """Shared HTTP client.""" - kafka_producer: AIOKafkaProducer - """The aiokafka producer.""" + kafka_broker: KafkaBroker + """The aiokafka broker provided through the FastStream Kafka router.""" - schema_manager: PydanticSchemaManager - """Pydantic schema manager.""" + kafka_ingest_publisher: Publisher algolia_client: SearchClient """Algolia client.""" @classmethod - async def create(cls) -> ProcessContext: + async def create( + cls, kafka_broker: KafkaBroker | None = None + ) -> ProcessContext: """Create a ProcessContext.""" # Not using Safir's http_client_dependency because I found that in # standalone Factory setting the http_client wasn't opened, for some @@ -61,29 +55,17 @@ async def create(cls) -> ProcessContext: # ProcessContext. http_client = AsyncClient() - # Initialize the Pydantic Schema Manager and register models - await pydantic_schema_manager_dependency.initialize( - http_client=http_client, - registry_url=config.registry_url, - models=[ - UrlIngestKeyV1, - LtdUrlIngestV2, - ], - suffix=config.subject_suffix, - compatibility=config.subject_compatibility, - ) + # Use the provided broker (typically for CLI contexts) + broker = kafka_broker if kafka_broker else kafka_router.broker - # Initialize the Kafka producer - await kafka_producer_dependency.initialize(config.kafka) - - kafka_producer = await kafka_producer_dependency() - schema_manager = await pydantic_schema_manager_dependency() algolia_client = await algolia_client_dependency() return cls( http_client=http_client, - kafka_producer=kafka_producer, - schema_manager=schema_manager, + kafka_broker=broker, + kafka_ingest_publisher=broker.publisher( + config.ingest_kafka_topic, title="Ook ingest requests" + ), algolia_client=algolia_client, ) @@ -93,7 +75,6 @@ async def aclose(self) -> None: Called during shutdown, or before recreating the process context using a different configuration. 
""" - await self.kafka_producer.stop() await self.algolia_client.close_async() await self.http_client.aclose() @@ -111,9 +92,11 @@ def __init__( self._logger = logger @classmethod - async def create(cls, *, logger: BoundLogger) -> Self: + async def create( + cls, *, logger: BoundLogger, kafka_broker: KafkaBroker | None = None + ) -> Self: """Create a Factory (for use outside a request context).""" - context = await ProcessContext.create() + context = await ProcessContext.create(kafka_broker=kafka_broker) return cls( logger=logger, process_context=context, @@ -122,15 +105,18 @@ async def create(cls, *, logger: BoundLogger) -> Self: @classmethod @asynccontextmanager async def create_standalone( - cls, *, logger: BoundLogger + cls, *, logger: BoundLogger, kafka_broker: KafkaBroker | None = None ) -> AsyncIterator[Self]: """Create a standalone factory, outside the FastAPI process, as a context manager. Use this for creating a factory in CLI commands. """ - factory = await cls.create(logger=logger) + factory = await cls.create(logger=logger, kafka_broker=kafka_broker) async with aclosing(factory): + # Manually connect the broker after the publishers are created + # so that the producer can be added to each publisher. + await factory._process_context.kafka_broker.connect() # noqa: SLF001 yield factory async def aclose(self) -> None: @@ -141,19 +127,6 @@ def set_logger(self, logger: BoundLogger) -> None: """Set the logger for the factory.""" self._logger = logger - @property - def kafka_producer(self) -> PydanticKafkaProducer: - """The PydanticKafkaProducer.""" - return PydanticKafkaProducer( - producer=self._process_context.kafka_producer, - schema_manager=self._process_context.schema_manager, - ) - - @property - def schema_manager(self) -> PydanticSchemaManager: - """The PydanticSchemaManager.""" - return self._process_context.schema_manager - @property def http_client(self) -> AsyncClient: """The shared HTTP client.""" @@ -200,11 +173,12 @@ def create_ltd_metadata_service(self) -> LtdMetadataService: def create_classification_service(self) -> ClassificationService: """Create a ClassificationService.""" + publisher = self._process_context.kafka_ingest_publisher return ClassificationService( http_client=self.http_client, github_service=self.create_github_metadata_service(), ltd_service=self.create_ltd_metadata_service(), - kafka_producer=self.kafka_producer, + kafka_ingest_publisher=publisher, logger=self._logger, ) diff --git a/src/ook/handlers/external/models.py b/src/ook/handlers/external/models.py index 0580435..3ee1a94 100644 --- a/src/ook/handlers/external/models.py +++ b/src/ook/handlers/external/models.py @@ -3,9 +3,9 @@ from __future__ import annotations import re -from typing import Any +from typing import Self -from pydantic import AnyHttpUrl, BaseModel, Field, root_validator +from pydantic import AnyHttpUrl, BaseModel, Field, model_validator from safir.metadata import Metadata as SafirMetadata __all__ = [ @@ -19,7 +19,7 @@ class IndexResponse(BaseModel): metadata: SafirMetadata = Field(..., title="Package metadata") - api_docs: AnyHttpUrl = Field(..., tile="API documentation URL") + api_docs: AnyHttpUrl = Field(..., title="API documentation URL") class LtdIngestRequest(BaseModel): @@ -29,33 +29,29 @@ class LtdIngestRequest(BaseModel): product_slugs: list[str] | None = Field(None) - product_slug_pattern: str | None + product_slug_pattern: str | None = None edition_slug: str = "main" - @root_validator - def check_slug(cls, values: dict[str, Any]) -> dict[str, Any]: - product_slug = 
values.get("product_slug") - product_slugs = values.get("product_slugs") - product_slug_pattern = values.get("product_slug_pattern") - + @model_validator(mode="after") + def check_slug(self) -> Self: if ( - product_slug is None - and product_slugs is None - and product_slug_pattern is None + self.product_slug is None + and self.product_slugs is None + and self.product_slug_pattern is None ): raise ValueError( "One of the ``product_slug``, ``product_slugs`` or " "``product_slug_pattern`` fields is required." ) - if product_slug_pattern is not None: + if self.product_slug_pattern is not None: try: - re.compile(product_slug_pattern) + re.compile(self.product_slug_pattern) except Exception as exc: raise ValueError( "product_slug_pattern {self.product_slug_pattern!r} is " "not a valid Python regular expression." ) from exc - return values + return self diff --git a/src/ook/handlers/external/paths.py b/src/ook/handlers/external/paths.py index 9a23312..2cdd9a5 100644 --- a/src/ook/handlers/external/paths.py +++ b/src/ook/handlers/external/paths.py @@ -1,6 +1,7 @@ """Handlers for the app's external root endpoints, ``/ook/``.""" import asyncio +from typing import Annotated from fastapi import APIRouter, Depends, Request, Response from pydantic import AnyHttpUrl @@ -36,7 +37,7 @@ async def get_index( doc_url = request.url.replace(path=f"/{config.path_prefix}/redoc") return IndexResponse( metadata=metadata, - api_docs=AnyHttpUrl(str(doc_url), scheme=request.url.scheme), + api_docs=AnyHttpUrl(str(doc_url)), ) @@ -47,7 +48,7 @@ async def get_index( ) async def post_ingest_ltd( ingest_request: LtdIngestRequest, - context: RequestContext = Depends(context_dependency), + context: Annotated[RequestContext, Depends(context_dependency)], ) -> Response: """Trigger an ingest of a project in LSST the Docs.""" logger = context.logger diff --git a/src/ook/handlers/kafka.py b/src/ook/handlers/kafka.py new file mode 100644 index 0000000..7e5a732 --- /dev/null +++ b/src/ook/handlers/kafka.py @@ -0,0 +1,65 @@ +"""Consumer for Kafka topics.""" + +from __future__ import annotations + +from typing import Annotated + +from fastapi import Depends + +from ook.config import config +from ook.dependencies.consumercontext import ( + ConsumerContext, + consumer_context_dependency, +) +from ook.domain.algoliarecord import DocumentSourceType +from ook.domain.kafka import LtdUrlIngestV2 +from ook.factory import Factory +from ook.kafkarouter import kafka_router + +__all__ = ["handle_ltd_document_ingest"] + + +@kafka_router.subscriber( + config.ingest_kafka_topic, group_id=config.kafka_consumer_group_id +) +async def handle_ltd_document_ingest( + message: LtdUrlIngestV2, + context: Annotated[ConsumerContext, Depends(consumer_context_dependency)], +) -> None: + """Handle a message requesting an ingest for an LTD document.""" + logger = context.logger + logger = logger.bind( + ltd_slug=message.project.slug, content_type=message.content_type.value + ) + + logger.info( + "Starting processing of LTD document ingest request.", + ) + + factory = await Factory.create(logger=logger) + + content_type = message.content_type + + if content_type == DocumentSourceType.LTD_TECHNOTE: + technote_service = factory.create_technote_ingest_service() + await technote_service.ingest( + published_url=message.url, + project_url=message.project.url, + edition_url=message.edition.url, + ) + elif content_type == DocumentSourceType.LTD_SPHINX_TECHNOTE: + sphinx_technote_service = ( + factory.create_sphinx_technote_ingest_service() + ) + await 
sphinx_technote_service.ingest( + published_url=message.url, + project_url=message.project.url, + edition_url=message.edition.url, + ) + elif content_type == DocumentSourceType.LTD_LANDER_JSONLD: + lander_service = factory.create_lander_ingest_service() + await lander_service.ingest( + published_url=message.url, + ) + + logger.info("Finished processing LTD document ingest request.") diff --git a/src/ook/handlers/kafka/__init__.py b/src/ook/handlers/kafka/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/ook/handlers/kafka/handlers.py b/src/ook/handlers/kafka/handlers.py deleted file mode 100644 index 730ff58..0000000 --- a/src/ook/handlers/kafka/handlers.py +++ /dev/null @@ -1,91 +0,0 @@ -"""Handler functions for Kafka messages. - -These functions are registered with the PydanticAIOKafkaConsumer in the -router module. -""" - -from __future__ import annotations - -from typing import TYPE_CHECKING, Any - -from dataclasses_avroschema.avrodantic import AvroBaseModel -from structlog import get_logger -from structlog.stdlib import BoundLogger - -from ook.domain.algoliarecord import DocumentSourceType -from ook.domain.kafka import LtdUrlIngestV2, UrlIngestKeyV1 -from ook.factory import Factory - -if TYPE_CHECKING: - from .router import MessageMetadata - -__all__ = ["handle_ltd_document_ingest"] - - -def bind_logger_with_message_metadata( - logger: BoundLogger, - *, - message_metadata: MessageMetadata, - key: AvroBaseModel, - value: AvroBaseModel, -) -> BoundLogger: - """Bind a logger with message metadata.""" - return logger.bind( - kafka_topic=message_metadata.topic, - kafka_partition=message_metadata.partition, - kafka_offset=message_metadata.offset, - kafka_key=key.dict(), - kafka_value=value.dict(), - serialized_key_size=message_metadata.serialized_key_size, - serialized_value_size=message_metadata.serialized_value_size, - kafka_headers=message_metadata.headers, - ) - - -async def handle_ltd_document_ingest( - *, - message_metadata: MessageMetadata, - key: UrlIngestKeyV1, - value: LtdUrlIngestV2, - **kwargs: Any, -) -> None: - """Handle a message requesting an ingest for an LTD document.""" - logger = bind_logger_with_message_metadata( - get_logger("ook"), - message_metadata=message_metadata, - key=key, - value=value, - ) - logger = logger.bind( - ltd_slug=value.project.slug, content_type=value.content_type.value - ) - - logger.info( - "Starting processing of LTD document ingest request.", - ) - - factory = await Factory.create(logger=logger) - - if value.content_type == DocumentSourceType.LTD_TECHNOTE: - technote_service = factory.create_technote_ingest_service() - await technote_service.ingest( - published_url=value.url, - project_url=value.project.url, - edition_url=value.edition.url, - ) - elif value.content_type == DocumentSourceType.LTD_SPHINX_TECHNOTE: - sphinx_technote_service = ( - factory.create_sphinx_technote_ingest_service() - ) - await sphinx_technote_service.ingest( - published_url=value.url, - project_url=value.project.url, - edition_url=value.edition.url, - ) - elif value.content_type == DocumentSourceType.LTD_LANDER_JSONLD: - lander_service = factory.create_lander_ingest_service() - await lander_service.ingest( - published_url=value.url, - ) - - logger.info("Finished processing LTD document ingest request.") diff --git a/src/ook/handlers/kafka/router.py b/src/ook/handlers/kafka/router.py deleted file mode 100644 index a9bd84e..0000000 --- a/src/ook/handlers/kafka/router.py +++ /dev/null @@ -1,266 +0,0 @@ -"""Routes Kafka messages to processing 
handlers.""" - -from __future__ import annotations - -from collections.abc import Iterable, Sequence -from dataclasses import dataclass -from typing import Any, Protocol, Self, cast - -from aiokafka import AIOKafkaConsumer, ConsumerRecord -from dataclasses_avroschema.avrodantic import AvroBaseModel -from kafkit.registry import UnmanagedSchemaError -from kafkit.registry.manager import PydanticSchemaManager -from structlog import get_logger -from structlog.stdlib import BoundLogger - -from ook.config import config -from ook.dependencies.context import context_dependency -from ook.domain.kafka import LtdUrlIngestV2, UrlIngestKeyV1 -from ook.handlers.kafka.handlers import handle_ltd_document_ingest - - -class HandlerProtocol(Protocol): - """A protocol for a Kafka message handler.""" - - async def __call__( - self, - *, - message_metadata: MessageMetadata, - key: AvroBaseModel, - value: AvroBaseModel, - **kwargs: Any, - ) -> None: - """Handle a Kafka message.""" - - -@dataclass -class Route: - """A pointer to a route for a Kafka message. - - A route is associated with a specific set of topics and Pydantic models - for the key and value. - """ - - callback: HandlerProtocol - """The callback to invoke when a message is routed to this route. - - The callback is a async function that takes three keyword arguments, plus - any additional keyword arguments specified in `kwargs`: - - 1. The Kafka message metadata (`MessageMetadata`). - 2. The deserialized key (`AvroBaseModel`). - 3. The deserialized value (`AvroBaseModel`). - """ - - topics: Sequence[str] - """The Kafka topics that this route is associated with.""" - - key_models: Sequence[type[AvroBaseModel]] - """The Pydantic model types that this route is associated with for the - message key. - """ - - value_models: Sequence[type[AvroBaseModel]] - """The Pydantic model types that this route is associated with for the - message value. - """ - - kwargs: dict[str, Any] | None = None - """Keyword arguments to pass to the callback.""" - - def matches( - self, topic: str, key: AvroBaseModel, value: AvroBaseModel - ) -> bool: - """Determine if this route matches the given topic name and Pydantic - key and value models. - """ - return ( - topic in self.topics - and type(key) in self.key_models - and type(value) in self.value_models - ) - - -@dataclass -class MessageMetadata: - """Metadata about a Kafka message.""" - - topic: str - """The Kafka topic name.""" - - offset: int - """The Kafka message offset in the partition""" - - partition: int - """The Kafka partition.""" - - serialized_key_size: int - """The size of the serialized key, in bytes.""" - - serialized_value_size: int - """The size of the serialized value, in bytes.""" - - headers: dict[str, bytes] - """The Kafka message headers.""" - - @classmethod - def from_consumer_record(cls, record: ConsumerRecord) -> Self: - """Create a MessageMetadata instance from a ConsumerRecord.""" - return cls( - topic=record.topic, - offset=record.offset, - partition=record.partition, - serialized_key_size=record.serialized_key_size, - serialized_value_size=record.serialized_value_size, - headers=record.headers, - ) - - -class PydanticAIOKafkaConsumer: - """A Kafka consumer that deserializes messages into Pydantic models and - routes them to handlers. 
- """ - - def __init__( - self, - *, - schema_manager: PydanticSchemaManager, - consumer: AIOKafkaConsumer, - logger: BoundLogger, - ) -> None: - self._schema_manager = schema_manager - self._consumer = consumer - self._logger = logger - self._routes: list[Route] = [] - - async def start(self) -> None: - """Start the consumer.""" - await self._consumer.start() - self._logger.info("Started Kafka consumer") - try: - # Consume messages - async for msg in self._consumer: - # print("consumed: ", msg.topic, msg.partition, msg.offset, - # msg.key, msg.value, msg.timestamp) - self._logger.debug( - "Got kafka message", - topic=msg.topic, - partition=msg.partition, - offset=msg.offset, - timestamp=msg.timestamp, - ) - try: - await self._handle_message(msg) - except Exception: - self._logger.exception( - "Error handling message", - topic=msg.topic, - partition=msg.partition, - offset=msg.offset, - ) - self._logger.debug( - "Finished handling message", - topic=msg.topic, - partition=msg.partition, - offset=msg.offset, - ) - finally: - # Will leave consumer group; perform autocommit if enabled. - self._logger.info("Stopping Kafka consumer") - await self._consumer.stop() - - async def _handle_message(self, msg: ConsumerRecord) -> None: - """Handle a Kafka message by deserializing the key and value into - Pydantic models and routing to a handler. - """ - try: - key = await self._schema_manager.deserialize(msg.key) - value = await self._schema_manager.deserialize(msg.value) - except UnmanagedSchemaError: - self._logger.exception( - "Could not deserialize message due to unmanaged schema", - topic=msg.topic, - partition=msg.partition, - offset=msg.offset, - ) - return - message_metadata = MessageMetadata.from_consumer_record(msg) - self._logger.debug( - "Deserialized message", - key=key.dict(), - value=value.dict(), - ) - match_count = 0 - for route in self._routes: - if route.matches(msg.topic, key, value): - match_count += 1 - self._logger.debug( - "Routing message to handler", - handler=route, - ) - kwargs = route.kwargs or {} - await route.callback( - message_metadata=message_metadata, - key=key, - value=value, - **kwargs, - ) - if match_count == 0: - self._logger.warning( - "No matching route for message", - topic=msg.topic, - partition=msg.partition, - offset=msg.offset, - ) - - async def register_models( - self, models: Iterable[type[AvroBaseModel]] - ) -> None: - """Pre-register Pydantic models with the schema manager.""" - await self._schema_manager.register_models(models) - - async def add_route( - self, - callback: HandlerProtocol, - topics: Sequence[str], - key_models: Sequence[type[AvroBaseModel]], - value_models: Sequence[type[AvroBaseModel]], - kwargs: dict[str, Any] | None = None, - ) -> None: - """Register a handler for a Kafka topic given the support key and - value models. 
- """ - await self.register_models(key_models) - await self.register_models(value_models) - self._routes.append( - Route(callback, topics, key_models, value_models, kwargs) - ) - - -async def consume_kafka_messages() -> None: - """Consume Kafka messages.""" - logger = get_logger("ook") - factory = context_dependency.create_factory(logger) - - schema_manager = factory.schema_manager - aiokafka_consumer = AIOKafkaConsumer( - config.ingest_kafka_topic, - bootstrap_servers=config.kafka.bootstrap_servers, - group_id=config.kafka_consumer_group_id, - security_protocol=config.kafka.security_protocol, - ssl_context=config.kafka.ssl_context, - ) - logger.info("Starting Kafka consumer") - - consumer = PydanticAIOKafkaConsumer( - schema_manager=schema_manager, - consumer=aiokafka_consumer, - logger=logger, - ) - await consumer.add_route( - cast(HandlerProtocol, handle_ltd_document_ingest), - [config.ingest_kafka_topic], - [UrlIngestKeyV1], - [LtdUrlIngestV2], - ) - await consumer.start() diff --git a/src/ook/kafkarouter.py b/src/ook/kafkarouter.py new file mode 100644 index 0000000..d9f71db --- /dev/null +++ b/src/ook/kafkarouter.py @@ -0,0 +1,19 @@ +"""The FastStream Kafka router.""" + +# This module holds kafka_router in a separate module to avoid circular +# imports with the consumer_context_dependency. + +from __future__ import annotations + +from faststream.kafka.fastapi import KafkaRouter +from faststream.security import BaseSecurity + +from .config import config + +__all__ = ["kafka_router"] + + +kafka_security = BaseSecurity(ssl_context=config.kafka.ssl_context) +kafka_router = KafkaRouter( + config.kafka.bootstrap_servers, security=kafka_security +) diff --git a/src/ook/main.py b/src/ook/main.py index 01003c1..01844ee 100644 --- a/src/ook/main.py +++ b/src/ook/main.py @@ -9,7 +9,6 @@ from __future__ import annotations -import asyncio import json from collections.abc import AsyncIterator from contextlib import asynccontextmanager @@ -21,12 +20,15 @@ from safir.middleware.x_forwarded import XForwardedMiddleware from structlog import get_logger +from ook.dependencies.consumercontext import consumer_context_dependency from ook.dependencies.context import context_dependency from .config import config from .handlers.external.paths import external_router from .handlers.internal.paths import internal_router -from .handlers.kafka.router import consume_kafka_messages + +# Import kafka router and also load the handler functions. 
+from .handlers.kafka import kafka_router # type: ignore [attr-defined] __all__ = ["app", "create_openapi"] @@ -38,29 +40,25 @@ async def lifespan(app: FastAPI) -> AsyncIterator: logger.info("Ook is starting up.") logger.info( - "Schema Registry configuration", - registry_url=config.registry_url, - subject_suffix=config.subject_suffix, - subject_compatibility=config.subject_compatibility, + "Configured Kafka", + bootstrap_servers=config.kafka.bootstrap_servers, + security_protocol=config.kafka.security_protocol.name, + ingest_topic=config.ingest_kafka_topic, + consumer_group=config.kafka_consumer_group_id, ) await context_dependency.initialize() + await consumer_context_dependency.initialize() - if config.enable_kafka_consumer: - kafka_consumer_task = asyncio.create_task(consume_kafka_messages()) - - logger.info("Ook start up complete.") - - yield + async with kafka_router.lifespan_context(app): + logger.info("Ook start up complete.") + yield # Shut down logger.info("Ook is shutting down.") - if config.enable_kafka_consumer: - kafka_consumer_task.cancel() - await kafka_consumer_task - await context_dependency.aclose() + await consumer_context_dependency.aclose() logger.info("Ook shut down up complete.") @@ -81,11 +79,12 @@ async def lifespan(app: FastAPI) -> AsyncIterator: redoc_url=f"{config.path_prefix}/redoc", lifespan=lifespan, ) -"""The main FastAPI application for squarebot.""" +"""The main FastAPI application for ook.""" # Attach the routers. app.include_router(internal_router) app.include_router(external_router, prefix=config.path_prefix) +app.include_router(kafka_router) # Set up middleware app.add_middleware(XForwardedMiddleware) diff --git a/src/ook/services/algoliadocindex.py b/src/ook/services/algoliadocindex.py index fc85608..4a527fd 100644 --- a/src/ook/services/algoliadocindex.py +++ b/src/ook/services/algoliadocindex.py @@ -50,9 +50,9 @@ async def save_document_records( """ # Partition the records by base URL since each URL has a different # surrogate key, and thus is old records need to be deleted separately. 
- partitioned_records: defaultdict[ - HttpUrl, list[DocumentRecord] - ] = defaultdict(list) + partitioned_records: defaultdict[HttpUrl, list[DocumentRecord]] = ( + defaultdict(list) + ) for record in records: partitioned_records[record.base_url].append(record) for base_url, url_records in partitioned_records.items(): @@ -67,7 +67,7 @@ async def save_document_records( record.export_for_algolia() for record in url_records ] await self._index.save_objects_async(record_objs) - await self.delete_old_records(base_url, surrogate_key) + await self.delete_old_records(str(base_url), surrogate_key) async def save_doc_stub(self, doc: MinimalDocumentModel) -> None: """Add a manually-generated record to the document index.""" diff --git a/src/ook/services/classification.py b/src/ook/services/classification.py index afcbf38..7a24354 100644 --- a/src/ook/services/classification.py +++ b/src/ook/services/classification.py @@ -7,11 +7,11 @@ from datetime import UTC, datetime, timedelta import lxml.html +from faststream.kafka.asyncapi import Publisher from httpx import AsyncClient from safir.datetime import parse_isodatetime from structlog.stdlib import BoundLogger -from ook.config import config from ook.domain.algoliarecord import DocumentSourceType from ook.domain.kafka import ( LtdEditionV1, @@ -19,7 +19,6 @@ LtdUrlIngestV2, UrlIngestKeyV1, ) -from ook.services.kafkaproducer import PydanticKafkaProducer from ook.services.ltdmetadataservice import LtdMetadataService from ..exceptions import LtdSlugClassificationError @@ -54,13 +53,13 @@ def __init__( github_service: GitHubMetadataService, ltd_service: LtdMetadataService, logger: BoundLogger, - kafka_producer: PydanticKafkaProducer, + kafka_ingest_publisher: Publisher, ) -> None: self._http_client = http_client self._logger = logger self._gh_service = github_service self._ltd_service = ltd_service - self._kafka_producer = kafka_producer + self._kafka_ingest_publisher = kafka_ingest_publisher async def queue_ingest_for_updated_ltd_projects( self, window: timedelta @@ -142,10 +141,9 @@ async def queue_ingest_for_ltd_product_slug( ) from e try: - await self._kafka_producer.send( - topic=config.ingest_kafka_topic, - key=kafka_key, - value=kafka_value, + await self._kafka_ingest_publisher.publish( + message=kafka_value.model_dump(mode="json"), + key=kafka_key.model_dump_json().encode("utf-8"), ) except Exception as e: raise LtdSlugClassificationError( @@ -307,6 +305,4 @@ async def _has_metadata_yaml( owner=owner, repo=repo, git_ref=git_ref, path="metadata.yaml" ) response = await self._http_client.get(raw_url) - if response.status_code != 200: - return False - return True + return response.status_code == 200 diff --git a/src/ook/services/kafkaproducer.py b/src/ook/services/kafkaproducer.py deleted file mode 100644 index a0eede3..0000000 --- a/src/ook/services/kafkaproducer.py +++ /dev/null @@ -1,89 +0,0 @@ -"""Kafka producer service that accepts with Pydantic models.""" - -from __future__ import annotations - -from asyncio import Future - -import aiokafka -from dataclasses_avroschema.avrodantic import AvroBaseModel -from kafkit.registry import manager - - -class PydanticKafkaProducer: - """Kafka producer that sends Pydantic models for message values and keys, - built around aiokafka. - - Parameters - ---------- - producer - The aiokafka producer. - schema_manager - The Pydantic schema manager used by the Pydantic Kafka producer. 
- """ - - def __init__( - self, - producer: aiokafka.AIOKafkaProducer, - schema_manager: manager.PydanticSchemaManager, - ) -> None: - self._producer = producer - self._schema_manager = schema_manager - - @property - def aiokafka_producer(self) -> aiokafka.AIOKafkaProducer: - """The aiokafka producer (access-only).""" - return self._producer - - @property - def schema_manager(self) -> manager.PydanticSchemaManager: - """The Pydantic schema manager used by the Pydantic Kafka - producer (access-only). - """ - return self._schema_manager - - async def send( - self, - *, - topic: str, - value: AvroBaseModel, - key: AvroBaseModel | None = None, - partition: int | None = None, - timestamp_ms: int | None = None, - headers: dict[str, bytes] | None = None, - ) -> Future: - """Send a message to a Kafka topic. - - Parameters - ---------- - topic - The topic to send the message to. - value - The message value. - key - The message key. - partition - The partition to send the message to. - timestamp_ms - The timestamp of the message. - headers - The headers of the message. - - Returns - ------- - asyncio.Future - A future that resolves when the message is sent. - """ - serialized_value = await self._schema_manager.serialize(value) - if key: - serialized_key = await self._schema_manager.serialize(key) - else: - serialized_key = None - - return await self._producer.send( - topic, - value=serialized_value, - key=serialized_key, - partition=partition, - timestamp_ms=timestamp_ms, - headers=headers, - ) diff --git a/src/ook/services/ltdmetadataservice.py b/src/ook/services/ltdmetadataservice.py index 5c9503a..845e9a0 100644 --- a/src/ook/services/ltdmetadataservice.py +++ b/src/ook/services/ltdmetadataservice.py @@ -85,8 +85,9 @@ async def iter_updated_projects( ) except RuntimeError: continue - date_rebuilt = parse_isodatetime(edition["date_rebuilt"]) - if date_rebuilt is None: + try: + date_rebuilt = parse_isodatetime(edition["date_rebuilt"]) + except ValueError: continue if date_rebuilt >= since: yield project_slug diff --git a/tests/conftest.py b/tests/conftest.py index c3d6969..d4597f6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,7 +3,6 @@ from __future__ import annotations from collections.abc import AsyncIterator, Iterator -from unittest.mock import Mock import pytest import pytest_asyncio @@ -16,11 +15,6 @@ from ook.factory import Factory from .support.algoliasearch import MockSearchClient, patch_algoliasearch -from .support.kafkaproducer import patch_aiokafkaproducer -from .support.schemamanager import ( - MockPydanticSchemaManager, - patch_schema_manager, -) @pytest.fixture @@ -29,18 +23,6 @@ def mock_algoliasearch() -> Iterator[MockSearchClient]: yield from patch_algoliasearch() -@pytest.fixture -def mock_schema_manager() -> Iterator[MockPydanticSchemaManager]: - """Return a mock PydanticSchemaManager for testing.""" - yield from patch_schema_manager() - - -@pytest.fixture -def mock_kafka_producer() -> Iterator[Mock]: - """Return a mock KafkaProducer for testing.""" - yield from patch_aiokafkaproducer() - - @pytest_asyncio.fixture async def http_client() -> AsyncIterator[AsyncClient]: async with AsyncClient() as client: @@ -49,8 +31,6 @@ async def http_client() -> AsyncIterator[AsyncClient]: @pytest_asyncio.fixture async def app( - mock_kafka_producer: Mock, - mock_schema_manager: MockPydanticSchemaManager, mock_algoliasearch: MockSearchClient, ) -> AsyncIterator[FastAPI]: """Return a configured test application. 
@@ -71,11 +51,9 @@ async def client(app: FastAPI) -> AsyncIterator[AsyncClient]: @pytest_asyncio.fixture async def factory( - mock_kafka_producer: Mock, - mock_schema_manager: MockPydanticSchemaManager, mock_algoliasearch: MockSearchClient, ) -> AsyncIterator[Factory]: - """Return a configured ``Factory``.""" + """Return a configured ``Factory`` without setting up a FastAPI app.""" logger = structlog.get_logger("ook") async with Factory.create_standalone(logger=logger) as factory: yield factory diff --git a/tests/domain/ltdtechnote_test.py b/tests/domain/ltdtechnote_test.py index ae487ee..4a8a763 100644 --- a/tests/domain/ltdtechnote_test.py +++ b/tests/domain/ltdtechnote_test.py @@ -30,9 +30,12 @@ def test_sqr_075() -> None: assert records[1].h2 == "Problem statement" assert records[1].author_names == ["Jonathan Sick"] - assert records[1].base_url == "https://sqr-075.lsst.io/" + assert str(records[1].base_url) == "https://sqr-075.lsst.io/" assert records[1].url == "https://sqr-075.lsst.io/#problem-statement" assert records[1].handle == "SQR-075" assert records[1].number == 75 assert records[1].series == "SQR" - assert records[1].github_repo_url == "https://github.com/lsst-sqre/sqr-075" + assert ( + str(records[1].github_repo_url) + == "https://github.com/lsst-sqre/sqr-075" + ) diff --git a/tests/domain/test_kafka.py b/tests/domain/test_kafka.py deleted file mode 100644 index 942ed6c..0000000 --- a/tests/domain/test_kafka.py +++ /dev/null @@ -1,21 +0,0 @@ -"""Tests for the Kafka models in ``ook.domain.kafka``.""" - -from __future__ import annotations - -import json - -from ook.domain.kafka import LtdUrlIngestV2, UrlIngestKeyV1 - - -def test_url_ingest_key_v1() -> None: - """Test ``UrlIngestKeyV1``.""" - schema = json.loads(UrlIngestKeyV1.avro_schema()) - assert schema["name"] == "url_ingest_key_v1" - assert schema["namespace"] == "lsst.square-events.ook" - - -def test_ltd_url_ingest_v2() -> None: - """Test the ``LtdUrlIngestV2`` model.""" - schema = json.loads(LtdUrlIngestV2.avro_schema()) - assert schema["name"] == "ltd_url_ingest_v2" - assert schema["namespace"] == "lsst.square-events.ook" diff --git a/tests/services/githubmetadata_test.py b/tests/services/githubmetadata_test.py index b00583a..8de0717 100644 --- a/tests/services/githubmetadata_test.py +++ b/tests/services/githubmetadata_test.py @@ -32,7 +32,7 @@ async def test_parse_repo_url( """Test parsing a GitHub repository URL into an owner and name.""" logger = structlog.get_logger() gh_factory = GitHubAppClientFactory( - id="12345", + id=12345, key="secret", name="lsst-sqre/ook", http_client=http_client, @@ -51,7 +51,7 @@ async def test_format_raw_url(http_client: AsyncClient) -> None: ) logger = structlog.get_logger() gh_factory = GitHubAppClientFactory( - id="12345", + id=12345, key="secret", name="lsst-sqre/ook", http_client=http_client, diff --git a/tests/support/algoliasearch.py b/tests/support/algoliasearch.py index de41a0b..e619476 100644 --- a/tests/support/algoliasearch.py +++ b/tests/support/algoliasearch.py @@ -10,8 +10,7 @@ class MockSearchClient: """A mock Algolia SearchClient for testing.""" - def __init__(self, *args: Any, **kwargs: Any) -> None: - ... + def __init__(self, *args: Any, **kwargs: Any) -> None: ... @classmethod async def create(cls, *args: Any, **kwargs: Any) -> Self: @@ -29,8 +28,7 @@ def init_index(self, index_name: str) -> MockAlgoliaIndex: class MockAlgoliaIndex: """A mock Algolia index for testing.""" - def __init__(self, index_name: str) -> None: - ... 
+ def __init__(self, index_name: str) -> None: ... async def save_objects_async(self, objects: list[dict[str, Any]]) -> None: """Save objects to the index.""" @@ -45,6 +43,5 @@ def browse_objects(self, settings: dict[str, Any]) -> list[dict[str, Any]]: def patch_algoliasearch() -> Iterator[MockSearchClient]: """Patch the Algolia API client.""" - mock = MockSearchClient() with patch("algoliasearch.search_client.SearchClient") as mock: yield mock.return_value diff --git a/tests/support/kafkaproducer.py b/tests/support/kafkaproducer.py deleted file mode 100644 index 5c83503..0000000 --- a/tests/support/kafkaproducer.py +++ /dev/null @@ -1,13 +0,0 @@ -"""Mock PydanticKafkaProducer for testing.""" - -from collections.abc import Iterator -from unittest.mock import AsyncMock, Mock, patch - - -def patch_aiokafkaproducer() -> Iterator[Mock]: - """Patch aiokafka.AIOKafkaProducer with an autospec'd mock.""" - with patch("aiokafka.AIOKafkaProducer", autospec=True) as mock_producer: - mock_producer.start = AsyncMock(return_value=None) - mock_producer.stop = AsyncMock(return_value=None) - mock_producer.send = AsyncMock(return_value=None) - yield mock_producer diff --git a/tests/support/schemamanager.py b/tests/support/schemamanager.py deleted file mode 100644 index fe26c75..0000000 --- a/tests/support/schemamanager.py +++ /dev/null @@ -1,86 +0,0 @@ -"""Support for mocking PydanticSchemaManager in tests.""" - -from __future__ import annotations - -from collections.abc import Iterator -from unittest.mock import patch - -from dataclasses_avroschema.avrodantic import AvroBaseModel -from kafkit.registry.manager._pydantic import CachedSchema -from kafkit.registry.sansio import MockRegistryApi, RegistryApi - - -class MockPydanticSchemaManager: - """A mock PydanticSchemaManager for testing.""" - - def __init__(self, registry_api: RegistryApi, suffix: str = "") -> None: - self._registry = registry_api - self._models: dict[str, CachedSchema] = {} - self._suffix = suffix - - async def register_models( - self, - models: list[type[AvroBaseModel]], - compatibility: str | None = None, - ) -> None: - """Register models into the mock's cache.""" - for model in models: - await self.register_model(model, compatibility=compatibility) - - async def register_model( - self, model: type[AvroBaseModel], compatibility: str | None = None - ) -> None: - """Register a model into the mock's cache.""" - self._models[model.__name__] = self._cache_model(model) - - def _cache_model( - self, model: AvroBaseModel | type[AvroBaseModel] - ) -> CachedSchema: - schema_fqn = self._get_model_fqn(model) - avro_schema = model.avro_schema_to_python() - - if isinstance(model, AvroBaseModel): - model_type = model.__class__ - else: - model_type = model - - self._models[schema_fqn] = CachedSchema( - schema=avro_schema, model=model_type - ) - - return self._models[schema_fqn] - - def _get_model_fqn( - self, model: AvroBaseModel | type[AvroBaseModel] - ) -> str: - # Mypy can't detect the Meta class on the model, so we have to ignore - # those lines. 
- - try: - name = model.Meta.schema_name # type: ignore [union-attr] - except AttributeError: - name = model.__class__.__name__ - - try: - namespace = model.Meta.namespace # type: ignore [union-attr] - except AttributeError: - namespace = None - - if namespace: - name = f"{namespace}.{name}" - - if self._suffix: - name += self._suffix - - return name - - -def patch_schema_manager() -> Iterator[MockPydanticSchemaManager]: - """Replace PydanticSchemaManager with a mock.""" - registry = MockRegistryApi(url="http://localhost:8081") - mock_manager = MockPydanticSchemaManager(registry_api=registry) - with patch( - "kafkit.registry.manager.PydanticSchemaManager", - return_value=mock_manager, - ): - yield mock_manager
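
For orientation, the standalone sketch below (not part of the patch above) illustrates the publishing pattern this change set adopts: a FastStream KafkaBroker, a publisher declared for the ingest topic before the broker is connected, and a Pydantic payload serialized with model_dump(mode="json"), mirroring what ClassificationService now does. The broker address, topic name, and the IngestRequest model are illustrative placeholders, not Ook's real configuration or schema (those live in ook.config and ook.domain.kafka).

import asyncio

from faststream.kafka import KafkaBroker
from pydantic import BaseModel, Field


class IngestRequest(BaseModel):
    """Hypothetical stand-in for LtdUrlIngestV2 (see src/ook/domain/kafka.py)."""

    url: str = Field(..., description="The root URL to ingest.")


async def main() -> None:
    # Placeholder bootstrap server and topic; Ook reads these from its config
    # (config.kafka.bootstrap_servers and config.ingest_kafka_topic).
    broker = KafkaBroker("localhost:9092")

    # Declare the publisher before connecting, matching the ordering noted in
    # Factory.create_standalone, so the producer gets attached to the publisher.
    ingest_publisher = broker.publisher("ook.ingest", title="Ook ingest requests")

    async with broker:  # connects on enter, closes on exit
        value = IngestRequest(url="https://sqr-075.lsst.io/")
        await ingest_publisher.publish(
            message=value.model_dump(mode="json"),
            key=b"https://sqr-075.lsst.io/",
        )


asyncio.run(main())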