Skip to content

Commit

Permalink
run black formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
erezz-mov-ai committed Jul 9, 2023
1 parent f86dd5f commit 9d7e81f
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 56 deletions.
24 changes: 13 additions & 11 deletions backend/core/log_streamer/log_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
Developers:
- Erez Zomer ([email protected]) - 2023
"""
from aiohttp import web, WSMsgType
from logging import Logger
import uuid
import asyncio
import uuid
from logging import Logger

from aiohttp import WSMsgType, web
from movai_core_shared.logger import Log
from movai_core_shared.messages.log_data import LogRequest

Expand All @@ -22,6 +22,7 @@

QUEUE_SIZE = 100000


class LogClient:
def __init__(self, logger: Logger = None) -> None:
self._id = uuid.uuid4()
Expand Down Expand Up @@ -82,7 +83,7 @@ async def send_msg(self, request: LogRequest):
try:
log_msg = request.get_client_log_format()
await self._ws.send_json(log_msg)
except (ValueError ,RuntimeError, TypeError) as err:
except (ValueError, RuntimeError, TypeError) as err:
self.logger.error(err.__str__())

async def stream_msgs(self):
Expand All @@ -94,17 +95,18 @@ async def stream_msgs(self):
await self.send_msg(msg)

async def listen_to_client_msgs(self):
"""listens for client msgs and repond if necessary.
"""
"""listens for client msgs and repond if necessary."""
self._validate_socket()
async for msg in self._ws:
if msg.type == WSMsgType.TEXT:
if msg.data == 'close':
self._logger.debug("closing the websocket connection for client id: {self._id}")
if msg.data == "close":
self._logger.debug(
"closing the websocket connection for client id: {self._id}"
)
await self._ws.close()
elif msg.type == WSMsgType.ERROR:
self._logger.error(f"ws connection closed with exception {ws.exception()}")
self._logger.error(f"ws connection closed with exception {ws.exception()}")

async def run(self, request: web.Request):
"""Runs the client object in oreder to stream logs from backed to client.
Expand Down Expand Up @@ -141,4 +143,4 @@ def _validate_socket(self):
if self._ws is None:
raise TypeError("The websocket is not initialized for client {self._id}")
if not self.is_alive():
raise ConnectionError(f"The websocket for client {self.id} is closed!")
raise ConnectionError(f"The websocket for client {self.id} is closed!")
52 changes: 26 additions & 26 deletions backend/core/log_streamer/log_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@


class ParamFilter(ABC):
"""An abstract base class for various types of filter.
"""
"""An abstract base class for various types of filter."""

def __init__(self, name: str) -> None:
"""Ctor.
Expand Down Expand Up @@ -35,6 +35,7 @@ def name(self) -> str:
"""
return self._name


class StrParam(ParamFilter):
def __init__(self, name: str, value: str) -> None:
"""Ctor.
Expand All @@ -54,7 +55,7 @@ def __init__(self, name: str, value) -> None:
Args:
name (str): the name of the filter.
"""
"""
super().__init__(name)
if not isinstance(value, int):
raise ValueError("value must be an int")
Expand All @@ -67,7 +68,7 @@ def __init__(self, name: str, value) -> None:
Args:
name (str): the name of the filter.
"""
"""
super().__init__(name)
if isinstance(value, str):
self._value = [val.strip() for val in value.split(",")]
Expand All @@ -76,14 +77,14 @@ def __init__(self, name: str, value) -> None:


class RobotParam(ListParam):
"""A filter for robot name.
"""
"""A filter for robot name."""

def __init__(self, value) -> None:
"""Ctor.
Args:
name (str): the name of the filter.
"""
"""
super().__init__("robots", value)

def filter_msg(self, msg: LogRequest) -> bool:
Expand All @@ -95,12 +96,12 @@ def filter_msg(self, msg: LogRequest) -> bool:
Returns:
bool: True if log message can pass filter, False if not.
"""
return msg.req_data.log_tags.robot in self._value
return msg.req_data.log_tags.robot in self._value


class ServiceParam(ListParam):
"""A filter for service name.
"""
"""A filter for service name."""

def __init__(self, value) -> None:
"""Ctor.
Expand All @@ -117,13 +118,13 @@ def filter_msg(self, msg: LogRequest):
Returns:
bool: True if log message can pass filter, False if not.
"""
"""
return msg.req_data.log_tags.service in self._value


class LevelParam(ListParam):
"""A filter for level type.
"""
"""A filter for level type."""

def __init__(self, value) -> None:
"""Ctor.
Expand Down Expand Up @@ -166,8 +167,8 @@ def filter_msg(self, msg: LogRequest):


class FromDateParam(int):
"""A filter for issue time is later than specific value.
"""
"""A filter for issue time is later than specific value."""

def __init__(self, value) -> None:
"""Ctor.
Expand All @@ -186,11 +187,11 @@ def filter_msg(self, msg: LogRequest):
bool: True if log message can pass filter, False if not.
"""
return msg.created >= self._value


class ToDateParam(int):
"""A filter for issue time is before than a specific value.
"""
"""A filter for issue time is before than a specific value."""

def __init__(self, value) -> None:
"""Ctor.
Expand All @@ -212,28 +213,27 @@ def filter_msg(self, msg: LogRequest):


