From 49120c254c19ded210833a0ac9be9f7595165767 Mon Sep 17 00:00:00 2001 From: ugyballoons Date: Wed, 21 Aug 2024 15:39:37 +0100 Subject: [PATCH 1/8] WIP: CurrentPoller in debug --- .../ts/rubintv/background/currentpoller.py | 112 ++++++++++++------ python/lsst/ts/rubintv/handlers/websocket.py | 4 +- .../rubintv/handlers/websocket_notifiers.py | 17 +-- python/lsst/ts/rubintv/models/models.py | 1 + 4 files changed, 90 insertions(+), 44 deletions(-) diff --git a/python/lsst/ts/rubintv/background/currentpoller.py b/python/lsst/ts/rubintv/background/currentpoller.py index d59df99d..dd6d1f4b 100644 --- a/python/lsst/ts/rubintv/background/currentpoller.py +++ b/python/lsst/ts/rubintv/background/currentpoller.py @@ -12,7 +12,7 @@ NightReport, NightReportData, ) -from lsst.ts.rubintv.models.models import ServiceMessageTypes as Service +from lsst.ts.rubintv.models.models import ServiceMessageTypes as MessageType from lsst.ts.rubintv.models.models import get_current_day_obs from lsst.ts.rubintv.models.models_helpers import ( all_objects_to_events, @@ -33,12 +33,13 @@ class CurrentPoller: MIN_INTERVAL = 1 def __init__(self, locations: list[Location], test_mode: bool = False) -> None: - self._clients: dict[str, S3Client] = {} + self._s3clients: dict[str, S3Client] = {} self._objects: dict[str, list] = {} self._events: dict[str, list[Event]] = {} self._metadata: dict[str, dict] = {} self._table: dict[str, dict[int, dict[str, dict]]] = {} self._per_day: dict[str, dict[str, dict]] = {} + self._yester_pd_prefixes: dict[str, list[str]] = {} self._most_recent_events: dict[str, Event] = {} self._nr_metadata: dict[str, NightReportData] = {} self._night_reports: dict[str, NightReport] = {} @@ -49,7 +50,7 @@ def __init__(self, locations: list[Location], test_mode: bool = False) -> None: self.locations = locations self._current_day_obs = get_current_day_obs() for location in locations: - self._clients[location.name] = S3Client( + self._s3clients[location.name] = S3Client( 
location.profile_name, location.bucket_name ) @@ -63,21 +64,37 @@ async def clear_all_data(self) -> None: self._nr_metadata = {} self._night_reports = {} + async def check_for_empty_per_day_channels(self) -> None: + for location in self.locations: + # clear out yesterday's stash + self._yester_pd_prefixes[location.name] = [] + for camera in location.cameras: + if not camera.online: + continue + if not (chans := camera.pd_channels()): + continue + stored_per_day = await self.get_current_per_day_data( + location.name, camera + ) + missing_chans = [ + chan for chan in chans if chan.name not in stored_per_day + ] + loc_prefixes = self._yester_pd_prefixes[location.name] + for chan in missing_chans: + prefix = f"{camera.name}/{self._current_day_obs}/{chan.name}" + loc_prefixes.append(prefix) + async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: while True: timer_start = time() try: - if self._current_day_obs != get_current_day_obs(): - logger.info( - "Day rolled over", - date_from=self._current_day_obs, - date_to=get_current_day_obs(), - ) - await self.clear_all_data() + # if self._current_day_obs != get_current_day_obs(): + await self.check_for_empty_per_day_channels() + await self.clear_all_data() day_obs = self._current_day_obs = get_current_day_obs() for location in self.locations: - client = self._clients[location.name] + client = self._s3clients[location.name] for camera in location.cameras: if not camera.online: continue @@ -87,7 +104,7 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: prefix = f"{camera.name}/{test_day}" objects = await client.async_list_objects(prefix) - loc_cam = await self._get_loc_cam(location.name, camera) + loc_cam = self._get_loc_cam(location.name, camera) objects = await self.sieve_out_metadata( objects, prefix, location, camera @@ -96,6 +113,9 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: objects, location, camera ) await self.process_channel_objects(objects, 
loc_cam, camera) + + await self.poll_for_yesterdays_stragglers(location) + self.completed_first_poll = True if self.test_mode: @@ -111,6 +131,23 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: except Exception: logger.exception("Caught exception during poll for data") + async def poll_for_yesterdays_stragglers(self, location: Location) -> None: + client = self._s3clients[location.name] + found = [] + for prefix in self._yester_pd_prefixes.get(location.name, []): + objects = await client.async_list_objects(prefix) + if objects: + found.append(prefix) + events = await objects_to_events(objects) + pd_data = {e.channel_name: e.__dict__ for e in events} + cam_name = prefix.split("/")[0] + loc_cam = f"{location.name}/{cam_name}" + await notify_ws_clients( + "camera", MessageType.CAMERA_PD_BACKDATED, loc_cam, pd_data + ) + for prefix in found: + self._yester_pd_prefixes[location.name].remove(prefix) + async def process_channel_objects( self, objects: list[dict[str, str]], loc_cam: str, camera: Camera ) -> None: @@ -124,7 +161,9 @@ async def process_channel_objects( pd_data = await self.make_per_day_data(camera, events) self._per_day[loc_cam] = pd_data - await notify_ws_clients("camera", Service.CAMERA_PER_DAY, loc_cam, pd_data) + await notify_ws_clients( + "camera", MessageType.CAMERA_PER_DAY, loc_cam, pd_data + ) table = await self.make_channel_table(camera, events) self._table[loc_cam] = table @@ -134,7 +173,7 @@ async def process_channel_objects( num_seqs=len(table), max_seq=max(table) if table else -1, ) - await notify_ws_clients("camera", Service.CAMERA_TABLE, loc_cam, table) + await notify_ws_clients("camera", MessageType.CAMERA_TABLE, loc_cam, table) async def update_channel_events( self, events: list[Event], loc_cam: str, camera: Camera @@ -155,7 +194,7 @@ async def update_channel_events( self._most_recent_events[chan_lookup] = current_event await notify_ws_clients( "channel", - Service.CHANNEL_EVENT, + MessageType.CHANNEL_EVENT, 
chan_lookup, current_event.__dict__, ) @@ -212,9 +251,9 @@ async def filter_camera_metadata_object( async def process_metadata_file( self, md_obj: dict[str, str], location: Location, camera: Camera ) -> None: - loc_cam = await self._get_loc_cam(location.name, camera) + loc_cam = self._get_loc_cam(location.name, camera) md_key = md_obj["key"] - client = self._clients[location.name] + client = self._s3clients[location.name] data = await client.async_get_object(md_key) if data and (loc_cam not in self._metadata or data != self._metadata[loc_cam]): self._metadata[loc_cam] = data @@ -227,21 +266,21 @@ async def process_metadata_file( c for c in location.cameras if c.metadata_from == camera.name ) for cam in to_notify: - loc_cam = await self._get_loc_cam(location.name, cam) + loc_cam = self._get_loc_cam(location.name, cam) await notify_ws_clients( - "camera", Service.CAMERA_METADATA, loc_cam, data + "camera", MessageType.CAMERA_METADATA, loc_cam, data ) async def sieve_out_night_reports( self, objects: list[dict[str, str]], location: Location, camera: Camera ) -> list[dict[str, str]]: - loc_cam = await self._get_loc_cam(location.name, camera) + loc_cam = self._get_loc_cam(location.name, camera) report_objs, objects = await self.filter_night_report_objects(objects) if report_objs: if not self.night_report_exists(location.name, camera.name): await notify_ws_clients( "camera", - Service.CAMERA_PER_DAY, + MessageType.CAMERA_PER_DAY, loc_cam, {"nightReportExists": True}, ) @@ -258,7 +297,7 @@ async def filter_night_report_objects( async def process_night_report_objects( self, report_objs: list[dict[str, str]], location: Location, camera: Camera ) -> None: - loc_cam = await self._get_loc_cam(location.name, camera) + loc_cam = self._get_loc_cam(location.name, camera) prev_nr = await self.get_current_night_report(location.name, camera.name) night_report = NightReport() @@ -272,7 +311,7 @@ async def process_night_report_objects( loc_cam not in self._nr_metadata or 
self._nr_metadata[loc_cam] != metadata_file ): - client = self._clients[location.name] + client = self._s3clients[location.name] text = await client.async_get_object(metadata_file.key) night_report.text = text self._nr_metadata[loc_cam] = metadata_file @@ -286,7 +325,10 @@ async def process_night_report_objects( if prev_nr.text != night_report.text or prev_nr.plots != night_report.plots: await notify_ws_clients( - "nightreport", Service.NIGHT_REPORT, loc_cam, night_report.model_dump() + "nightreport", + MessageType.NIGHT_REPORT, + loc_cam, + night_report.model_dump(), ) self._night_reports[loc_cam] = night_report return @@ -313,26 +355,26 @@ async def make_channel_table( async def get_current_objects( self, location_name: str, camera: Camera ) -> list[dict[str, str]]: - loc_cam = await self._get_loc_cam(location_name, camera) + loc_cam = self._get_loc_cam(location_name, camera) return self._objects.get(loc_cam, []) async def get_current_events( self, location_name: str, camera: Camera ) -> list[Event]: - loc_cam = await self._get_loc_cam(location_name, camera) + loc_cam = self._get_loc_cam(location_name, camera) return self._events.get(loc_cam, []) async def get_current_channel_table( self, location_name: str, camera: Camera ) -> dict[int, dict[str, dict]]: - loc_cam = await self._get_loc_cam(location_name, camera) + loc_cam = self._get_loc_cam(location_name, camera) table = self._table.get(loc_cam, {}) return table async def get_current_per_day_data( self, location_name: str, camera: Camera ) -> dict[str, dict[str, dict]]: - loc_cam = await self._get_loc_cam(location_name, camera) + loc_cam = self._get_loc_cam(location_name, camera) events = self._per_day.get(loc_cam, {}) return {chan: event for chan, event in events.items()} @@ -358,7 +400,7 @@ async def get_current_night_report( night_report = self._night_reports.get(loc_cam, NightReport()) return night_report - async def _get_loc_cam(self, location_name: str, camera: Camera) -> str: + def _get_loc_cam(self, 
location_name: str, camera: Camera) -> str: """Return `f"{location_name}/{camera.name}"` Parameters @@ -401,24 +443,24 @@ async def get_latest_data( if channel_data := await self.get_current_channel_table( location.name, camera ): - yield Service.CAMERA_TABLE, channel_data + yield MessageType.CAMERA_TABLE, channel_data if metadata := await self.get_current_metadata(location.name, camera): - yield Service.CAMERA_METADATA, metadata + yield MessageType.CAMERA_METADATA, metadata if per_day := await self.get_current_per_day_data( location.name, camera ): - yield Service.CAMERA_PER_DAY, per_day + yield MessageType.CAMERA_PER_DAY, per_day if self.night_report_exists(location.name, camera.name): - yield Service.CAMERA_PER_DAY, {"nightReportLink": "current"} + yield MessageType.CAMERA_PER_DAY, {"nightReportLink": "current"} case "channel": if event := await self.get_current_channel_event( location.name, camera.name, channel_name ): - yield Service.CHANNEL_EVENT, event.__dict__ + yield MessageType.CHANNEL_EVENT, event.__dict__ case "nightreport": night_report = await self.get_current_night_report( @@ -427,4 +469,4 @@ async def get_latest_data( # Check for equality with empty NightReport (from # pydantic.BaseModel) if night_report != NightReport(): - yield Service.NIGHT_REPORT, night_report.model_dump() + yield MessageType.NIGHT_REPORT, night_report.model_dump() diff --git a/python/lsst/ts/rubintv/handlers/websocket.py b/python/lsst/ts/rubintv/handlers/websocket.py index f4087826..f268d244 100644 --- a/python/lsst/ts/rubintv/handlers/websocket.py +++ b/python/lsst/ts/rubintv/handlers/websocket.py @@ -14,7 +14,7 @@ websocket_to_client, ) from lsst.ts.rubintv.models.models import Camera, Location -from lsst.ts.rubintv.models.models import ServiceMessageTypes as Service +from lsst.ts.rubintv.models.models import ServiceMessageTypes as MessageType from lsst.ts.rubintv.models.models_helpers import find_first data_ws_router = APIRouter() @@ -77,7 +77,7 @@ async def data_websocket( 
historical_busy = await websocket.app.state.historical.is_busy() await websocket.send_json( { - "dataType": Service.HISTORICAL_STATUS.value, + "dataType": MessageType.HISTORICAL_STATUS.value, "payload": historical_busy, } ) diff --git a/python/lsst/ts/rubintv/handlers/websocket_notifiers.py b/python/lsst/ts/rubintv/handlers/websocket_notifiers.py index 649f35b2..ca8de9bc 100644 --- a/python/lsst/ts/rubintv/handlers/websocket_notifiers.py +++ b/python/lsst/ts/rubintv/handlers/websocket_notifiers.py @@ -10,18 +10,18 @@ services_clients, services_lock, ) -from lsst.ts.rubintv.models.models import ServiceMessageTypes as Service +from lsst.ts.rubintv.models.models import ServiceMessageTypes as MessageType from lsst.ts.rubintv.models.models import get_current_day_obs logger = rubintv_logger() async def notify_ws_clients( - service: str, kind: str, loc_cam: str, payload: Any + service: str, message_type: MessageType, loc_cam: str, payload: Any ) -> None: service_loc_cam_chan = " ".join([service, loc_cam]) to_notify = await get_clients_to_notify(service_loc_cam_chan) - await notify_clients(to_notify, kind, payload) + await notify_clients(to_notify, message_type, payload) async def notify_clients( @@ -42,14 +42,17 @@ async def notify_clients( async def send_notification( - websocket: WebSocket, service: Service, payload: Any + websocket: WebSocket, messageType: MessageType, payload: Any ) -> None: + datestamp = get_current_day_obs().isoformat() + if messageType is MessageType.CAMERA_PD_BACKDATED and payload: + datestamp = payload.values()[0].get("day_obs", datestamp) try: await websocket.send_json( { - "dataType": service.value, + "dataType": messageType.value, "payload": payload, - "datestamp": get_current_day_obs().isoformat(), + "datestamp": datestamp, } ) except Exception as e: @@ -66,7 +69,7 @@ async def get_clients_to_notify(service_cam_id: str) -> list[UUID]: async def notify_all_status_change(historical_busy: bool) -> None: - service = Service.HISTORICAL_STATUS + 
service = MessageType.HISTORICAL_STATUS key = service.value tasks = [] async with services_lock: diff --git a/python/lsst/ts/rubintv/models/models.py b/python/lsst/ts/rubintv/models/models.py index 1e283feb..8bc3dd22 100644 --- a/python/lsst/ts/rubintv/models/models.py +++ b/python/lsst/ts/rubintv/models/models.py @@ -357,5 +357,6 @@ class ServiceMessageTypes(Enum): CAMERA_TABLE: str = "channelData" CAMERA_METADATA: str = "metadata" CAMERA_PER_DAY: str = "perDay" + CAMERA_PD_BACKDATED: str = "perDayBackdated" NIGHT_REPORT: str = "nightReport" HISTORICAL_STATUS: str = "historicalStatus" From 65db67acefdf74034912c758914dda8636186c65 Mon Sep 17 00:00:00 2001 From: ugyballoons Date: Thu, 22 Aug 2024 14:24:38 +0100 Subject: [PATCH 2/8] Make minor naming changes and add logging --- .../lsst/ts/rubintv/background/currentpoller.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/python/lsst/ts/rubintv/background/currentpoller.py b/python/lsst/ts/rubintv/background/currentpoller.py index dd6d1f4b..2dfe0611 100644 --- a/python/lsst/ts/rubintv/background/currentpoller.py +++ b/python/lsst/ts/rubintv/background/currentpoller.py @@ -39,7 +39,7 @@ def __init__(self, locations: list[Location], test_mode: bool = False) -> None: self._metadata: dict[str, dict] = {} self._table: dict[str, dict[int, dict[str, dict]]] = {} self._per_day: dict[str, dict[str, dict]] = {} - self._yester_pd_prefixes: dict[str, list[str]] = {} + self._yesterday_prefixes: dict[str, list[str]] = {} self._most_recent_events: dict[str, Event] = {} self._nr_metadata: dict[str, NightReportData] = {} self._night_reports: dict[str, NightReport] = {} @@ -67,7 +67,7 @@ async def clear_all_data(self) -> None: async def check_for_empty_per_day_channels(self) -> None: for location in self.locations: # clear out yesterday's stash - self._yester_pd_prefixes[location.name] = [] + self._yesterday_prefixes[location.name] = [] for camera in location.cameras: if not camera.online: continue 
@@ -79,7 +79,7 @@ async def check_for_empty_per_day_channels(self) -> None: missing_chans = [ chan for chan in chans if chan.name not in stored_per_day ] - loc_prefixes = self._yester_pd_prefixes[location.name] + loc_prefixes = self._yesterday_prefixes[location.name] for chan in missing_chans: prefix = f"{camera.name}/{self._current_day_obs}/{chan.name}" loc_prefixes.append(prefix) @@ -114,7 +114,7 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: ) await self.process_channel_objects(objects, loc_cam, camera) - await self.poll_for_yesterdays_stragglers(location) + await self.poll_for_yesterdays_per_day(location) self.completed_first_poll = True @@ -131,10 +131,10 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: except Exception: logger.exception("Caught exception during poll for data") - async def poll_for_yesterdays_stragglers(self, location: Location) -> None: + async def poll_for_yesterdays_per_day(self, location: Location) -> None: client = self._s3clients[location.name] found = [] - for prefix in self._yester_pd_prefixes.get(location.name, []): + for prefix in self._yesterday_prefixes.get(location.name, []): objects = await client.async_list_objects(prefix) if objects: found.append(prefix) @@ -142,11 +142,14 @@ async def poll_for_yesterdays_stragglers(self, location: Location) -> None: pd_data = {e.channel_name: e.__dict__ for e in events} cam_name = prefix.split("/")[0] loc_cam = f"{location.name}/{cam_name}" + logger.info( + "Found yesterday's per day data:", loc_cam=loc_cam, pd_data=pd_data + ) await notify_ws_clients( "camera", MessageType.CAMERA_PD_BACKDATED, loc_cam, pd_data ) for prefix in found: - self._yester_pd_prefixes[location.name].remove(prefix) + self._yesterday_prefixes[location.name].remove(prefix) async def process_channel_objects( self, objects: list[dict[str, str]], loc_cam: str, camera: Camera From 979131d3e12039239171bb0ac765bd9707baa15e Mon Sep 17 00:00:00 2001 From: ugyballoons 
Date: Thu, 22 Aug 2024 14:35:35 +0100 Subject: [PATCH 3/8] Uncomment rollover check and check for today data found --- .../ts/rubintv/background/currentpoller.py | 38 +++++++++++-------- python/lsst/ts/rubintv/handlers/api.py | 2 +- tests/background/currentpoller_test.py | 6 +-- 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/python/lsst/ts/rubintv/background/currentpoller.py b/python/lsst/ts/rubintv/background/currentpoller.py index 2dfe0611..986f4070 100644 --- a/python/lsst/ts/rubintv/background/currentpoller.py +++ b/python/lsst/ts/rubintv/background/currentpoller.py @@ -54,7 +54,7 @@ def __init__(self, locations: list[Location], test_mode: bool = False) -> None: location.profile_name, location.bucket_name ) - async def clear_all_data(self) -> None: + async def clear_todays_data(self) -> None: self._objects = {} self._events = {} self._metadata = {} @@ -87,11 +87,13 @@ async def check_for_empty_per_day_channels(self) -> None: async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: while True: timer_start = time() + data_for_today_found = False try: - # if self._current_day_obs != get_current_day_obs(): - await self.check_for_empty_per_day_channels() - await self.clear_all_data() - day_obs = self._current_day_obs = get_current_day_obs() + if self._current_day_obs != get_current_day_obs(): + await self.check_for_empty_per_day_channels() + await self.clear_todays_data() + day_obs = self._current_day_obs = get_current_day_obs() + data_for_today_found = False for location in self.locations: client = self._s3clients[location.name] @@ -104,17 +106,21 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: prefix = f"{camera.name}/{test_day}" objects = await client.async_list_objects(prefix) - loc_cam = self._get_loc_cam(location.name, camera) - - objects = await self.sieve_out_metadata( - objects, prefix, location, camera - ) - objects = await self.sieve_out_night_reports( - objects, location, camera - ) - await 
self.process_channel_objects(objects, loc_cam, camera) - - await self.poll_for_yesterdays_per_day(location) + if objects: + data_for_today_found = True + loc_cam = self._get_loc_cam(location.name, camera) + objects = await self.sieve_out_metadata( + objects, prefix, location, camera + ) + objects = await self.sieve_out_night_reports( + objects, location, camera + ) + await self.process_channel_objects(objects, loc_cam, camera) + + # Only look for yesterday's missing per day data if nothing + # yet found for today. + if not data_for_today_found: + await self.poll_for_yesterdays_per_day(location) self.completed_first_poll = True diff --git a/python/lsst/ts/rubintv/handlers/api.py b/python/lsst/ts/rubintv/handlers/api.py index f7e25abf..8c80f2eb 100644 --- a/python/lsst/ts/rubintv/handlers/api.py +++ b/python/lsst/ts/rubintv/handlers/api.py @@ -32,7 +32,7 @@ async def historical_reset(request: Request) -> None: historical: HistoricalPoller = request.app.state.historical await historical.trigger_reload_everything() current: CurrentPoller = request.app.state.current_poller - await current.clear_all_data() + await current.clear_todays_data() @api_router.get("/{location_name}", response_model=Location) diff --git a/tests/background/currentpoller_test.py b/tests/background/currentpoller_test.py index 79f45b83..3dc5b7b7 100644 --- a/tests/background/currentpoller_test.py +++ b/tests/background/currentpoller_test.py @@ -93,7 +93,7 @@ async def test_clear_all_data(current_poller: CurrentPoller) -> None: assert current_poller.completed_first_poll is True assert current_poller._objects != {} - await current_poller.clear_all_data() + await current_poller.clear_todays_data() assert current_poller._objects == {} assert current_poller._events == {} assert current_poller._metadata == {} @@ -108,7 +108,7 @@ async def test_clear_all_data(current_poller: CurrentPoller) -> None: async def test_process_channel_objects( current_poller: CurrentPoller, rubin_data_mocker: RubinDataMocker ) 
-> None: - await current_poller.clear_all_data() + await current_poller.clear_todays_data() camera, location = await get_test_camera_and_location() loc_cam = f"{location.name}/{camera.name}" @@ -152,7 +152,7 @@ async def test_update_channel_events( loc_cam = f"{location.name}/{camera.name}" events = rubin_data_mocker.events[loc_cam] - await current_poller.clear_all_data() + await current_poller.clear_todays_data() assert current_poller._most_recent_events == {} loc_cam = f"{location.name}/{camera.name}" await current_poller.update_channel_events(events, loc_cam, camera) From af8a2f0c8aa7513bb53887dd2a431c7376ac07d2 Mon Sep 17 00:00:00 2001 From: ugyballoons Date: Fri, 23 Aug 2024 12:38:48 +0100 Subject: [PATCH 4/8] Fix indent and rename methods in tests --- python/lsst/ts/rubintv/background/currentpoller.py | 2 +- tests/background/currentpoller_test.py | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/python/lsst/ts/rubintv/background/currentpoller.py b/python/lsst/ts/rubintv/background/currentpoller.py index 986f4070..dfe3a762 100644 --- a/python/lsst/ts/rubintv/background/currentpoller.py +++ b/python/lsst/ts/rubintv/background/currentpoller.py @@ -92,8 +92,8 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: if self._current_day_obs != get_current_day_obs(): await self.check_for_empty_per_day_channels() await self.clear_todays_data() - day_obs = self._current_day_obs = get_current_day_obs() data_for_today_found = False + day_obs = self._current_day_obs = get_current_day_obs() for location in self.locations: client = self._s3clients[location.name] diff --git a/tests/background/currentpoller_test.py b/tests/background/currentpoller_test.py index 3dc5b7b7..5e55f578 100644 --- a/tests/background/currentpoller_test.py +++ b/tests/background/currentpoller_test.py @@ -37,7 +37,7 @@ async def test_poll_buckets_for_todays_data( return_value="2024-03-28", ) as mock_day_obs, patch( - 
"lsst.ts.rubintv.background.currentpoller.CurrentPoller.clear_all_data", + "lsst.ts.rubintv.background.currentpoller.CurrentPoller.clear_todays_data", new_callable=AsyncMock, ), patch( @@ -87,7 +87,7 @@ async def test_poll_buckets_for_today_process_and_store_seq_events( @pytest.mark.asyncio -async def test_clear_all_data(current_poller: CurrentPoller) -> None: +async def test_clear_todays_data(current_poller: CurrentPoller) -> None: await current_poller.poll_buckets_for_todays_data() assert current_poller.completed_first_poll is True @@ -217,6 +217,13 @@ async def test_day_rollover( assert mock_day_obs +@pytest.mark.asyncio +async def test_pick_up_yesterdays_movie( + current_poller: CurrentPoller, rubin_data_mocker: RubinDataMocker +) -> None: + pass + + async def get_test_camera_and_location() -> tuple[Camera, Location]: location: Location = find_first(m.locations, "name", "base-usdf") # fake_auxtel has both 'streaming' and per-day channels From ecd0b5cf6080bcaff7f24d9541f8207c4a8b5fe8 Mon Sep 17 00:00:00 2001 From: ugyballoons Date: Tue, 27 Aug 2024 17:42:44 +0100 Subject: [PATCH 5/8] Develop testing --- .../ts/rubintv/background/currentpoller.py | 4 + tests/background/currentpoller_test.py | 62 +++++++++++---- tests/mockdata.py | 76 +++++++++++++------ 3 files changed, 105 insertions(+), 37 deletions(-) diff --git a/python/lsst/ts/rubintv/background/currentpoller.py b/python/lsst/ts/rubintv/background/currentpoller.py index dfe3a762..3cedd0a2 100644 --- a/python/lsst/ts/rubintv/background/currentpoller.py +++ b/python/lsst/ts/rubintv/background/currentpoller.py @@ -119,6 +119,10 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: # Only look for yesterday's missing per day data if nothing # yet found for today. 
+ + # TODO: This is too broad- check for data from each camera + # separately + if not data_for_today_found: await self.poll_for_yesterdays_per_day(location) diff --git a/tests/background/currentpoller_test.py b/tests/background/currentpoller_test.py index 5e55f578..850cd5f1 100644 --- a/tests/background/currentpoller_test.py +++ b/tests/background/currentpoller_test.py @@ -73,7 +73,8 @@ async def test_poll_buckets_for_today_process_and_store_seq_events( ) -> None: await current_poller.poll_buckets_for_todays_data() - mocked_objs_keys = rubin_data_mocker.seq_objs.keys() + mocked_objs_keys = rubin_data_mocker.events.keys() + print(mocked_objs_keys) # make sure the keys for the location/cameras match up current_keys = sorted([k for k in current_poller._events.keys()]) @@ -110,7 +111,7 @@ async def test_process_channel_objects( ) -> None: await current_poller.clear_todays_data() - camera, location = await get_test_camera_and_location() + camera, location = get_test_camera_and_location() loc_cam = f"{location.name}/{camera.name}" objects = rubin_data_mocker.seq_objs[loc_cam] @@ -148,7 +149,7 @@ async def test_update_channel_events( "lsst.ts.rubintv.background.currentpoller." "notify_ws_clients", ) as mock_notify_ws_clients, ): - camera, location = await get_test_camera_and_location() + camera, location = get_test_camera_and_location() loc_cam = f"{location.name}/{camera.name}" events = rubin_data_mocker.events[loc_cam] @@ -191,6 +192,8 @@ async def test_make_per_day_data( async def test_day_rollover( current_poller: CurrentPoller, rubin_data_mocker: RubinDataMocker ) -> None: + # TODO: This doesn't actually test for anything! + # Make it so it does. 
day_obs = get_current_day_obs() with ( patch( @@ -203,28 +206,61 @@ async def test_day_rollover( assert mock_day_obs assert current_poller.completed_first_poll is True - day_obs = day_obs + timedelta(days=1) - rubin_data_mocker.day_obs = day_obs - rubin_data_mocker.mock_up_data() + day_obs = day_obs + timedelta(days=1) + rubin_data_mocker.day_obs = day_obs + rubin_data_mocker.mock_up_data() + + await current_poller.poll_buckets_for_todays_data() + + assert mock_day_obs + + +@pytest.mark.asyncio +async def test_pick_up_yesterdays_movie( + current_poller: CurrentPoller, rubin_data_mocker: RubinDataMocker +) -> None: + camera, location = get_test_camera_and_location() + channel = camera.pd_channels()[0] + mocked = rubin_data_mocker + day_obs = get_current_day_obs() + with ( patch( "lsst.ts.rubintv.background.currentpoller.get_current_day_obs", return_value=day_obs.isoformat(), ) as mock_day_obs, ): + assert mock_day_obs await current_poller.poll_buckets_for_todays_data() + assert current_poller.completed_first_poll is True - assert mock_day_obs + # clear movie channel + mocked.delete_channel_events(location, camera, channel) + await current_poller.poll_buckets_for_todays_data() + print(current_poller._yesterday_prefixes) + # rollover day obs + yesterday = day_obs + day_obs = day_obs + timedelta(days=1) + # rubin_data_mocker.day_obs = day_obs -@pytest.mark.asyncio -async def test_pick_up_yesterdays_movie( - current_poller: CurrentPoller, rubin_data_mocker: RubinDataMocker -) -> None: - pass + with ( + patch( + "lsst.ts.rubintv.background.currentpoller.get_current_day_obs", + return_value=day_obs.isoformat(), + ) as mock_day_obs, + ): + await current_poller.poll_buckets_for_todays_data() + + # add movie data + mocked.add_seq_objs_for_channel(location, camera, channel, 3) + await current_poller.poll_buckets_for_todays_data() + + print(await current_poller.get_current_per_day_data(location.name, camera)) + print(yesterday) -async def get_test_camera_and_location() -> 
tuple[Camera, Location]: +def get_test_camera_and_location() -> tuple[Camera, Location]: location: Location = find_first(m.locations, "name", "base-usdf") # fake_auxtel has both 'streaming' and per-day channels camera: Camera = find_first(location.cameras, "name", "fake_auxtel") diff --git a/tests/mockdata.py b/tests/mockdata.py index b79bad37..da6602ce 100644 --- a/tests/mockdata.py +++ b/tests/mockdata.py @@ -48,9 +48,9 @@ def __init__( self.s3_required = s3_required self.day_obs = day_obs self.location_channels: dict[str, list[Channel]] = {} - self.seq_objs: dict[str, list[dict[str, str]]] = {} self.empty_channel: dict[str, str] = {} self.events: dict[str, list[Event]] = {} + self.seq_objs: dict[str, list[dict[str, str]]] = {} self.metadata: dict[str, dict[str, str]] = {} self.mock_up_data() @@ -78,7 +78,6 @@ def mock_up_data(self) -> None: for location in self._locations: loc_name = location.name - self.events[loc_name] = [] self.location_channels[loc_name] = [] groups = location.camera_groups.values() camera_names = list(chain(*groups)) @@ -97,7 +96,9 @@ def mock_up_data(self) -> None: metadata = self.add_camera_metadata(location, camera) self.metadata[loc_cam] = metadata - def add_seq_objs(self, location: Location, camera: Camera) -> None: + def add_seq_objs( + self, location: Location, camera: Camera, include_empty_channel: bool = True + ) -> None: """ Generate mock channel objects and an empty channel string for a given camera and location. @@ -117,44 +118,49 @@ def add_seq_objs(self, location: Location, camera: Camera) -> None: A tuple containing a list of mock channel dictionaries and an updated empty channel string. 
""" - - channel_data: list[dict[str, str]] = [] loc_cam = f"{location.name}/{camera.name}" - iterations = 8 empty_channel = "" - seq_chans = [chan.name for chan in camera.seq_channels()] - if seq_chans: - empty_channel = random.choice(seq_chans) + if include_empty_channel: + seq_chans = [chan.name for chan in camera.seq_channels()] + if seq_chans: + empty_channel = random.choice(seq_chans) for channel in camera.channels: - loc_cam_chan = f"{loc_cam}/{channel.name}" - start = self.last_seq.get(loc_cam_chan, self.FIRST_SEQ) - if empty_channel == channel.name: self.empty_channel[loc_cam] = empty_channel continue + self.add_seq_objs_for_channel(location, camera, channel, 2) + + def add_seq_objs_for_channel( + self, location: Location, camera: Camera, channel: Channel, num_objs: int + ) -> None: + channel_data: list[dict[str, str]] = [] + loc_cam = f"{location.name}/{camera.name}" + loc_cam_chan = f"{location.name}/{camera.name}/{channel.name}" + start = self.last_seq.get(loc_cam_chan, self.FIRST_SEQ) - for index in range(start, start + iterations): - seq_num = f"{index:06}" + for index in range(start, start + num_objs): + seq_num = f"{index:06}" - if channel.per_day and index == start + iterations - 1: - seq_num = random.choice((seq_num, "final")) + if channel.per_day and index == start + num_objs - 1: + seq_num = random.choice((seq_num, "final")) - event_obj = self.generate_event( - location.bucket_name, camera.name, channel.name, seq_num - ) + event_obj = self.generate_event( + location.bucket_name, camera.name, channel.name, seq_num + ) - channel_data.append(event_obj) - self.last_seq[loc_cam_chan] = index + channel_data.append(event_obj) + self.last_seq[loc_cam_chan] = index # store the objects for testing against - if loc_cam in self.seq_objs: + event = self.dicts_to_events(channel_data) + if loc_cam in self.events: + self.events[loc_cam].extend(event) self.seq_objs[loc_cam].extend(channel_data) - self.events[loc_cam].extend(self.dicts_to_events(channel_data)) 
else: + self.events[loc_cam] = event self.seq_objs[loc_cam] = channel_data - self.events[loc_cam] = self.dicts_to_events(channel_data) def generate_event( self, bucket_name: str, camera_name: str, channel_name: str, seq_num: str @@ -173,6 +179,28 @@ def generate_event( hash = str(random.getrandbits(128)) return {"key": key, "hash": hash} + def delete_channel_events( + self, location: Location, camera: Camera, channel: Channel + ) -> None: + bucket_name = location.bucket_name + loc_cam = f"{location.name}/{camera.name}" + loc_cam_chan = f"{loc_cam}/{channel.name}" + events = self.events.get(loc_cam, []) + chan_evs = [ev for ev in events if ev.channel_name == channel.name] + + for ev in chan_evs: + self.events[loc_cam].remove(ev) + if self.s3_required: + res = self.s3_client.delete_object(Bucket=bucket_name, Key=ev.key) + print(f"Attempted to delete object: {ev.key}, result is: {res}") + + if loc_cam_chan in self.last_seq: + del self.last_seq[loc_cam_chan] + if self.empty_channel[loc_cam] == channel.name: + del self.empty_channel[loc_cam] + if channel in self.location_channels[location.name]: + self.location_channels[location.name].remove(channel) + def dicts_to_events(self, channel_dicts: list[dict[str, str]]) -> list[Event]: """ Convert a list of dictionaries to a list of Event objects. 
From cd8bc7287b93a50808f18c3755129f55954a5f35 Mon Sep 17 00:00:00 2001 From: ugyballoons Date: Wed, 28 Aug 2024 16:17:57 +0100 Subject: [PATCH 6/8] Finish test for yesterday collection --- .../ts/rubintv/background/currentpoller.py | 24 +++++++------- tests/background/currentpoller_test.py | 32 ++++++++++++++----- tests/mockdata.py | 18 +++++------ 3 files changed, 45 insertions(+), 29 deletions(-) diff --git a/python/lsst/ts/rubintv/background/currentpoller.py b/python/lsst/ts/rubintv/background/currentpoller.py index 3cedd0a2..e3e06f81 100644 --- a/python/lsst/ts/rubintv/background/currentpoller.py +++ b/python/lsst/ts/rubintv/background/currentpoller.py @@ -87,12 +87,10 @@ async def check_for_empty_per_day_channels(self) -> None: async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: while True: timer_start = time() - data_for_today_found = False try: if self._current_day_obs != get_current_day_obs(): await self.check_for_empty_per_day_channels() await self.clear_todays_data() - data_for_today_found = False day_obs = self._current_day_obs = get_current_day_obs() for location in self.locations: @@ -107,7 +105,6 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: objects = await client.async_list_objects(prefix) if objects: - data_for_today_found = True loc_cam = self._get_loc_cam(location.name, camera) objects = await self.sieve_out_metadata( objects, prefix, location, camera @@ -117,14 +114,7 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: ) await self.process_channel_objects(objects, loc_cam, camera) - # Only look for yesterday's missing per day data if nothing - # yet found for today. 
- - # TODO: This is too broad- check for data from each camera - # separately - - if not data_for_today_found: - await self.poll_for_yesterdays_per_day(location) + await self.poll_for_yesterdays_per_day(location) self.completed_first_poll = True @@ -148,7 +138,7 @@ async def poll_for_yesterdays_per_day(self, location: Location) -> None: objects = await client.async_list_objects(prefix) if objects: found.append(prefix) - events = await objects_to_events(objects) + events = await all_objects_to_events(objects) pd_data = {e.channel_name: e.__dict__ for e in events} cam_name = prefix.split("/")[0] loc_cam = f"{location.name}/{cam_name}" @@ -188,6 +178,16 @@ async def process_channel_objects( ) await notify_ws_clients("camera", MessageType.CAMERA_TABLE, loc_cam, table) + # clear all relevant prefixes from the store looking for + # yesterday's per day updates + + loc = loc_cam.split("/")[0] + prefixes = self._yesterday_prefixes.get(loc, []) + new_prefixes = [ + prefix for prefix in prefixes if not prefix.startswith(camera.name) + ] + self._yesterday_prefixes[loc] = new_prefixes + async def update_channel_events( self, events: list[Event], loc_cam: str, camera: Camera ) -> None: diff --git a/tests/background/currentpoller_test.py b/tests/background/currentpoller_test.py index 850cd5f1..dbb433a2 100644 --- a/tests/background/currentpoller_test.py +++ b/tests/background/currentpoller_test.py @@ -4,7 +4,12 @@ import pytest from lsst.ts.rubintv.background.currentpoller import CurrentPoller -from lsst.ts.rubintv.models.models import Camera, Location, get_current_day_obs +from lsst.ts.rubintv.models.models import ( + Camera, + Location, + ServiceMessageTypes, + get_current_day_obs, +) from lsst.ts.rubintv.models.models_helpers import find_first from lsst.ts.rubintv.models.models_init import ModelsInitiator @@ -227,7 +232,7 @@ async def test_pick_up_yesterdays_movie( with ( patch( "lsst.ts.rubintv.background.currentpoller.get_current_day_obs", - return_value=day_obs.isoformat(), 
+ return_value=day_obs, ) as mock_day_obs, ): assert mock_day_obs @@ -238,26 +243,37 @@ async def test_pick_up_yesterdays_movie( mocked.delete_channel_events(location, camera, channel) await current_poller.poll_buckets_for_todays_data() - print(current_poller._yesterday_prefixes) # rollover day obs yesterday = day_obs day_obs = day_obs + timedelta(days=1) - # rubin_data_mocker.day_obs = day_obs with ( patch( "lsst.ts.rubintv.background.currentpoller.get_current_day_obs", - return_value=day_obs.isoformat(), + return_value=day_obs, ) as mock_day_obs, + patch( + "lsst.ts.rubintv.background.currentpoller.notify_ws_clients", + new_callable=AsyncMock, + ) as mock_notify, ): await current_poller.poll_buckets_for_todays_data() - # add movie data + # add movie data (arbitrary number of objs) mocked.add_seq_objs_for_channel(location, camera, channel, 3) await current_poller.poll_buckets_for_todays_data() - print(await current_poller.get_current_per_day_data(location.name, camera)) - print(yesterday) + # assert that notification was made with the new event + # from yesterday. 
+ service_msg = ServiceMessageTypes.CAMERA_PD_BACKDATED + loc_cam = f"{location.name}/{camera.name}" + events = mocked.get_mocked_events(location, camera, channel) + assert events is not [] + last_event = max(events) + assert last_event + assert last_event.day_obs == yesterday.isoformat() + payload = {channel.name: last_event.__dict__} + mock_notify.assert_called_once_with("camera", service_msg, loc_cam, payload) def get_test_camera_and_location() -> tuple[Camera, Location]: diff --git a/tests/mockdata.py b/tests/mockdata.py index da6602ce..159090b4 100644 --- a/tests/mockdata.py +++ b/tests/mockdata.py @@ -218,9 +218,11 @@ def dicts_to_events(self, channel_dicts: list[dict[str, str]]) -> list[Event]: events = [Event(**cd) for cd in channel_dicts] return events - async def get_mocked_seq_events(self, location: Location) -> list[Event]: + def get_mocked_events( + self, location: Location, camera: Camera, channel: Channel + ) -> list[Event]: """ - Asynchronously retrieve sequence events for a given location. + Retrieve events for a given location. Parameters ---------- @@ -232,13 +234,11 @@ async def get_mocked_seq_events(self, location: Location) -> list[Event]: list[Event] A list of Event objects representing sequence events. 
""" - events = self.events.get(location.name) - if events is None: - return [] - channels = self.location_channels[location.name] - seq_chan_names = [c for c in channels if not c.per_day] - seq_chan_events = [e for e in events if e.channel_name in seq_chan_names] - return seq_chan_events + loc_cam = f"{location.name}/{camera.name}" + events = [ + e for e in self.events.get(loc_cam, []) if e.channel_name in channel.name + ] + return events def add_camera_metadata(self, location: Location, camera: Camera) -> dict[str, str]: """ From d80ec017e986a34f7bd7296c1d8198ca17bbeceb Mon Sep 17 00:00:00 2001 From: ugyballoons Date: Wed, 28 Aug 2024 16:31:27 +0100 Subject: [PATCH 7/8] Fix rollover test --- tests/background/currentpoller_test.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/background/currentpoller_test.py b/tests/background/currentpoller_test.py index dbb433a2..3caffaf1 100644 --- a/tests/background/currentpoller_test.py +++ b/tests/background/currentpoller_test.py @@ -151,7 +151,7 @@ async def test_update_channel_events( ) -> None: with ( patch( - "lsst.ts.rubintv.background.currentpoller." "notify_ws_clients", + "lsst.ts.rubintv.background.currentpoller.notify_ws_clients", ) as mock_notify_ws_clients, ): camera, location = get_test_camera_and_location() @@ -197,8 +197,6 @@ async def test_make_per_day_data( async def test_day_rollover( current_poller: CurrentPoller, rubin_data_mocker: RubinDataMocker ) -> None: - # TODO: This doesn't actually test for anything! - # Make it so it does. 
day_obs = get_current_day_obs() with ( patch( @@ -214,10 +212,19 @@ async def test_day_rollover( day_obs = day_obs + timedelta(days=1) rubin_data_mocker.day_obs = day_obs rubin_data_mocker.mock_up_data() - + with ( + patch( + "lsst.ts.rubintv.background.currentpoller.get_current_day_obs", + return_value=day_obs.isoformat(), + ) as mock_day_obs, + patch( + "lsst.ts.rubintv.background.currentpoller.CurrentPoller.clear_todays_data", + new_callable=AsyncMock, + ) as mock_clear_data, + ): await current_poller.poll_buckets_for_todays_data() - assert mock_day_obs + mock_clear_data.assert_called_once() @pytest.mark.asyncio From 4e12f892e850a6cddfe7d5570b868be726734dcb Mon Sep 17 00:00:00 2001 From: ugyballoons Date: Tue, 3 Sep 2024 16:27:30 +0100 Subject: [PATCH 8/8] Add docstrings --- .../ts/rubintv/background/currentpoller.py | 19 +++++++++++++++++++ tests/background/currentpoller_test.py | 1 - 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/python/lsst/ts/rubintv/background/currentpoller.py b/python/lsst/ts/rubintv/background/currentpoller.py index e3e06f81..ea97e2a6 100644 --- a/python/lsst/ts/rubintv/background/currentpoller.py +++ b/python/lsst/ts/rubintv/background/currentpoller.py @@ -65,6 +65,11 @@ async def clear_todays_data(self) -> None: self._night_reports = {} async def check_for_empty_per_day_channels(self) -> None: + """Creates a store of channel prefixes for per-day data that's not + been received over the course of a day's polling. The prefixes use the + date that's been rolled over from, i.e. yesterday's date, to continue + to look for that data into the new day. 
+ """ for location in self.locations: # clear out yesterday's stash self._yesterday_prefixes[location.name] = [] @@ -132,6 +137,20 @@ async def poll_buckets_for_todays_data(self, test_day: str = "") -> None: logger.exception("Caught exception during poll for data") async def poll_for_yesterdays_per_day(self, location: Location) -> None: + """Uses the store of prefixes for yesterday's missing per-day data to + poll for new objects that have maybe been delayed in processing (this + will mainly be movies) and didn't appear in the bucket before the day + rolled over. + Multiple objects will be ignored except for the most recent. + If an object is found in the bucket, the current page is notified. + Note: This does not affect historical pages. + + + Parameters + ---------- + location : Location + A given location. + """ client = self._s3clients[location.name] found = [] for prefix in self._yesterday_prefixes.get(location.name, []): diff --git a/tests/background/currentpoller_test.py b/tests/background/currentpoller_test.py index 3caffaf1..da130b51 100644 --- a/tests/background/currentpoller_test.py +++ b/tests/background/currentpoller_test.py @@ -79,7 +79,6 @@ async def test_poll_buckets_for_today_process_and_store_seq_events( await current_poller.poll_buckets_for_todays_data() mocked_objs_keys = rubin_data_mocker.events.keys() - print(mocked_objs_keys) # make sure the keys for the location/cameras match up current_keys = sorted([k for k in current_poller._events.keys()])