Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-45331: Manage cases when yesterday's movie needs to be picked up immediately #210

Merged
merged 8 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 117 additions & 43 deletions python/lsst/ts/rubintv/background/currentpoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
NightReport,
NightReportData,
)
from lsst.ts.rubintv.models.models import ServiceMessageTypes as Service
from lsst.ts.rubintv.models.models import ServiceMessageTypes as MessageType
from lsst.ts.rubintv.models.models import get_current_day_obs
from lsst.ts.rubintv.models.models_helpers import (
all_objects_to_events,
Expand All @@ -33,12 +33,13 @@ class CurrentPoller:
MIN_INTERVAL = 1

def __init__(self, locations: list[Location], test_mode: bool = False) -> None:
self._clients: dict[str, S3Client] = {}
self._s3clients: dict[str, S3Client] = {}
self._objects: dict[str, list] = {}
self._events: dict[str, list[Event]] = {}
self._metadata: dict[str, dict] = {}
self._table: dict[str, dict[int, dict[str, dict]]] = {}
self._per_day: dict[str, dict[str, dict]] = {}
self._yesterday_prefixes: dict[str, list[str]] = {}
self._most_recent_events: dict[str, Event] = {}
self._nr_metadata: dict[str, NightReportData] = {}
self._night_reports: dict[str, NightReport] = {}
Expand All @@ -49,11 +50,11 @@ def __init__(self, locations: list[Location], test_mode: bool = False) -> None:
self.locations = locations
self._current_day_obs = get_current_day_obs()
for location in locations:
self._clients[location.name] = S3Client(
self._s3clients[location.name] = S3Client(
location.profile_name, location.bucket_name
)

async def clear_all_data(self) -> None:
async def clear_todays_data(self) -> None:
self._objects = {}
self._events = {}
self._metadata = {}
Expand All @@ -63,21 +64,42 @@ async def clear_all_data(self) -> None:
self._nr_metadata = {}
self._night_reports = {}

async def check_for_empty_per_day_channels(self) -> None:
    """Stash bucket prefixes for per-day channels that produced no data
    during the day that is rolling over.

    The prefixes are built with the outgoing day-obs (``self._current_day_obs``
    still holds yesterday's date when this runs at rollover) so polling can
    keep looking for late-arriving per-day objects into the new day.
    """
    # NOTE(review): removed stray review-UI text that had been pasted into
    # this function body; code below is otherwise the same logic.
    for location in self.locations:
        # Start each location with a fresh stash, discarding any prefixes
        # left over from the previous rollover.
        fresh: list[str] = []
        self._yesterday_prefixes[location.name] = fresh
        for camera in location.cameras:
            if not camera.online:
                continue
            channels = camera.pd_channels()
            if not channels:
                continue
            stored_per_day = await self.get_current_per_day_data(
                location.name, camera
            )
            # Only channels that never appeared in the day's per-day data
            # need to be chased after rollover.
            for chan in channels:
                if chan.name in stored_per_day:
                    continue
                fresh.append(
                    f"{camera.name}/{self._current_day_obs}/{chan.name}"
                )

async def poll_buckets_for_todays_data(self, test_day: str = "") -> None:
while True:
timer_start = time()
try:
if self._current_day_obs != get_current_day_obs():
logger.info(
"Day rolled over",
date_from=self._current_day_obs,
date_to=get_current_day_obs(),
)
await self.clear_all_data()
await self.check_for_empty_per_day_channels()
await self.clear_todays_data()
day_obs = self._current_day_obs = get_current_day_obs()

for location in self.locations:
client = self._clients[location.name]
client = self._s3clients[location.name]
for camera in location.cameras:
if not camera.online:
continue
Expand All @@ -87,15 +109,18 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None:
prefix = f"{camera.name}/{test_day}"

objects = await client.async_list_objects(prefix)
loc_cam = await self._get_loc_cam(location.name, camera)

objects = await self.sieve_out_metadata(
objects, prefix, location, camera
)
objects = await self.sieve_out_night_reports(
objects, location, camera
)
await self.process_channel_objects(objects, loc_cam, camera)
if objects:
loc_cam = self._get_loc_cam(location.name, camera)
objects = await self.sieve_out_metadata(
objects, prefix, location, camera
)
objects = await self.sieve_out_night_reports(
objects, location, camera
)
await self.process_channel_objects(objects, loc_cam, camera)

await self.poll_for_yesterdays_per_day(location)

self.completed_first_poll = True

if self.test_mode:
Expand All @@ -111,6 +136,40 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None:
except Exception:
logger.exception("Caught exception during poll for data")

async def poll_for_yesterdays_per_day(self, location: Location) -> None:
    """Poll the bucket for yesterday's missing per-day data.

    Uses the stash of prefixes recorded at day rollover to look for new
    objects that were delayed in processing (mainly movies) and did not
    appear in the bucket before the day rolled over.  When objects are
    found for a prefix, the current page is notified and the prefix is
    removed from the stash.  For each channel only the most recent event
    survives.

    Note: this does not affect historical pages.

    Parameters
    ----------
    location : Location
        A given location.
    """
    client = self._s3clients[location.name]
    found: list[str] = []
    for prefix in self._yesterday_prefixes.get(location.name, []):
        objects = await client.async_list_objects(prefix)
        if not objects:
            continue
        found.append(prefix)
        events = await all_objects_to_events(objects)
        # Later events overwrite earlier ones per channel, so only the
        # most recent event for each channel is kept.
        pd_data = {e.channel_name: e.__dict__ for e in events}
        cam_name = prefix.split("/")[0]
        loc_cam = f"{location.name}/{cam_name}"
        logger.info(
            "Found yesterday's per day data:", loc_cam=loc_cam, pd_data=pd_data
        )
        await notify_ws_clients(
            "camera", MessageType.CAMERA_PD_BACKDATED, loc_cam, pd_data
        )
    # Remove satisfied prefixes only after the loop so the list is not
    # mutated while being iterated.
    for prefix in found:
        self._yesterday_prefixes[location.name].remove(prefix)

async def process_channel_objects(
self, objects: list[dict[str, str]], loc_cam: str, camera: Camera
) -> None:
Expand All @@ -124,7 +183,9 @@ async def process_channel_objects(

pd_data = await self.make_per_day_data(camera, events)
self._per_day[loc_cam] = pd_data
await notify_ws_clients("camera", Service.CAMERA_PER_DAY, loc_cam, pd_data)
await notify_ws_clients(
"camera", MessageType.CAMERA_PER_DAY, loc_cam, pd_data
)

table = await self.make_channel_table(camera, events)
self._table[loc_cam] = table
Expand All @@ -134,7 +195,17 @@ async def process_channel_objects(
num_seqs=len(table),
max_seq=max(table) if table else -1,
)
await notify_ws_clients("camera", Service.CAMERA_TABLE, loc_cam, table)
await notify_ws_clients("camera", MessageType.CAMERA_TABLE, loc_cam, table)

# clear all relevant prefixes from the store looking for
# yesterday's per day updates

loc = loc_cam.split("/")[0]
prefixes = self._yesterday_prefixes.get(loc, [])
new_prefixes = [
prefix for prefix in prefixes if not prefix.startswith(camera.name)
]
self._yesterday_prefixes[loc] = new_prefixes

async def update_channel_events(
self, events: list[Event], loc_cam: str, camera: Camera
Expand All @@ -155,7 +226,7 @@ async def update_channel_events(
self._most_recent_events[chan_lookup] = current_event
await notify_ws_clients(
"channel",
Service.CHANNEL_EVENT,
MessageType.CHANNEL_EVENT,
chan_lookup,
current_event.__dict__,
)
Expand Down Expand Up @@ -212,9 +283,9 @@ async def filter_camera_metadata_object(
async def process_metadata_file(
self, md_obj: dict[str, str], location: Location, camera: Camera
) -> None:
loc_cam = await self._get_loc_cam(location.name, camera)
loc_cam = self._get_loc_cam(location.name, camera)
md_key = md_obj["key"]
client = self._clients[location.name]
client = self._s3clients[location.name]
data = await client.async_get_object(md_key)
if data and (loc_cam not in self._metadata or data != self._metadata[loc_cam]):
self._metadata[loc_cam] = data
Expand All @@ -227,21 +298,21 @@ async def process_metadata_file(
c for c in location.cameras if c.metadata_from == camera.name
)
for cam in to_notify:
loc_cam = await self._get_loc_cam(location.name, cam)
loc_cam = self._get_loc_cam(location.name, cam)
await notify_ws_clients(
"camera", Service.CAMERA_METADATA, loc_cam, data
"camera", MessageType.CAMERA_METADATA, loc_cam, data
)

async def sieve_out_night_reports(
self, objects: list[dict[str, str]], location: Location, camera: Camera
) -> list[dict[str, str]]:
loc_cam = await self._get_loc_cam(location.name, camera)
loc_cam = self._get_loc_cam(location.name, camera)
report_objs, objects = await self.filter_night_report_objects(objects)
if report_objs:
if not self.night_report_exists(location.name, camera.name):
await notify_ws_clients(
"camera",
Service.CAMERA_PER_DAY,
MessageType.CAMERA_PER_DAY,
loc_cam,
{"nightReportExists": True},
)
Expand All @@ -258,7 +329,7 @@ async def filter_night_report_objects(
async def process_night_report_objects(
self, report_objs: list[dict[str, str]], location: Location, camera: Camera
) -> None:
loc_cam = await self._get_loc_cam(location.name, camera)
loc_cam = self._get_loc_cam(location.name, camera)
prev_nr = await self.get_current_night_report(location.name, camera.name)
night_report = NightReport()

Expand All @@ -272,7 +343,7 @@ async def process_night_report_objects(
loc_cam not in self._nr_metadata
or self._nr_metadata[loc_cam] != metadata_file
):
client = self._clients[location.name]
client = self._s3clients[location.name]
text = await client.async_get_object(metadata_file.key)
night_report.text = text
self._nr_metadata[loc_cam] = metadata_file
Expand All @@ -286,7 +357,10 @@ async def process_night_report_objects(

if prev_nr.text != night_report.text or prev_nr.plots != night_report.plots:
await notify_ws_clients(
"nightreport", Service.NIGHT_REPORT, loc_cam, night_report.model_dump()
"nightreport",
MessageType.NIGHT_REPORT,
loc_cam,
night_report.model_dump(),
)
self._night_reports[loc_cam] = night_report
return
Expand All @@ -313,26 +387,26 @@ async def make_channel_table(
async def get_current_objects(
    self, location_name: str, camera: Camera
) -> list[dict[str, str]]:
    """Return the cached bucket objects for a location/camera pair.

    Falls back to an empty list when nothing has been polled yet.
    """
    # Stale pre-refactor line (awaiting the now-sync helper) removed;
    # _get_loc_cam is a plain method since the refactor.
    loc_cam = self._get_loc_cam(location_name, camera)
    return self._objects.get(loc_cam, [])

async def get_current_events(
    self, location_name: str, camera: Camera
) -> list[Event]:
    """Return the cached events for a location/camera pair, or ``[]``."""
    # Stale pre-refactor line (awaiting the now-sync helper) removed.
    loc_cam = self._get_loc_cam(location_name, camera)
    return self._events.get(loc_cam, [])

async def get_current_channel_table(
    self, location_name: str, camera: Camera
) -> dict[int, dict[str, dict]]:
    """Return the cached seq-num-keyed channel table, or ``{}`` if absent."""
    # Stale pre-refactor line (awaiting the now-sync helper) removed.
    loc_cam = self._get_loc_cam(location_name, camera)
    return self._table.get(loc_cam, {})

async def get_current_per_day_data(
    self, location_name: str, camera: Camera
) -> dict[str, dict[str, dict]]:
    """Return a shallow copy of the cached per-day data for a
    location/camera pair, keyed by channel name.

    The copy prevents callers from mutating the internal cache dict.
    """
    # Stale pre-refactor line (awaiting the now-sync helper) removed.
    loc_cam = self._get_loc_cam(location_name, camera)
    events = self._per_day.get(loc_cam, {})
    return dict(events)

Expand All @@ -358,7 +432,7 @@ async def get_current_night_report(
night_report = self._night_reports.get(loc_cam, NightReport())
return night_report

async def _get_loc_cam(self, location_name: str, camera: Camera) -> str:
def _get_loc_cam(self, location_name: str, camera: Camera) -> str:
"""Return `f"{location_name}/{camera.name}"`

Parameters
Expand Down Expand Up @@ -401,24 +475,24 @@ async def get_latest_data(
if channel_data := await self.get_current_channel_table(
location.name, camera
):
yield Service.CAMERA_TABLE, channel_data
yield MessageType.CAMERA_TABLE, channel_data

if metadata := await self.get_current_metadata(location.name, camera):
yield Service.CAMERA_METADATA, metadata
yield MessageType.CAMERA_METADATA, metadata

if per_day := await self.get_current_per_day_data(
location.name, camera
):
yield Service.CAMERA_PER_DAY, per_day
yield MessageType.CAMERA_PER_DAY, per_day

if self.night_report_exists(location.name, camera.name):
yield Service.CAMERA_PER_DAY, {"nightReportLink": "current"}
yield MessageType.CAMERA_PER_DAY, {"nightReportLink": "current"}

case "channel":
if event := await self.get_current_channel_event(
location.name, camera.name, channel_name
):
yield Service.CHANNEL_EVENT, event.__dict__
yield MessageType.CHANNEL_EVENT, event.__dict__

case "nightreport":
night_report = await self.get_current_night_report(
Expand All @@ -427,4 +501,4 @@ async def get_latest_data(
# Check for equality with empty NightReport (from
# pydantic.BaseModel)
if night_report != NightReport():
yield Service.NIGHT_REPORT, night_report.model_dump()
yield MessageType.NIGHT_REPORT, night_report.model_dump()
2 changes: 1 addition & 1 deletion python/lsst/ts/rubintv/handlers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async def historical_reset(request: Request) -> None:
historical: HistoricalPoller = request.app.state.historical
await historical.trigger_reload_everything()
current: CurrentPoller = request.app.state.current_poller
await current.clear_all_data()
await current.clear_todays_data()


@api_router.get("/{location_name}", response_model=Location)
Expand Down
4 changes: 2 additions & 2 deletions python/lsst/ts/rubintv/handlers/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
websocket_to_client,
)
from lsst.ts.rubintv.models.models import Camera, Location
from lsst.ts.rubintv.models.models import ServiceMessageTypes as Service
from lsst.ts.rubintv.models.models import ServiceMessageTypes as MessageType
from lsst.ts.rubintv.models.models_helpers import find_first

data_ws_router = APIRouter()
Expand Down Expand Up @@ -77,7 +77,7 @@ async def data_websocket(
historical_busy = await websocket.app.state.historical.is_busy()
await websocket.send_json(
{
"dataType": Service.HISTORICAL_STATUS.value,
"dataType": MessageType.HISTORICAL_STATUS.value,
"payload": historical_busy,
}
)
Expand Down
Loading