diff --git a/discord_lumberjack/handlers/discord_handler.py b/discord_lumberjack/handlers/discord_handler.py index 55a12bc..75dc1b8 100644 --- a/discord_lumberjack/handlers/discord_handler.py +++ b/discord_lumberjack/handlers/discord_handler.py @@ -1,10 +1,12 @@ +from dataclasses import dataclass, field import logging import threading import time from typing import Any, Dict, Iterable, Mapping, Optional import requests from discord_lumberjack.message_creators import BasicMessageCreator, MessageCreator -from queue import Queue +from queue import PriorityQueue, Queue +import math logger = logging.getLogger(__name__) @@ -24,6 +26,7 @@ class DiscordHandler(logging.Handler): level (int, optional): The level at which to log. Defaults to logging.NOTSET. message_creator (MessageCreator, optional): An instance of MessageCreator or one of its subclasses that will be used to create the message to send from each log record. Defaults to one that sends messages in monospace. http_headers (Mapping[str, Any], optional): A mapping of HTTP headers to send with the request. Defaults to an empty mapping. + queue_warning_size (int, optional): When the queue gets this full, a warning will be issued. Defaults to 100. Can be disabled with 0. """ def __init__( @@ -33,18 +36,30 @@ def __init__( message_creator: MessageCreator = None, http_headers: Mapping[str, Any] = None, flush_on_exit: bool = True, + queue_warning_size: int = 100, ) -> None: super().__init__(level=level) self.__url = url self.__session = requests.Session() self.__message_creator = message_creator or _default_message_creator self.__session.headers.update(http_headers or {}) - self.__queue: Queue[logging.LogRecord] = Queue() + self.__queue: Queue[logging.LogRecord] = PriorityQueue() self.__consumer_thread = threading.Thread( target=self.__consume, name="DiscordLumberjack", daemon=not flush_on_exit ) self.__consumer_thread.start() self.__sentinel = logging.LogRecord("", 0, "", 0, None, None, None) + self.__warning_record = logging.LogRecord( + "", + logging.WARNING, + __file__, + 51, + f"The queue exceeded the specified size of {queue_warning_size}", + None, + None, + ) + self.__queue_warning_size = queue_warning_size or math.inf + self.__issue_warning = True if flush_on_exit: threading.Thread( target=self.__cleanup, name="DiscordLumberjackCleanup" @@ -64,7 +79,13 @@ def emit(self, record: logging.LogRecord) -> None: record (logging.LogRecord): The log record to send. """ logger.debug(f"Enqueuing message {_record_str(record)}") - self.__queue.put(record) + self.__queue.put((time.time(), time.time(), record)) + if self.__issue_warning and self.__queue.qsize() > self.__queue_warning_size: + self.__queue.put((0, time.time(), self.__warning_record)) + logger.warning( + f"The queue exceeded the specified size of {self.__queue_warning_size}" + ) + self.__issue_warning = False def transform_message(self, message: Dict[str, Any]) -> Dict[str, Any]: """Transform a message before sending it to Discord. @@ -120,7 +141,7 @@ def __consume(self) -> None: """In an infinite loop, consume a log record from the queue, convert it to its message objects, and send them to Discord.""" while True: try: - record = self.__queue.get() + _, _, record = self.__queue.get() if record is self.__sentinel: logger.debug("Consumer: Sentinel record received, exiting thread.") return @@ -135,6 +156,8 @@ def __consume(self) -> None: self.handleError(record) finally: self.__queue.task_done() + if self.__queue.qsize() < self.queue_warning_size / 2: + self.__issue_warning = True logger.debug( f"Consumer: Finished processing message: {_record_str(record)}." )