From 6cdf98e8951f30338ac5be04529e7a4b6a70f9d3 Mon Sep 17 00:00:00 2001 From: Egor Voynov Date: Fri, 2 Aug 2024 10:16:37 +0200 Subject: [PATCH] refactor monitoring clients --- pghoard/__init__.py | 1 - pghoard/basebackup/delta.py | 2 +- pghoard/mapping.py | 6 ---- pghoard/metrics.py | 48 +++++++++++++++++++------------ pghoard/monitoring/__init__.py | 10 +++---- pghoard/monitoring/base.py | 16 +++++++++++ pghoard/monitoring/prometheus.py | 5 +++- pghoard/monitoring/pushgateway.py | 5 +++- pghoard/monitoring/sentry.py | 17 ++++++----- pghoard/monitoring/statsd.py | 5 +++- 10 files changed, 71 insertions(+), 44 deletions(-) delete mode 100644 pghoard/mapping.py create mode 100644 pghoard/monitoring/base.py diff --git a/pghoard/__init__.py b/pghoard/__init__.py index 93b81844..6e7805dc 100644 --- a/pghoard/__init__.py +++ b/pghoard/__init__.py @@ -4,4 +4,3 @@ Copyright (c) 2016 Ohmu Ltd See LICENSE for details """ -from . import mapping, monitoring diff --git a/pghoard/basebackup/delta.py b/pghoard/basebackup/delta.py index 58e2b274..95b1eaf8 100644 --- a/pghoard/basebackup/delta.py +++ b/pghoard/basebackup/delta.py @@ -154,7 +154,7 @@ def _delta_upload_hexdigest( result_hash = hashlib.blake2s() def progress_callback(n_bytes: int = 1) -> None: - self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": True}) + self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": "True"}) with NamedTemporaryFile(dir=temp_dir, prefix=os.path.basename(chunk_path), suffix=".tmp") as raw_output_obj: raw_output_file = cast(FileLikeWithName, raw_output_obj) diff --git a/pghoard/mapping.py b/pghoard/mapping.py deleted file mode 100644 index 6fffa899..00000000 --- a/pghoard/mapping.py +++ /dev/null @@ -1,6 +0,0 @@ -clients = { - "statsd": ("pghoard.monitoring.statsd", "StatsClient"), - "pushgateway": ("pghoard.monitoring.pushgateway", "PushgatewayClient"), - "prometheus": ("pghoard.monitoring.prometheus", "PrometheusClient"), - "sentry": ("pghoard.monitoring.sentry", "SentryClient"), -} diff --git a/pghoard/metrics.py b/pghoard/metrics.py index db469c3e..9dc9ffb7 100644 --- a/pghoard/metrics.py +++ b/pghoard/metrics.py @@ -2,37 +2,47 @@ Interface for monitoring clients """ -import pghoard +import logging +from dataclasses import dataclass +from typing import Dict, Optional, Type +from pghoard.monitoring import (PrometheusClient, PushgatewayClient, SentryClient, StatsClient) +from pghoard.monitoring.base import MetricsClient + +LOG = logging.getLogger(__name__) -class Metrics: - def __init__(self, **configs): - self.clients = self._init_clients(configs) - def _init_clients(self, configs): - clients = {} +@dataclass() +class AvailableClient: + client_class: Type[MetricsClient] + config_key: str - if not isinstance(configs, dict): - return clients - map_client = pghoard.mapping.clients - for k, config in configs.items(): - if isinstance(config, dict) and k in map_client: - path, classname = map_client[k] - mod = __import__(path, fromlist=[classname]) - klass = getattr(mod, classname) - clients[k] = klass(config) +class Metrics: + available_clients = [ + AvailableClient(StatsClient, "statsd"), + AvailableClient(PrometheusClient, "prometheus"), + AvailableClient(PushgatewayClient, "pushgateway"), + AvailableClient(SentryClient, "sentry"), + ] + + def __init__(self, **configs): + self.clients = {} - return clients + for client_info in self.available_clients: + client_config = configs.get(client_info.config_key) + if isinstance(client_config, dict): + LOG.info("Initializing monitoring client %s", client_info.config_key) + self.clients[client_info.config_key] = client_info.client_class(client_config) - def gauge(self, metric, value, tags=None): + def gauge(self, metric: str, value: float, tags: Optional[Dict[str, str]] = None) -> None: for client in self.clients.values(): client.gauge(metric, value, tags) - def increase(self, metric, inc_value=1, tags=None): + def increase(self, metric: str, inc_value: int = 1, tags: Optional[Dict[str, str]] = None) -> None: for client in self.clients.values(): client.increase(metric, inc_value, tags) - def unexpected_exception(self, ex, where, tags=None): + def unexpected_exception(self, ex: Exception, where: str, tags: Optional[Dict[str, str]] = None) -> None: for client in self.clients.values(): client.unexpected_exception(ex, where, tags) diff --git a/pghoard/monitoring/__init__.py b/pghoard/monitoring/__init__.py index bb2ebc83..18cfcf7b 100644 --- a/pghoard/monitoring/__init__.py +++ b/pghoard/monitoring/__init__.py @@ -1,5 +1,5 @@ -import pkgutil - -__path__ = pkgutil.extend_path(__path__, __name__) # type: ignore -for importer, modname, ispkg in pkgutil.walk_packages(path=__path__, prefix=__name__ + "."): - __import__(modname) +# Copyright (c) 2024 Aiven, Helsinki, Finland. https://aiven.io/ +from .prometheus import PrometheusClient +from .pushgateway import PushgatewayClient +from .sentry import SentryClient +from .statsd import StatsClient diff --git a/pghoard/monitoring/base.py b/pghoard/monitoring/base.py new file mode 100644 index 00000000..f2be9b76 --- /dev/null +++ b/pghoard/monitoring/base.py @@ -0,0 +1,16 @@ +# Copyright (c) 2024 Aiven, Helsinki, Finland. https://aiven.io/ +from typing import Any, Dict, Optional + + +class MetricsClient: + def __init__(self, config: Dict[str, Any]): + self.config = config + + def gauge(self, metric: str, value: float, tags: Optional[Dict[str, str]] = None) -> None: + pass + + def increase(self, metric: str, inc_value: int = 1, tags: Optional[Dict[str, str]] = None) -> None: + pass + + def unexpected_exception(self, ex: Exception, where: str, tags: Optional[Dict[str, str]] = None) -> None: + pass diff --git a/pghoard/monitoring/prometheus.py b/pghoard/monitoring/prometheus.py index 6577067c..d0ab9f58 100644 --- a/pghoard/monitoring/prometheus.py +++ b/pghoard/monitoring/prometheus.py @@ -5,9 +5,12 @@ import time +from pghoard.monitoring.base import MetricsClient -class PrometheusClient: + +class PrometheusClient(MetricsClient): def __init__(self, config): + super().__init__(config) self._tags = config.get("tags", {}) self.metrics = {} diff --git a/pghoard/monitoring/pushgateway.py b/pghoard/monitoring/pushgateway.py index 13d6ec86..c74ae61f 100644 --- a/pghoard/monitoring/pushgateway.py +++ b/pghoard/monitoring/pushgateway.py @@ -6,9 +6,12 @@ import requests +from pghoard.monitoring.base import MetricsClient -class PushgatewayClient: + +class PushgatewayClient(MetricsClient): def __init__(self, config): + super().__init__(config) self._endpoint = config.get("endpoint", "") self._job = config.get("job", "pghoard") self._instance = config.get("instance", "") diff --git a/pghoard/monitoring/sentry.py b/pghoard/monitoring/sentry.py index 5e699d97..3df4005d 100644 --- a/pghoard/monitoring/sentry.py +++ b/pghoard/monitoring/sentry.py @@ -1,10 +1,15 @@ +# Copyright (c) 2024 Aiven, Helsinki, Finland. https://aiven.io/ import logging +from typing import Any, Dict, Optional + +from pghoard.monitoring.base import MetricsClient LOG = logging.getLogger(__name__) -class SentryClient: - def __init__(self, config): +class SentryClient(MetricsClient): + def __init__(self, config: Dict[str, Any]): + super().__init__(config) self.sentry = None if config is None: LOG.info("Sentry configuration not found, skipping setup") @@ -29,13 +34,7 @@ def __init__(self, config): for key, value in tags.items(): sentry_sdk.set_tag(key, value) - def gauge(self, metric, value, tags=None): - pass - - def increase(self, metric, inc_value=1, tags=None): - pass - - def unexpected_exception(self, ex, where, tags=None): + def unexpected_exception(self, ex: Exception, where: str, tags: Optional[Dict[str, str]] = None) -> None: if not self.sentry: return diff --git a/pghoard/monitoring/statsd.py b/pghoard/monitoring/statsd.py index 183e4580..a57cc38b 100644 --- a/pghoard/monitoring/statsd.py +++ b/pghoard/monitoring/statsd.py @@ -8,9 +8,12 @@ """ import socket +from pghoard.monitoring.base import MetricsClient -class StatsClient: + +class StatsClient(MetricsClient): def __init__(self, config): + super().__init__(config) self._dest_addr = (config.get("host", "127.0.0.1"), config.get("port", 8125)) self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._tags = config.get("tags", {})