diff --git a/src/rubintv/background/bucketpoller.py b/src/rubintv/background/bucketpoller.py
new file mode 100644
index 00000000..7ccac7df
--- /dev/null
+++ b/src/rubintv/background/bucketpoller.py
@@ -0,0 +1,61 @@
+import asyncio
+
+import boto3
+
+from rubintv.models.models import Event, Location, get_current_day_obs
+
+
+class BucketPoller:
+    _client = boto3.client("s3")
+    _current_lists: dict[str, dict[str, list | None]] = {}
+
+    def __init__(self, locations: list[Location]) -> None:
+        self.locations = locations
+        # cam list is None by default or list if polled
+        self._current_lists = {
+            loc.name: {cam.name: None for cam in loc.cameras}
+            for loc in locations
+        }
+
+    async def poll_buckets_for_todays_data(self) -> None:
+        while True:
+            current_day_obs = get_current_day_obs()
+            for loc in self.locations:
+                for camera in loc.cameras:
+                    prefix = f"{camera.name}/{current_day_obs}"
+                    objects = self.list_objects(loc.bucket_name, prefix)
+                    if objects != self._current_lists[loc.name][camera.name]:
+                        # msg = {f"{loc.name}/{camera.name}", objects}
+                        print(f"{loc.name}/{camera.name}")
+                        print(objects)
+                        # await notify_current_camera_table_clients(msg)
+                        self._current_lists[loc.name][camera.name] = objects
+            await asyncio.sleep(0.5)
+
+    def list_objects(self, bucket_name: str, prefix: str) -> list[dict]:
+        objects = []
+        response = self._client.list_objects_v2(
+            Bucket=bucket_name, Prefix=prefix
+        )
+        while True:
+            for content in response.get("Contents", []):
+                object = {}
+                object["url"] = content["Key"]
+                object["hash"] = content["ETag"].strip('"')
+                objects.append(object)
+            if "NextContinuationToken" not in response:
+                break
+            response = self._client.list_objects_v2(
+                Bucket=bucket_name,
+                Prefix=prefix,
+                ContinuationToken=response["NextContinuationToken"],
+            )
+        return objects
+
+    def get_object(self, bucket_name: str, object_id: str) -> dict:
+        return self._client.get_object(Bucket=bucket_name, Key=object_id)
+
+
+def objects_to_events(objects: list[dict]) -> list[Event]:
+    events = [Event(**object) for object in objects]
+    return events
diff --git a/src/rubintv/handlers/external.py b/src/rubintv/handlers/external.py
index b10fb276..da7070d7 100644
--- a/src/rubintv/handlers/external.py
+++ b/src/rubintv/handlers/external.py
@@ -1,5 +1,4 @@
 """Handlers for the app's external root, ``/rubintv/``."""
-from datetime import date
 from itertools import chain
 from typing import Tuple
 
@@ -8,15 +7,9 @@
 from safir.dependencies.logger import logger_dependency
 from structlog.stdlib import BoundLogger
 
-from rubintv.inittemplates import get_templates
 from rubintv.models.helpers import find_first
-from rubintv.models.models import (
-    Camera,
-    Event,
-    Location,
-    build_prefix_with_date,
-    get_current_day_obs,
-)
+from rubintv.models.models import Camera, Location
+from rubintv.templates_init import get_templates
 
 __all__ = ["get_home", "external_router", "templates"]
 
@@ -71,35 +64,6 @@ def get_location_camera(
     return (location, camera)
 
 
-@external_router.get(
-    "/api/location/{location_name}/camera/{camera_name}/latest"
-)
-def get_camera_latest_data(
-    location_name: str,
-    camera_name: str,
-    request: Request,
-    logger: BoundLogger = Depends(logger_dependency),
-) -> dict[str, date | list]:
-    location, camera = get_location_camera(location_name, camera_name, request)
-    day_obs = get_current_day_obs()
-    prefix = build_prefix_with_date(camera, day_obs)
-    logger.info(f"Looking for data for: {prefix}")
-    events = scrape_data_for_prefix(location, prefix)
-    return {"date": day_obs, "events": events}
-
-
-def scrape_data_for_prefix(location: Location, prefix: str) -> list:
-    bucket_handler = location.bucket_handler
-    objects = bucket_handler.list_objects(prefix)
-    events = objects_to_events(objects)
-    return events
-
-
-def objects_to_events(objects: list[dict]) -> list[Event]:
-    events = [Event(**object) for object in objects]
-    return events
-
-
 @external_router.get(
     "/{location_name}", response_class=HTMLResponse, name="location"
 )
diff --git a/src/rubintv/main.py b/src/rubintv/main.py
index c5f12352..cca7a2a4 100644
--- a/src/rubintv/main.py
+++ b/src/rubintv/main.py
@@ -6,9 +6,11 @@
 constructed when this module is loaded and is not deferred until a function
 is called.
 """
-
+import asyncio
+from contextlib import asynccontextmanager
 from importlib.metadata import metadata, version
 from pathlib import Path
+from typing import AsyncGenerator
 
 from fastapi import FastAPI
 from fastapi.staticfiles import StaticFiles
@@ -17,6 +19,7 @@
 from safir.logging import configure_logging, configure_uvicorn_logging
 from safir.middleware.x_forwarded import XForwardedMiddleware
 
+from .background.bucketpoller import BucketPoller
 from .config import config
 from .handlers.external import external_router
 from .handlers.internal import internal_router
@@ -33,6 +36,34 @@
 )
 configure_uvicorn_logging(config.log_level)
 
+# Initialise model data fixtures
+models = ModelsInitiator()
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI) -> AsyncGenerator:
+    # Start mocking for s3 buckets.
+    # Remove when actual s3 is populated.
+    mock = mock_s3()
+    mock.start()
+
+    # Generate mock test buckets
+    # Remove when actual s3 is populated.
+    mock_up_data(models.locations, models.cameras)
+
+    # initialise the background bucket poller
+    bp = BucketPoller(models.locations)
+    polling = asyncio.create_task(bp.poll_buckets_for_todays_data())
+
+    yield
+
+    polling.cancel()
+
+    # Remove mocking when actual s3 is populated.
+    mock.stop()
+    await http_client_dependency.aclose()
+
+
 """The main FastAPI application for rubintv."""
 app = FastAPI(
     title="rubintv",
@@ -42,26 +73,11 @@
     docs_url=f"/{config.path_prefix}/docs",
     redoc_url=f"/{config.path_prefix}/redoc",
     debug=True,
+    lifespan=lifespan,
 )
 
-# Start mocking for s3 buckets.
-# Remove when actual s3 is populated.
-mock = mock_s3()
-mock.start()
-
-# Initialise model data fixtures
-models = ModelsInitiator()
 app.state.fixtures = models
 
-# Initialise bucket handlers
-# for loc in models.locations:
-#     loc: Location
-#     loc.bucket_handler = S3BucketHandler(loc.bucket_name)
-
-# Generate mock test buckets
-# Remove when actual s3 is populated.
-mock_up_data(models.locations, models.cameras)
-
 # Intwine jinja2 templating
 app.mount(
     "/static",
@@ -75,10 +91,3 @@
 
 # Add middleware.
 app.add_middleware(XForwardedMiddleware)
-
-
-@app.on_event("shutdown")
-async def shutdown_event() -> None:
-    # Remove mocking when actual s3 is populated.
-    mock.stop()
-    await http_client_dependency.aclose()
diff --git a/src/rubintv/mockdata.py b/src/rubintv/mockdata.py
index 1d32d337..d529bfc9 100644
--- a/src/rubintv/mockdata.py
+++ b/src/rubintv/mockdata.py
@@ -45,25 +45,30 @@ def mock_up_data(locations: list[Location], cameras: list[Camera]) -> None:
         if camera.channels:
             for index, channel in enumerate(camera.channels):
                 print(
-                    f"Uploading testcard to {bucket_name} for \
-                        {camera_name} in {channel.name}"
+                    f"Uploading testcard to {location.name} for"
+                    f"{camera_name} in {channel.name}"
                 )
                 # upload a file for today
                 upload_file(
                     Path(__file__).parent
                     / "static/images/testcard_f.jpg",
                     bucket_name,
-                    f"{camera_name}/{today}/{channel.name}/\
-                        {index:06}.jpg",
+                    (
+                        f"{camera_name}/{today}/{channel.name}/"
+                        f"{index:06}.jpg"
+                    ),
                 )
                 # upload one for 100 days ago
                 upload_file(
                     Path(__file__).parent
                     / "static/images/testcard_f.jpg",
                     bucket_name,
-                    f"{camera_name}/{the_past}/{channel.name}/\
-                        {index:06}.jpg",
+                    (
+                        f"{camera_name}/{the_past}/{channel.name}/"
+                        f"{index:06}.jpg"
+                    ),
                 )
+    print()
 
 
 def upload_file(
diff --git a/src/rubintv/models/buckethandler.py b/src/rubintv/models/buckethandler.py
index 74b22457..0b92ca15 100644
--- a/src/rubintv/models/buckethandler.py
+++ b/src/rubintv/models/buckethandler.py
@@ -10,13 +10,14 @@ class S3BucketHandler:
     The BucketHandler interface.
     """
 
+    _client = boto3.client("s3")
+
     def __init__(self, bucket_handle: str) -> None:
-        self.client = boto3.client("s3")
         self.bucket_handle = bucket_handle
 
     def list_objects(self, prefix: str) -> list[dict]:
         objects = []
-        response = self.client.list_objects_v2(
+        response = self._client.list_objects_v2(
             Bucket=self.bucket_handle, Prefix=prefix
         )
         while True:
@@ -27,7 +28,7 @@ def list_objects(self, prefix: str) -> list[dict]:
                 objects.append(object)
             if "NextContinuationToken" not in response:
                 break
-            response = self.client.list_objects_v2(
+            response = self._client.list_objects_v2(
                 Bucket=self.bucket_handle,
                 Prefix=prefix,
                 ContinuationToken=response["NextContinuationToken"],
@@ -35,4 +36,6 @@
         return objects
 
     def get_object(self, object_id: str) -> dict:
-        return self.client.get_object(Bucket=self.bucket_handle, Key=object_id)
+        return self._client.get_object(
+            Bucket=self.bucket_handle, Key=object_id
+        )
diff --git a/src/rubintv/models/models.py b/src/rubintv/models/models.py
index 0f156305..4911556f 100644
--- a/src/rubintv/models/models.py
+++ b/src/rubintv/models/models.py
@@ -2,15 +2,12 @@
 
 import re
 from datetime import date, datetime, timedelta
-from functools import cached_property
 from typing import Any, Type
 
 from dateutil.tz import gettz
-from pydantic import BaseModel, computed_field, field_validator
+from pydantic import BaseModel, field_validator
 from pydantic.dataclasses import dataclass
 
-from rubintv.models.buckethandler import S3BucketHandler
-
 __all__ = [
     "Location",
     "Channel",
@@ -23,23 +20,6 @@
 ]
 
 
-class Location(BaseModel, arbitrary_types_allowed=True):
-    name: str
-    title: str
-    bucket_name: str
-    services: list[str]
-    camera_groups: dict[str, list[str]]
-    logo: str = ""
-    # bucket_handler: S3BucketHandler | None = None
-
-    # see https://docs.pydantic.dev/dev-v2/usage/computed_fields/ and
-    # https://github.com/python/mypy/issues/1362 for mypy ignore explanation
-    @computed_field  # type: ignore[misc]
-    @cached_property
-    def bucket_handler(self) -> S3BucketHandler:
-        return S3BucketHandler(self.bucket_name)
-
-
 class Channel(BaseModel):
     name: str
     title: str
@@ -68,6 +48,16 @@ def default_as_name(cls: Type, v: str, values: Any) -> str:
         return v or values.get("name")
 
 
+class Location(BaseModel, arbitrary_types_allowed=True):
+    name: str
+    title: str
+    bucket_name: str
+    services: list[str]
+    camera_groups: dict[str, list[str]]
+    cameras: list[Camera] = []
+    logo: str = ""
+
+
 class Heartbeat(BaseModel):
     name: str
     title: str
diff --git a/src/rubintv/models/models_init.py b/src/rubintv/models/models_init.py
index b6c7e1ad..ee5c1693 100644
--- a/src/rubintv/models/models_init.py
+++ b/src/rubintv/models/models_init.py
@@ -1,10 +1,12 @@
+from itertools import chain
 from pathlib import Path
 from typing import Any, Type
 
 import yaml
 from pydantic import BaseModel
 
-from .models import Camera, Heartbeat, Location
+from rubintv.models.helpers import find_first
+from rubintv.models.models import Camera, Heartbeat, Location
 
 __all__ = ["ModelsInitiator", "dict_from_list_of_named_objects"]
 
@@ -26,14 +28,29 @@ def __init__(self) -> None:
         models_file_path = Path(__file__).parent / "models_data.yaml"
         with open(models_file_path, "r") as file:
             data = yaml.safe_load(file)
-        self.locations = self._populate_model(Location, data["locations"])
         cameras = self._populate_model(Camera, data["cameras"])
         self.cameras = self._attach_metadata_cols(
            cameras, data["metadata_cols"]
         )
+        locations = self._populate_model(Location, data["locations"])
+        self.locations = self._attach_cameras_to_locations(
+            self.cameras, locations
+        )
         heartbeats = self._populate_model(Heartbeat, data["heartbeats"])
         self.heartbeats = self._inject_heartbeat_channels(heartbeats)
 
+    def _attach_cameras_to_locations(
+        self, cameras: list[Camera], locations: list[Location]
+    ) -> list[Location]:
+        for location in locations:
+            camera_groups = location.camera_groups.values()
+            location_cams = chain(*camera_groups)
+            for cam_name in location_cams:
+                camera = find_first(cameras, "name", cam_name)
+                if camera:
+                    location.cameras.append(camera)
+        return locations
+
     def _populate_model(
         self, cls: Type[BaseModel], data_dict: dict[str, list]
     ) -> list[Any]:
diff --git a/src/rubintv/inittemplates.py b/src/rubintv/templates_init.py
similarity index 100%
rename from src/rubintv/inittemplates.py
rename to src/rubintv/templates_init.py
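
Not part of the change set: for reference, the manual ContinuationToken loop that BucketPoller.list_objects and S3BucketHandler.list_objects both implement is equivalent to using boto3's built-in paginator. This is a minimal sketch under that assumption; the bucket and prefix values are placeholders.

```python
# Sketch only: equivalent listing logic using boto3's paginator rather than
# looping on NextContinuationToken by hand. Not what the diff implements.
import boto3

client = boto3.client("s3")


def list_objects(bucket_name: str, prefix: str) -> list[dict]:
    objects = []
    paginator = client.get_paginator("list_objects_v2")
    # Each page has the same shape as a list_objects_v2 response.
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for content in page.get("Contents", []):
            # Same object shape as built in the diff: key as "url", ETag as "hash".
            objects.append(
                {"url": content["Key"], "hash": content["ETag"].strip('"')}
            )
    return objects
```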
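The new main.py replaces the removed `@app.on_event("shutdown")` handler with a lifespan context manager that also starts the bucket poller as a background task. Below is a minimal, self-contained sketch of that startup/shutdown pattern, with a stand-in `poll()` coroutine instead of `BucketPoller.poll_buckets_for_todays_data()`; the function name and 0.5-second interval are illustrative only.

```python
# Minimal sketch of the lifespan pattern used in main.py above: create a
# background polling task at startup and cancel it at shutdown.
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator

from fastapi import FastAPI


async def poll() -> None:
    # Stand-in for BucketPoller.poll_buckets_for_todays_data().
    while True:
        print("polling...")
        await asyncio.sleep(0.5)


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
    # Code before `yield` runs at startup, before requests are served.
    task = asyncio.create_task(poll())
    yield
    # Code after `yield` runs at shutdown: stop the background task.
    task.cancel()


app = FastAPI(lifespan=lifespan)
# Run with e.g.: uvicorn sketch:app  (module name is hypothetical)
```

Cancelling the task after `yield` gives the same teardown point the old shutdown event provided, while keeping startup and shutdown logic in one place.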