Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

celery: add liveness probe #4316

Merged
merged 1 commit into from
Sep 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 54 additions & 6 deletions inspirehep/celery.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
# Copyright (C) 2014-2023 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand All @@ -28,30 +28,78 @@
import os
import signal

from celery import bootsteps
from celery.signals import worker_ready, worker_shutdown
from flask_celeryext import AppContextTask, create_celery_app
from psycopg2 import OperationalError as Psycopg2OperationalError
from sqlalchemy.exc import InvalidRequestError, OperationalError

from inspirehep.factory import create_app


HEARTBEAT_FILE = "/tmp/celery_live"
READINESS_FILE = "/tmp/celery_ready"

LOGGER = logging.getLogger(__name__)


class CeleryTask(AppContextTask):

def on_failure(self, exc, task_id, args, kwargs, einfo):
if isinstance(exc, (InvalidRequestError, OperationalError, Psycopg2OperationalError)):
LOGGER.exception('Shutting down celery process because of'.format(exc))
if isinstance(
exc, (InvalidRequestError, OperationalError, Psycopg2OperationalError)
):
LOGGER.exception("Shutting down celery process because of".format(exc))
try:
with open('/dev/termination-log', 'w') as term_log:
with open("/dev/termination-log", "w") as term_log:
term_log.write(str(exc))
finally:
os.kill(os.getppid(), signal.SIGTERM)


# adapted from https://github.com/celery/celery/issues/4079#issuecomment-1128954283
class LivenessProbe(bootsteps.StartStopStep):
requires = {"celery.worker.components:Timer"}

def __init__(self, worker, **kwargs):
self.requests = []
self.tref = None

def start(self, worker):
self.tref = worker.timer.call_repeatedly(
1.0,
self.update_heartbeat_file,
(worker,),
priority=10,
)

def stop(self, worker):
try:
os.remove(HEARTBEAT_FILE)
except OSError:
pass

def update_heartbeat_file(self, worker):
if not os.path.exists(READINESS_FILE):
open(HEARTBEAT_FILE, 'w').close()


@worker_ready.connect
def worker_ready(**_):
if not os.path.exists(READINESS_FILE):
open(READINESS_FILE, 'w').close()


@worker_shutdown.connect
def worker_shutdown(**_):
try:
os.remove(READINESS_FILE)
except OSError:
pass


celery = create_celery_app(create_app(LOGGING_SENTRY_CELERY=True))
celery.Task = CeleryTask
celery.steps["worker"].add(LivenessProbe)

# We don't want to log to Sentry backoff errors
logging.getLogger('backoff').propagate = 0
logging.getLogger("backoff").propagate = 0
Loading