From 0acef0408a6f10116f412546d8554f1c823d1c2a Mon Sep 17 00:00:00 2001 From: Christian Stefanescu Date: Tue, 21 May 2024 14:45:11 +0200 Subject: [PATCH] Run periodic tasks in app context --- aleph/worker.py | 51 ++++++++++++++++++++++++++++++------------------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/aleph/worker.py b/aleph/worker.py index efa70c9f0..1d965ecf2 100644 --- a/aleph/worker.py +++ b/aleph/worker.py @@ -117,6 +117,7 @@ def __init__( self.indexing_batches = defaultdict(list) self.local_queue = queue.Queue() self.prefetch_count = prefetch_count + self.periodic_timer = threading.Timer(10, self.periodic) def on_message(self, channel, method, properties, body, args): connection = args[0] @@ -127,34 +128,44 @@ def on_message(self, channel, method, properties, body, args): task._channel = channel self.local_queue.put((task, channel, connection)) + def on_signal(self, signal, _): + log.debug("Cancelling periodic timer") + self.periodic_timer.cancel() + super().on_signal(signal, _) + def process(self, blocking=True): if blocking: + log.info( + f"Starting periodic timer (interval={self.periodic_timer.interval}s)" + ) + self.periodic_timer.start() with app.app_context(): self.process_blocking() else: self.process_nonblocking() def periodic(self): - try: - db.session.remove() - if self.often.check(): - self.often.update() - log.info("Self-check...") - compute_collections() - Dataset.cleanup_dataset_status(kv) - - if self.daily.check(): - self.daily.update() - log.info("Running daily tasks...") - update_roles() - check_alerts() - generate_digest() - delete_expired_exports() - delete_old_notifications() - - self.run_indexing_batches() - except Exception: - log.exception("Error while executing periodic tasks") + with app.app_context(): + try: + db.session.remove() + if self.often.check(): + self.often.update() + log.info("Self-check...") + compute_collections() + Dataset.cleanup_dataset_status(kv) + + if self.daily.check(): + self.daily.update() + log.info("Running daily tasks...") + update_roles() + check_alerts() + generate_digest() + delete_expired_exports() + delete_old_notifications() + + self.run_indexing_batches() + except Exception: + log.exception("Error while executing periodic tasks") def dispatch_task(self, task: Task) -> Task: log.info(