Skip to content

Commit

Permalink
celery: add liveness probe
Browse files Browse the repository at this point in the history
  • Loading branch information
MJedr committed Sep 26, 2023
1 parent 1831786 commit b26f733
Showing 1 changed file with 54 additions and 6 deletions.
60 changes: 54 additions & 6 deletions inspirehep/celery.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
# Copyright (C) 2014-2023 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand All @@ -28,30 +28,78 @@
import os
import signal

from celery import bootsteps
from celery.signals import worker_ready, worker_shutdown
from flask_celeryext import AppContextTask, create_celery_app
from psycopg2 import OperationalError as Psycopg2OperationalError
from sqlalchemy.exc import InvalidRequestError, OperationalError

from inspirehep.factory import create_app


HEARTBEAT_FILE = "/tmp/celery_live"
READINESS_FILE = "/tmp/celery_ready"

LOGGER = logging.getLogger(__name__)


class CeleryTask(AppContextTask):

def on_failure(self, exc, task_id, args, kwargs, einfo):
if isinstance(exc, (InvalidRequestError, OperationalError, Psycopg2OperationalError)):
LOGGER.exception('Shutting down celery process because of'.format(exc))
if isinstance(
exc, (InvalidRequestError, OperationalError, Psycopg2OperationalError)
):
LOGGER.exception("Shutting down celery process because of".format(exc))
try:
with open('/dev/termination-log', 'w') as term_log:
with open("/dev/termination-log", "w") as term_log:
term_log.write(str(exc))
finally:
os.kill(os.getppid(), signal.SIGTERM)


# adapted from https://github.com/celery/celery/issues/4079#issuecomment-1128954283
class LivenessProbe(bootsteps.StartStopStep):
requires = {"celery.worker.components:Timer"}

def __init__(self, worker, **kwargs):
self.requests = []
self.tref = None

def start(self, worker):
self.tref = worker.timer.call_repeatedly(
1.0,
self.update_heartbeat_file,
(worker,),
priority=10,
)

def stop(self, worker):
try:
os.remove(HEARTBEAT_FILE)
except OSError:
pass

def update_heartbeat_file(self, worker):
if not os.path.exists(READINESS_FILE):
open(HEARTBEAT_FILE, 'w').close()


@worker_ready.connect
def worker_ready(**_):
if not os.path.exists(READINESS_FILE):
open(READINESS_FILE, 'w').close()


@worker_shutdown.connect
def worker_shutdown(**_):
try:
os.remove(READINESS_FILE)
except OSError:
pass


celery = create_celery_app(create_app(LOGGING_SENTRY_CELERY=True))
celery.Task = CeleryTask
celery.steps["worker"].add(LivenessProbe)

# We don't want to log to Sentry backoff errors
logging.getLogger('backoff').propagate = 0
logging.getLogger("backoff").propagate = 0

0 comments on commit b26f733

Please sign in to comment.