Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queue size warning #61

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions discord_lumberjack/handlers/discord_handler.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from dataclasses import dataclass, field
import logging
import threading
import time
from typing import Any, Dict, Iterable, Mapping, Optional
import requests
from discord_lumberjack.message_creators import BasicMessageCreator, MessageCreator
from queue import Queue
from queue import PriorityQueue, Queue
import math

logger = logging.getLogger(__name__)

Expand All @@ -24,6 +26,7 @@ class DiscordHandler(logging.Handler):
level (int, optional): The level at which to log. Defaults to logging.NOTSET.
message_creator (MessageCreator, optional): An instance of MessageCreator or one of its subclasses that will be used to create the message to send from each log record. Defaults to one that sends messages in monospace.
http_headers (Mapping[str, Any], optional): A mapping of HTTP headers to send with the request. Defaults to an empty mapping.
queue_warning_size (int, optional): When the queue gets this full, a warning will be issued. Defaults to 100. Can be disabled with 0.
"""

def __init__(
Expand All @@ -33,18 +36,30 @@ def __init__(
message_creator: MessageCreator = None,
http_headers: Mapping[str, Any] = None,
flush_on_exit: bool = True,
queue_warning_size: int = 100,
) -> None:
super().__init__(level=level)
self.__url = url
self.__session = requests.Session()
self.__message_creator = message_creator or _default_message_creator
self.__session.headers.update(http_headers or {})
self.__queue: Queue[logging.LogRecord] = Queue()
self.__queue: Queue[logging.LogRecord] = PriorityQueue()
self.__consumer_thread = threading.Thread(
target=self.__consume, name="DiscordLumberjack", daemon=not flush_on_exit
)
self.__consumer_thread.start()
self.__sentinel = logging.LogRecord("", 0, "", 0, None, None, None)
self.__warning_record = logging.LogRecord(
"",
logging.WARNING,
__file__,
51,
f"The queue exceeded the specified size of {queue_warning_size}",
None,
None,
)
self.__queue_warning_size = queue_warning_size or math.inf
self.__issue_warning = True
if flush_on_exit:
threading.Thread(
target=self.__cleanup, name="DiscordLumberjackCleanup"
Expand All @@ -64,7 +79,13 @@ def emit(self, record: logging.LogRecord) -> None:
record (logging.LogRecord): The log record to send.
"""
logger.debug(f"Enqueuing message {_record_str(record)}")
self.__queue.put(record)
self.__queue.put((time.time(), time.time(), record))
if self.__issue_warning and self.__queue.qsize() > self.__queue_warning_size:
self.__queue.put((0, time.time(), self.__warning_record))
logger.warning(
f"The queue exceeded the specified size of {self.__queue_warning_size}"
)
self.__issue_warning = False

def transform_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""Transform a message before sending it to Discord.
Expand Down Expand Up @@ -120,7 +141,7 @@ def __consume(self) -> None:
"""In an infinite loop, consume a log record from the queue, convert it to its message objects, and send them to Discord."""
while True:
try:
record = self.__queue.get()
_, _, record = self.__queue.get()
if record is self.__sentinel:
logger.debug("Consumer: Sentinel record received, exiting thread.")
return
Expand All @@ -135,6 +156,8 @@ def __consume(self) -> None:
self.handleError(record)
finally:
self.__queue.task_done()
if self.__queue.qsize() < self.queue_warning_size / 2:
self.__issue_warning = True
logger.debug(
f"Consumer: Finished processing message: {_record_str(record)}."
)
Expand Down