Refactor JobScheduler reload method to use custom timeout
The custom timeout is determined by the RELOAD_MIN_TIMEOUT environment variable, which defaults to 5 seconds. If the value is not an integer, an error is logged and the timeout falls back to 5 seconds. The reload method now sets the timeout for the "/reload" API call to the maximum of the custom timeout and twice the number of servers in the SERVER_NAME list.
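
In isolation the timeout resolution reduces to the following sketch (a minimal standalone version; resolve_reload_timeout is a hypothetical helper name and print() stands in for the scheduler's logger):

from os import getenv


def resolve_reload_timeout(services: list) -> int:
    # RELOAD_MIN_TIMEOUT falls back to "5" when unset
    raw = getenv("RELOAD_MIN_TIMEOUT", "5")
    if not raw.isdigit():
        # non-integer values are rejected: log an error and keep the default
        print("RELOAD_MIN_TIMEOUT must be an integer, defaulting to 5")
        raw = "5"
    # never drop below the configured minimum; scale with the number of servers
    return max(int(raw), 2 * len(services))


# e.g. with two services the default minimum still wins: max(5, 4) == 5
print(resolve_reload_timeout("www.example.com app.example.com".split(" ")))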

Refactor ApiCaller send_files method to support response option

If the response parameter is set to True, the method returns a tuple containing the success status and the response dictionary. Otherwise, it returns only the success status. This change allows callers of the send_files method to access the response data if needed.
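
A hypothetical call site showing both return shapes (caller and paths are invented for illustration; judging from the scheduler changes below, the response dictionary is keyed by instance hostname):

# assuming `caller` is an ApiCaller instance (e.g. the scheduler)
ok = caller.send_files("/etc/bunkerweb/configs", "/custom_configs")  # bool, as before
ok, responses = caller.send_files("/etc/bunkerweb/configs", "/custom_configs", response=True)
if ok and responses:
    for hostname, resp in responses.items():
        print(hostname, resp.get("status"))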

Update certbot-deploy script to use custom reload timeout
TheophileDiot committed Oct 24, 2024
1 parent 61f8b83 commit 8cf6035
Showing 4 changed files with 93 additions and 49 deletions.
10 changes: 9 additions & 1 deletion src/common/core/letsencrypt/jobs/certbot-deploy.py
@@ -37,6 +37,14 @@
     instances = db.get_instances()
     services = db.get_non_default_settings(global_only=True, methods=False, with_drafts=True, filtered_settings=("SERVER_NAME"))["SERVER_NAME"].split(" ")
 
+    reload_min_timeout = getenv("RELOAD_MIN_TIMEOUT", "5")
+
+    if not reload_min_timeout.isdigit():
+        LOGGER.error("RELOAD_MIN_TIMEOUT must be an integer, defaulting to 5")
+        reload_min_timeout = 5
+
+    reload_min_timeout = int(reload_min_timeout)
+
     for instance in instances:
         endpoint = f"http://{instance['hostname']}:{instance['port']}"
         host = instance["server_name"]
@@ -53,7 +61,7 @@
             LOGGER.info(
                 f"Successfully sent API request to {api.endpoint}/lets-encrypt/certificates",
             )
-            sent, err, status, resp = api.request("POST", "/reload", timeout=max(5, 2 * len(services)))
+            sent, err, status, resp = api.request("POST", "/reload", timeout=max(reload_min_timeout, 2 * len(services)))
             if not sent:
                 status = 1
                 LOGGER.error(f"Can't send API request to {api.endpoint}/reload : {err}")
7 changes: 5 additions & 2 deletions src/common/utils/ApiCaller.py
@@ -68,10 +68,13 @@ def send_request(api, files):
 
         return ret, responses
 
-    def send_files(self, path: str, url: str, timeout=(5, 10)) -> bool:
+    def send_files(self, path: str, url: str, timeout=(5, 10), response: bool = False) -> Union[bool, Tuple[bool, Optional[Dict[str, Any]]]]:
         with BytesIO() as tgz:
             with tar_open(mode="w:gz", fileobj=tgz, dereference=True, compresslevel=3) as tf:
                 tf.add(path, arcname=".")
             tgz.seek(0, 0)
             files = {"archive.tar.gz": tgz}
-            return self.send_to_apis("POST", url, files=files, timeout=timeout)[0]
+            ret = self.send_to_apis("POST", url, files=files, timeout=timeout, response=response)
+            if response:
+                return ret[0], ret[1]
+            return ret[0]
8 changes: 7 additions & 1 deletion src/scheduler/JobScheduler.py
@@ -128,7 +128,13 @@ def __str_to_schedule(self, every: str) -> Job:
 
     def __reload(self) -> bool:
         self.__logger.info("Reloading nginx...")
-        reload_success = self.send_to_apis("POST", "/reload", timeout=max(5, 2 * len(self.__env["SERVER_NAME"].split(" "))))[0]
+        reload_min_timeout = self.__env.get("RELOAD_MIN_TIMEOUT", "5")
+
+        if not reload_min_timeout.isdigit():
+            self.__logger.error("RELOAD_MIN_TIMEOUT must be an integer, defaulting to 5")
+            reload_min_timeout = 5
+
+        reload_success = self.send_to_apis("POST", "/reload", timeout=max(int(reload_min_timeout), 2 * len(self.__env["SERVER_NAME"].split(" "))))[0]
         if reload_success:
             self.__logger.info("Successfully reloaded nginx")
             return True
117 changes: 72 additions & 45 deletions src/scheduler/main.py
@@ -15,7 +15,7 @@
 from subprocess import run as subprocess_run, DEVNULL, STDOUT
 from sys import path as sys_path
 from tarfile import TarFile, open as tar_open
-from threading import Event, Thread
+from threading import Event, Lock, Thread
 from time import sleep
 from traceback import format_exc
 from typing import Any, Dict, List, Literal, Optional, Union
@@ -39,6 +39,7 @@
 APPLYING_CHANGES = Event()
 RUN = True
 SCHEDULER: Optional[JobScheduler] = None
+SCHEDULER_LOCK = Lock()
 
 CACHE_PATH = Path(sep, "var", "cache", "bunkerweb")
 CACHE_PATH.mkdir(parents=True, exist_ok=True)
@@ -93,6 +94,14 @@
 HEALTHCHECK_EVENT = Event()
 HEALTHCHECK_LOGGER = setup_logger("Scheduler.Healthcheck", getenv("CUSTOM_LOG_LEVEL", getenv("LOG_LEVEL", "INFO")))
 
+RELOAD_MIN_TIMEOUT = getenv("RELOAD_MIN_TIMEOUT", "5")
+
+if not RELOAD_MIN_TIMEOUT.isdigit():
+    LOGGER.error("RELOAD_MIN_TIMEOUT must be an integer, defaulting to 5")
+    RELOAD_MIN_TIMEOUT = 5
+
+RELOAD_MIN_TIMEOUT = int(RELOAD_MIN_TIMEOUT)
+
 SLAVE_MODE = getenv("SLAVE_MODE", "no") == "yes"
 MASTER_MODE = getenv("MASTER_MODE", "no") == "yes"
@@ -154,39 +163,49 @@ def stop(status):
     _exit(status)
 
 
-def send_nginx_configs(sent_path: Path = CONFIG_PATH):
+def send_file_to_bunkerweb(file_path: Path, endpoint: str):
     assert SCHEDULER is not None, "SCHEDULER is not defined"
-    LOGGER.info(f"Sending {sent_path} folder ...")
-    ret = SCHEDULER.send_files(sent_path.as_posix(), "/confs")
-    if not ret:
-        LOGGER.error("Sending nginx configs failed, configuration will not work as expected...")
-
-
-def send_nginx_cache(sent_path: Path = CACHE_PATH):
-    assert SCHEDULER is not None, "SCHEDULER is not defined"
-    LOGGER.info(f"Sending {sent_path} folder ...")
-    if not SCHEDULER.send_files(sent_path.as_posix(), "/cache"):
-        LOGGER.error(f"Error while sending {sent_path} folder")
-    else:
-        LOGGER.info(f"Successfully sent {sent_path} folder")
+    LOGGER.info(f"Sending {file_path} to BunkerWeb instances ...")
+    success, responses = SCHEDULER.send_files(file_path.as_posix(), endpoint, response=True)
+    fails = []
+
+    for db_instance in SCHEDULER.db.get_instances():
+        index = -1
+        with SCHEDULER_LOCK:
+            for i, api in enumerate(SCHEDULER.apis):
+                if api.endpoint == f"http://{db_instance['hostname']}:{db_instance['port']}/":
+                    index = i
+                    break
 
-
-def send_nginx_custom_configs(sent_path: Path = CUSTOM_CONFIGS_PATH):
-    assert SCHEDULER is not None, "SCHEDULER is not defined"
-    LOGGER.info(f"Sending {sent_path} folder ...")
-    if not SCHEDULER.send_files(sent_path.as_posix(), "/custom_configs"):
-        LOGGER.error(f"Error while sending {sent_path} folder")
-    else:
-        LOGGER.info(f"Successfully sent {sent_path} folder")
+        status = responses.get(db_instance["hostname"], {"status": "down"}).get("status", "down")
+        if status == "success":
+            success = True
+        elif index != -1:
+            fails.append(db_instance["hostname"])
+
+        ret = SCHEDULER.db.update_instance(db_instance["hostname"], "up" if status == "success" else "down")
+        if ret:
+            LOGGER.error(f"Couldn't update instance {db_instance['hostname']} status to down in the database: {ret}")
 
-
-def send_nginx_external_plugins(sent_path: Path = EXTERNAL_PLUGINS_PATH):
-    assert SCHEDULER is not None, "SCHEDULER is not defined"
-    LOGGER.info(f"Sending {sent_path} folder ...")
-    if not SCHEDULER.send_files(sent_path.as_posix(), "/pro_plugins" if sent_path.as_posix().endswith("/pro/plugins") else "/plugins"):
-        LOGGER.error(f"Error while sending {sent_path} folder")
-    else:
-        LOGGER.info(f"Successfully sent {sent_path} folder")
+        if status == "success":
+            if index == -1:
+                with SCHEDULER_LOCK:
+                    LOGGER.debug(f"Adding {db_instance['hostname']}:{db_instance['port']} to the list of reachable instances")
+                    SCHEDULER.apis.append(API(f"http://{db_instance['hostname']}:{db_instance['port']}", db_instance["server_name"]))
+            continue
+
+        with SCHEDULER_LOCK:
+            if api.endpoint == f"http://{db_instance['hostname']}:{db_instance['port']}/":
+                LOGGER.debug(f"Removing {db_instance['hostname']}:{db_instance['port']} from the list of reachable instances")
+                del SCHEDULER.apis[index]
+
+    if not success:
+        LOGGER.error(f"Error while sending {file_path} to BunkerWeb instances")
+        return
+    elif not fails:
+        LOGGER.info(f"Successfully sent {file_path} folder to reachable BunkerWeb instances")
+        return
+    LOGGER.warning(f"Error while sending {file_path} to some BunkerWeb instances, removing them from the list of reachable instances: {', '.join(fails)}")
 
 
 def generate_custom_configs(configs: Optional[List[Dict[str, Any]]] = None, *, original_path: Union[Path, str] = CUSTOM_CONFIGS_PATH):
@@ -233,7 +252,7 @@ def generate_custom_configs(configs: Optional[List[Dict[str, Any]]] = None, *, original_path: Union[Path, str] = CUSTOM_CONFIGS_PATH):
         )
 
     if SCHEDULER and SCHEDULER.apis:
-        send_nginx_custom_configs(original_path)
+        send_file_to_bunkerweb(original_path, "/custom_configs")
 
 
 def generate_external_plugins(original_path: Union[Path, str] = EXTERNAL_PLUGINS_PATH):
@@ -298,7 +317,7 @@ def generate_external_plugins(original_path: Union[Path, str] = EXTERNAL_PLUGINS_PATH):
 
     if SCHEDULER and SCHEDULER.apis:
         LOGGER.info(f"Sending {'pro ' if pro else ''}external plugins to BunkerWeb")
-        send_nginx_external_plugins(original_path)
+        send_file_to_bunkerweb(original_path, "/pro_plugins" if original_path.as_posix().endswith("/pro/plugins") else "/plugins")
 
 
 def generate_caches():
@@ -379,10 +398,10 @@ def run_in_slave_mode():  # TODO: Refactor this feature
     env["TZ"] = tz
 
     # Instantiate scheduler environment
-    SCHEDULER.env = env | {"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice"))}
+    SCHEDULER.env = env | {"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice")), "RELOAD_MIN_TIMEOUT": str(RELOAD_MIN_TIMEOUT)}
 
-    generate_custom_configs()
     threads = [
+        Thread(target=generate_custom_configs),
         Thread(target=generate_external_plugins),
         Thread(target=generate_external_plugins, args=(PRO_PLUGINS_PATH,)),
         Thread(target=generate_caches),
@@ -586,7 +605,7 @@ def healthcheck_job():
     env["TZ"] = tz
 
     # Instantiate scheduler environment
-    SCHEDULER.env = env | {"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice"))}
+    SCHEDULER.env = env | {"LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice")), "RELOAD_MIN_TIMEOUT": str(RELOAD_MIN_TIMEOUT)}
 
     threads = []
 
@@ -641,8 +660,6 @@ def check_configs_changes():
 
         generate_custom_configs(SCHEDULER.db.get_custom_configs())
 
-    threads.append(Thread(target=check_configs_changes))
-
     def check_plugin_changes(_type: Literal["external", "pro"] = "external"):
         # Check if any external or pro plugin has been added by the user
         assert SCHEDULER is not None, "SCHEDULER is not defined"
@@ -697,10 +714,11 @@ def check_plugin_changes(_type: Literal["external", "pro"] = "external"):
         except BaseException as e:
             LOGGER.error(f"Error while saving {_type} plugins to database: {e}")
         else:
-            return send_nginx_external_plugins(plugin_path)
+            return send_file_to_bunkerweb(plugin_path, "/pro_plugins" if _type == "pro" else "/plugins")
 
         generate_external_plugins(plugin_path)
 
+    check_configs_changes()
     threads.extend([Thread(target=check_plugin_changes, args=("external",)), Thread(target=check_plugin_changes, args=("pro",))])
 
     for thread in threads:
@@ -775,7 +793,13 @@ def check_plugin_changes(_type: Literal["external", "pro"] = "external"):
         if RUN_JOBS_ONCE:
             # Only run jobs once
             if not SCHEDULER.reload(
-                env | {"TZ": getenv("TZ", "UTC"), "LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice"))}, changed_plugins=changed_plugins
+                env
+                | {
+                    "TZ": getenv("TZ", "UTC"),
+                    "LOG_LEVEL": getenv("CUSTOM_LOG_LEVEL", env.get("LOG_LEVEL", "notice")),
+                    "RELOAD_MIN_TIMEOUT": str(RELOAD_MIN_TIMEOUT),
+                },
+                changed_plugins=changed_plugins,
             ):
                 LOGGER.error("At least one job in run_once() failed")
             else:
@@ -815,21 +839,23 @@ def check_plugin_changes(_type: Literal["external", "pro"] = "external"):
 
         if SCHEDULER.apis:
             # send nginx configs
-            threads.append(Thread(target=send_nginx_configs))
+            threads.append(Thread(target=send_file_to_bunkerweb, args=(CONFIG_PATH, "/confs")))
             threads[-1].start()
 
         try:
             success = True
             reachable = True
             if SCHEDULER.apis:
                 # send cache
-                threads.append(Thread(target=send_nginx_cache))
+                threads.append(Thread(target=send_file_to_bunkerweb, args=(CACHE_PATH, "/cache")))
                 threads[-1].start()
 
             for thread in threads:
                 thread.join()
 
-            success, responses = SCHEDULER.send_to_apis("POST", "/reload", timeout=max(5, 2 * len(env["SERVER_NAME"].split(" "))), response=True)
+            success, responses = SCHEDULER.send_to_apis(
+                "POST", "/reload", timeout=max(RELOAD_MIN_TIMEOUT, 2 * len(env["SERVER_NAME"].split(" "))), response=True
+            )
             if not success:
                 reachable = False
                 LOGGER.debug(f"Error while reloading all bunkerweb instances: {responses}")
@@ -852,6 +878,7 @@ def check_plugin_changes(_type: Literal["external", "pro"] = "external"):
                         LOGGER.debug(f"Adding {db_instance['hostname']}:{db_instance['port']} to the list of reachable instances")
                         SCHEDULER.apis.append(API(f"http://{db_instance['hostname']}:{db_instance['port']}", db_instance["server_name"]))
                         continue
+
                 for i, api in enumerate(SCHEDULER.apis):
                     if api.endpoint == f"http://{db_instance['hostname']}:{db_instance['port']}/":
                         LOGGER.debug(f"Removing {db_instance['hostname']}:{db_instance['port']} from the list of reachable instances")
@@ -883,17 +910,17 @@ def check_plugin_changes(_type: Literal["external", "pro"] = "external"):
             # Failover to last working configuration
             if SCHEDULER.apis:
                 tmp_threads = [
-                    Thread(target=send_nginx_configs, args=(FAILOVER_PATH.joinpath("config"),)),
-                    Thread(target=send_nginx_cache, args=(FAILOVER_PATH.joinpath("cache"),)),
-                    Thread(target=send_nginx_custom_configs, args=(FAILOVER_PATH.joinpath("custom_configs"),)),
+                    Thread(target=send_file_to_bunkerweb, args=(FAILOVER_PATH.joinpath("config"), "/confs")),
+                    Thread(target=send_file_to_bunkerweb, args=(FAILOVER_PATH.joinpath("cache"), "/cache")),
+                    Thread(target=send_file_to_bunkerweb, args=(FAILOVER_PATH.joinpath("custom_configs"), "/custom_configs")),
                 ]
                 for thread in tmp_threads:
                     thread.start()
 
                 for thread in tmp_threads:
                     thread.join()
 
-                if not SCHEDULER.send_to_apis("POST", "/reload", timeout=max(5, 2 * len(env["SERVER_NAME"].split(" "))))[0]:
+                if not SCHEDULER.send_to_apis("POST", "/reload", timeout=max(RELOAD_MIN_TIMEOUT, 2 * len(env["SERVER_NAME"].split(" "))))[0]:
                     LOGGER.error("Error while reloading bunkerweb with failover configuration, skipping ...")
                 elif not reachable:
                     LOGGER.warning("No BunkerWeb instance is reachable, skipping failover ...")