class LogFilter:
"""A class for filtering a log msg through several types of filters.
"""
"""A class for filtering a log msg through several types of filters."""

_filters_types = {
"robot": RobotParam,
"robots": RobotParam,
"service": ServiceParam,
"services": ServiceParam,
"level": LevelParam,
"levels": LevelParam,
"message": MessageParam
}
"message": MessageParam,
}

def __init__(self, **params):
"""Ctor
"""
"""Ctor"""
self._filters = []
for filter_name, filter_val in params.items():
filter_name = filter_name.lower()
if filter_name in self._filters_types and filter_val is not None:
filter = self._filters_types[filter_name](filter_val)
self._filters.append(filter)

def filter_msg(self, msg: LogRequest) -> bool:
"""Checks that a LogRequest msg can pass the registered filters.
Expand All @@ -246,4 +246,4 @@ def filter_msg(self, msg: LogRequest) -> bool:
for filter in self._filters:
if not filter.filter_msg(msg):
return False
return True
return True
23 changes: 14 additions & 9 deletions backend/core/log_streamer/logs_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
Developers:
- Erez Zomer ([email protected]) - 2023
"""
from aiohttp import web
import uuid

from aiohttp import web
from movai_core_shared.core.zmq_server import ZMQServer
from movai_core_shared.envvars import LOG_STREAMER_BIND_ADDR
from movai_core_shared.logger import Log
Expand All @@ -22,14 +22,15 @@


class LogsStreamer(ZMQServer):

def __init__(self, debug: bool = False) -> None:
"""Initializes the object.
Args:
debug (bool, optional): if True, will show debug logs while running ZMQServer
"""
super().__init__(self.__class__.__name__, LOG_STREAMER_BIND_ADDR, new_loop=False, debug=debug)
super().__init__(
self.__class__.__name__, LOG_STREAMER_BIND_ADDR, new_loop=False, debug=debug
)
self._logger = Log.get_logger(self.__class__.__name__)
self._clients = {}

Expand All @@ -43,7 +44,7 @@ def is_client_registered(self, client_id: uuid.UUID) -> bool:
bool: True if registered, False otherwise.
"""
return client_id in self._clients

def register_client(self, client: LogClient) -> uuid.UUID:
"""Register the client in the LogStreamer, so whenever a new log will arive it
will be sent to this client if it pass the filter.
Expand All @@ -52,7 +53,9 @@ def register_client(self, client: LogClient) -> uuid.UUID:
client (LogClient): the client to register.
"""
if self.is_client_registered(client.id):
self._logger.debug(f"The client: {client.id} is already registered in {self.__class__.__name__}")
self._logger.debug(
f"The client: {client.id} is already registered in {self.__class__.__name__}"
)
return
self._clients[client.id] = client
self._logger.debug(f"The client: {client.id} has been added to {self.__class__.__name__}")
Expand Down Expand Up @@ -81,21 +84,23 @@ async def handle_request(self, request: dict) -> dict:
try:
log_msg = LogRequest(**request)
if self._debug:
self._logger.debug(f"{self.__class__.__name__}: {log_msg.req_data.log_fields.message}")
self._logger.debug(
f"{self.__class__.__name__}: {log_msg.req_data.log_fields.message}"
)
for client in self._clients.values():
if client.is_alive():
await client.push(log_msg)
else:
clients_to_remove.add(client)

for client in clients_to_remove:
self.unregister_client(client)

return {}
except Exception as error:
self._logger.error(str(error))
return {}

async def stream_logs(self, request: web.Request):
"""Stream logs from arriving from message-server to the client.
Expand All @@ -105,7 +110,7 @@ async def stream_logs(self, request: web.Request):
Returns:
web.WebSocketResponse: The websocket response to the client.
"""

if not self._running:
self.run()
client = LogClient()
Expand Down
17 changes: 7 additions & 10 deletions backend/endpoints/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,18 @@
Module that implements websockets REST API module/plugin
"""
import aiohttp_cors
from typing import List, Union
from aiohttp import web

import aiohttp_cors
from aiohttp import web
from dal.classes.protocols.wsredissub import WSRedisSub

from gd_node.protocols.http.middleware import (redirect_not_found,
remove_flow_exposed_port_links,
save_node_type)
from gd_node.protocols.http.movai_widget import MovaiWidget
from gd_node.protocols.http.middleware import (
save_node_type,
remove_flow_exposed_port_links,
redirect_not_found,
)

from backend.http import IWebApp, WebAppManager
from backend.core.log_streamer.logs_streamer import LogsStreamer
from backend.http import IWebApp, WebAppManager


class WSApp(IWebApp):
Expand All @@ -46,7 +43,7 @@ def routes(self) -> List[web.RouteDef]:
return [
web.get("/widget/support", self.test_support),
web.get(self.redis_sub.http_endpoint, self.redis_sub.handler),
web.get(r"/logs", self.log_streamer.stream_logs)
web.get(r"/logs", self.log_streamer.stream_logs),
]

@property
Expand Down

0 comments on commit 9d7e81f

Please sign in to comment.