First attempt at adding background bucket polling
ugyballoons committed Jul 24, 2023
1 parent 2a06031 commit f332e2d
Showing 8 changed files with 144 additions and 95 deletions.
61 changes: 61 additions & 0 deletions src/rubintv/background/bucketpoller.py
@@ -0,0 +1,61 @@
import asyncio

import boto3

from rubintv.models.models import Event, Location, get_current_day_obs


class BucketPoller:
_client = boto3.client("s3")
_current_lists: dict[str, dict[str, list | None]] = {}

def __init__(self, locations: list[Location]) -> None:
self.locations = locations
# Each camera's entry is None by default, or a list of objects once polled
self._current_lists = {
loc.name: {cam.name: None for cam in loc.cameras}
for loc in locations
}

async def poll_buckets_for_todays_data(self) -> None:
while True:
current_day_obs = get_current_day_obs()
for loc in self.locations:
for camera in loc.cameras:
prefix = f"{camera.name}/{current_day_obs}"
objects = self.list_objects(loc.bucket_name, prefix)
if objects != self._current_lists[loc.name][camera.name]:
# msg = {f"{loc.name}/{camera.name}", objects}
print(f"{loc.name}/{camera.name}")
print(objects)
# await notify_current_camera_table_clients(msg)
self._current_lists[loc.name][camera.name] = objects
await asyncio.sleep(0.5)

def list_objects(self, bucket_name: str, prefix: str) -> list[dict]:
objects = []
response = self._client.list_objects_v2(
Bucket=bucket_name, Prefix=prefix
)
while True:
for content in response.get("Contents", []):
object = {}
object["url"] = content["Key"]
object["hash"] = content["ETag"].strip('"')
objects.append(object)
if "NextContinuationToken" not in response:
break
response = self._client.list_objects_v2(
Bucket=bucket_name,
Prefix=prefix,
ContinuationToken=response["NextContinuationToken"],
)
return objects

def get_object(self, bucket_name: str, object_id: str) -> dict:
return self._client.get_object(Bucket=bucket_name, Key=object_id)


def objects_to_events(objects: list[dict]) -> list[Event]:
events = [Event(**object) for object in objects]
return events
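
Aside on the listing loop in this new file: boto3 also ships a paginator for list_objects_v2 that walks NextContinuationToken internally, and because boto3 calls are synchronous, calling them directly inside the polling coroutine blocks the event loop for the duration of each request. The sketch below illustrates both points under those assumptions; it is not part of the commit and the function names are illustrative.

import asyncio

import boto3


def list_objects_paginated(client, bucket_name: str, prefix: str) -> list[dict]:
    # Same output shape as BucketPoller.list_objects, but pagination is
    # handled by boto3's built-in paginator instead of a manual token loop.
    objects = []
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for content in page.get("Contents", []):
            objects.append(
                {"url": content["Key"], "hash": content["ETag"].strip('"')}
            )
    return objects


async def poll_once(bucket_name: str, prefix: str) -> list[dict]:
    # Run the blocking boto3 call in a worker thread (Python 3.9+) so the
    # asyncio loop stays responsive between polls.
    client = boto3.client("s3")
    return await asyncio.to_thread(
        list_objects_paginated, client, bucket_name, prefix
    )
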
40 changes: 2 additions & 38 deletions src/rubintv/handlers/external.py
@@ -1,5 +1,4 @@
"""Handlers for the app's external root, ``/rubintv/``."""
from datetime import date
from itertools import chain
from typing import Tuple

@@ -8,15 +7,9 @@
from safir.dependencies.logger import logger_dependency
from structlog.stdlib import BoundLogger

from rubintv.inittemplates import get_templates
from rubintv.models.helpers import find_first
from rubintv.models.models import (
Camera,
Event,
Location,
build_prefix_with_date,
get_current_day_obs,
)
from rubintv.models.models import Camera, Location
from rubintv.templates_init import get_templates

__all__ = ["get_home", "external_router", "templates"]

@@ -71,35 +64,6 @@ def get_location_camera(
return (location, camera)


@external_router.get(
"/api/location/{location_name}/camera/{camera_name}/latest"
)
def get_camera_latest_data(
location_name: str,
camera_name: str,
request: Request,
logger: BoundLogger = Depends(logger_dependency),
) -> dict[str, date | list]:
location, camera = get_location_camera(location_name, camera_name, request)
day_obs = get_current_day_obs()
prefix = build_prefix_with_date(camera, day_obs)
logger.info(f"Looking for data for: {prefix}")
events = scrape_data_for_prefix(location, prefix)
return {"date": day_obs, "events": events}


def scrape_data_for_prefix(location: Location, prefix: str) -> list:
bucket_handler = location.bucket_handler
objects = bucket_handler.list_objects(prefix)
events = objects_to_events(objects)
return events


def objects_to_events(objects: list[dict]) -> list[Event]:
events = [Event(**object) for object in objects]
return events


@external_router.get(
"/{location_name}", response_class=HTMLResponse, name="location"
)
57 changes: 33 additions & 24 deletions src/rubintv/main.py
@@ -6,9 +6,11 @@
constructed when this module is loaded and is not deferred until a function is
called.
"""

import asyncio
from contextlib import asynccontextmanager
from importlib.metadata import metadata, version
from pathlib import Path
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
@@ -17,6 +19,7 @@
from safir.logging import configure_logging, configure_uvicorn_logging
from safir.middleware.x_forwarded import XForwardedMiddleware

from .background.bucketpoller import BucketPoller
from .config import config
from .handlers.external import external_router
from .handlers.internal import internal_router
@@ -33,6 +36,34 @@
)
configure_uvicorn_logging(config.log_level)

# Initialise model data fixtures
models = ModelsInitiator()


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
# Start mocking for s3 buckets.
# Remove when actual s3 is populated.
mock = mock_s3()
mock.start()

# Generate mock test buckets
# Remove when actual s3 is populated.
mock_up_data(models.locations, models.cameras)

# Initialise the background bucket poller
bp = BucketPoller(models.locations)
polling = asyncio.create_task(bp.poll_buckets_for_todays_data())

yield

polling.cancel()

# Remove mocking when actual s3 is populated.
mock.stop()
await http_client_dependency.aclose()


"""The main FastAPI application for rubintv."""
app = FastAPI(
title="rubintv",
@@ -42,26 +73,11 @@
docs_url=f"/{config.path_prefix}/docs",
redoc_url=f"/{config.path_prefix}/redoc",
debug=True,
lifespan=lifespan,
)

# Start mocking for s3 buckets.
# Remove when actual s3 is populated.
mock = mock_s3()
mock.start()

# Initialise model data fixtures
models = ModelsInitiator()
app.state.fixtures = models

# Initialise bucket handlers
# for loc in models.locations:
# loc: Location
# loc.bucket_handler = S3BucketHandler(loc.bucket_name)

# Generate mock test buckets
# Remove when actual s3 is populated.
mock_up_data(models.locations, models.cameras)

# Entwine jinja2 templating
app.mount(
"/static",
@@ -75,10 +91,3 @@

# Add middleware.
app.add_middleware(XForwardedMiddleware)


@app.on_event("shutdown")
async def shutdown_event() -> None:
# Remove mocking when actual s3 is populated.
mock.stop()
await http_client_dependency.aclose()
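
A note on the new lifespan handler: it cancels the polling task on shutdown but does not await it, so the final poll iteration may still be unwinding while the S3 mock is stopped. A common pattern is to await the cancelled task and suppress CancelledError. The self-contained sketch below illustrates that pattern only; it is not the commit's code, and the tick coroutine is a stand-in for poll_buckets_for_todays_data.

import asyncio
import contextlib
from contextlib import asynccontextmanager
from typing import AsyncGenerator

from fastapi import FastAPI


async def tick() -> None:
    # Stand-in for BucketPoller.poll_buckets_for_todays_data().
    while True:
        await asyncio.sleep(0.5)


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
    task = asyncio.create_task(tick())
    yield
    task.cancel()
    # Wait for cancellation to finish before the rest of shutdown runs.
    with contextlib.suppress(asyncio.CancelledError):
        await task


app = FastAPI(lifespan=lifespan)
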
17 changes: 11 additions & 6 deletions src/rubintv/mockdata.py
@@ -45,25 +45,30 @@ def mock_up_data(locations: list[Location], cameras: list[Camera]) -> None:
if camera.channels:
for index, channel in enumerate(camera.channels):
print(
f"Uploading testcard to {bucket_name} for \
{camera_name} in {channel.name}"
f"Uploading testcard to {location.name} for"
f"{camera_name} in {channel.name}"
)
# upload a file for today
upload_file(
Path(__file__).parent
/ "static/images/testcard_f.jpg",
bucket_name,
f"{camera_name}/{today}/{channel.name}/\
{index:06}.jpg",
(
f"{camera_name}/{today}/{channel.name}/"
f"{index:06}.jpg"
),
)
# upload one for 100 days ago
upload_file(
Path(__file__).parent
/ "static/images/testcard_f.jpg",
bucket_name,
f"{camera_name}/{the_past}/{channel.name}/\
{index:06}.jpg",
(
f"{camera_name}/{the_past}/{channel.name}/"
f"{index:06}.jpg"
),
)
print()


def upload_file(
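
The body of upload_file is collapsed in this diff; only its signature line and call sites are visible, taking a local path, a bucket name, and an object key. A hedged sketch of what such a helper typically looks like with boto3 follows — an assumption for illustration, not the code hidden behind the fold.

from pathlib import Path

import boto3


def upload_file(file_path: Path, bucket_name: str, key: str) -> None:
    # Assumed implementation: push a local file into the (mocked) S3 bucket
    # under the given key.
    client = boto3.client("s3")
    client.upload_file(str(file_path), bucket_name, key)
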
11 changes: 7 additions & 4 deletions src/rubintv/models/buckethandler.py
@@ -10,13 +10,14 @@ class S3BucketHandler:
The BucketHandler interface.
"""

_client = boto3.client("s3")

def __init__(self, bucket_handle: str) -> None:
self.client = boto3.client("s3")
self.bucket_handle = bucket_handle

def list_objects(self, prefix: str) -> list[dict]:
objects = []
response = self.client.list_objects_v2(
response = self._client.list_objects_v2(
Bucket=self.bucket_handle, Prefix=prefix
)
while True:
@@ -27,12 +28,14 @@ def list_objects(self, prefix: str) -> list[dict]:
objects.append(object)
if "NextContinuationToken" not in response:
break
response = self.client.list_objects_v2(
response = self._client.list_objects_v2(
Bucket=self.bucket_handle,
Prefix=prefix,
ContinuationToken=response["NextContinuationToken"],
)
return objects

def get_object(self, object_id: str) -> dict:
return self.client.get_object(Bucket=self.bucket_handle, Key=object_id)
return self._client.get_object(
Bucket=self.bucket_handle, Key=object_id
)
32 changes: 11 additions & 21 deletions src/rubintv/models/models.py
@@ -2,15 +2,12 @@

import re
from datetime import date, datetime, timedelta
from functools import cached_property
from typing import Any, Type

from dateutil.tz import gettz
from pydantic import BaseModel, computed_field, field_validator
from pydantic import BaseModel, field_validator
from pydantic.dataclasses import dataclass

from rubintv.models.buckethandler import S3BucketHandler

__all__ = [
"Location",
"Channel",
@@ -23,23 +20,6 @@
]


class Location(BaseModel, arbitrary_types_allowed=True):
name: str
title: str
bucket_name: str
services: list[str]
camera_groups: dict[str, list[str]]
logo: str = ""
# bucket_handler: S3BucketHandler | None = None

# see https://docs.pydantic.dev/dev-v2/usage/computed_fields/ and
# https://github.com/python/mypy/issues/1362 for mypy ignore explanation
@computed_field # type: ignore[misc]
@cached_property
def bucket_handler(self) -> S3BucketHandler:
return S3BucketHandler(self.bucket_name)


class Channel(BaseModel):
name: str
title: str
@@ -68,6 +48,16 @@ def default_as_name(cls: Type, v: str, values: Any) -> str:
return v or values.get("name")


class Location(BaseModel, arbitrary_types_allowed=True):
name: str
title: str
bucket_name: str
services: list[str]
camera_groups: dict[str, list[str]]
cameras: list[Camera] = []
logo: str = ""


class Heartbeat(BaseModel):
name: str
title: str
21 changes: 19 additions & 2 deletions src/rubintv/models/models_init.py
@@ -1,10 +1,12 @@
from itertools import chain
from pathlib import Path
from typing import Any, Type

import yaml
from pydantic import BaseModel

from .models import Camera, Heartbeat, Location
from rubintv.models.helpers import find_first
from rubintv.models.models import Camera, Heartbeat, Location

__all__ = ["ModelsInitiator", "dict_from_list_of_named_objects"]

@@ -26,14 +28,29 @@ def __init__(self) -> None:
models_file_path = Path(__file__).parent / "models_data.yaml"
with open(models_file_path, "r") as file:
data = yaml.safe_load(file)
self.locations = self._populate_model(Location, data["locations"])
cameras = self._populate_model(Camera, data["cameras"])
self.cameras = self._attach_metadata_cols(
cameras, data["metadata_cols"]
)
locations = self._populate_model(Location, data["locations"])
self.locations = self._attach_cameras_to_locations(
self.cameras, locations
)
heartbeats = self._populate_model(Heartbeat, data["heartbeats"])
self.heartbeats = self._inject_heartbeat_channels(heartbeats)

def _attach_cameras_to_locations(
self, cameras: list[Camera], locations: list[Location]
) -> list[Location]:
for location in locations:
camera_groups = location.camera_groups.values()
location_cams = chain(*camera_groups)
for cam_name in location_cams:
camera = find_first(cameras, "name", cam_name)
if camera:
location.cameras.append(camera)
return locations

def _populate_model(
self, cls: Type[BaseModel], data_dict: dict[str, list]
) -> list[Any]:
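
find_first is imported from rubintv.models.helpers here, but its definition is not part of this commit. From the call site find_first(cameras, "name", cam_name), it appears to return the first object whose named attribute equals the given value, or None if nothing matches. A sketch of that assumed behaviour, not the actual helper:

from typing import Any, Sequence


def find_first(objects: Sequence[Any], attr: str, value: Any) -> Any | None:
    # Assumed helper: first object whose `attr` equals `value`, else None.
    for obj in objects:
        if getattr(obj, attr, None) == value:
            return obj
    return None
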
File renamed without changes.
