Skip to content

Commit

Permalink
Pydantic integration 2.4.1 (#95)
Browse files Browse the repository at this point in the history
* added node, callback

* fixed node ports usage

* fixed regex for callback

* removed cache since it's already implemented in object new

* used new models ports

* fixed imports

* added node, callback

* fixed node ports usage

* fixed regex for callback

* removed cache since it's already implemented in object new

* used new models ports

* fixed imports

* fixed Annotaion import

* used new classes

* added proper typing

* rename Var_Subscriber to VarSubscriber

* pydantic best practice

* adding debugpy

* fix asyncio syntax

* fix imports

* Parameter is pydantic field not a model

* code review fixes

---------

Co-authored-by: Moawiya Mograbi <[email protected]>
  • Loading branch information
erezz-mov-ai and Mograbi authored Jan 4, 2024
1 parent a6c464b commit 051a1c7
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 72 deletions.
72 changes: 41 additions & 31 deletions gd_node/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,35 @@
import time
from typing import Any
from os import getenv
import re

from asyncio import CancelledError

from movai_core_shared.logger import Log, LogAdapter
from movai_core_shared.exceptions import DoesNotExist, TransitionException

# Imports from DAL

from dal.movaidb import MovaiDB
from dal.models.callback import Callback

from dal.new_models.flow.container import Container
from dal.new_models.flow.nodeinst import NodeInst
from dal.new_models.callback import Callback
from dal.new_models.configuration import Configuration
from dal.new_models.message import Message
from dal.new_models.ports import Ports

from dal.models.lock import Lock
from dal.models.container import Container
from dal.models.nodeinst import NodeInst
from dal.scopes.package import Package
from dal.models.ports import Ports
from dal.models.scopestree import scopes
from dal.models.var import Var
from dal.models.scopestree import ScopesTree, scopes

from dal.scopes.configuration import Configuration
from dal.scopes.package import Package
from dal.scopes.fleetrobot import FleetRobot
from dal.scopes.message import Message
from dal.scopes.robot import Robot
from dal.scopes.statemachine import StateMachine, SMVars


from gd_node.user import GD_User as gd

try:

from movai_core_enterprise.message_client_handlers.alerts import Alerts
from movai_core_enterprise.models.annotation import Annotation
from movai_core_enterprise.models.graphicasset import GraphicAsset
Expand Down Expand Up @@ -78,8 +79,11 @@ def __init__(
self.node_name = _node_name
self.port_name = _port_name
self.updated_globals = {}

self.callback = ScopesTree().from_path(_cb_name, scope="Callback")
if "Callback/" in _cb_name:
cb_name = re.search(r".*Callback/([^/]+)", _cb_name).group(1)
self.callback = Callback(cb_name)
else:
self.callback = Callback(_cb_name)

self.compiled_code = compile(self.callback.Code, _cb_name, "exec")
self.user = UserFunctions(
Expand All @@ -90,16 +94,21 @@ def __init__(
self.callback.Message,
)
self.count = 0

self._debug = eval(getenv("DEBUG_CB", "False"))

def debug_callback(self):
if self._debug and self.callback.name in self.debug_callbacks:
import debugpy
debugpy.listen(5678)
debugpy.wait_for_client()
debugpy.breakpoint()

def execute(self, msg: Any = None) -> None:
"""Executes the code
Args:
msg: Message received in the callback
"""

self.user.globals.update({"msg": msg})
self.user.globals.update({"count": self.count})
globais = copy.copy(self.user.globals)
Expand All @@ -123,7 +132,6 @@ def start(self, code, globais):
t_init = time.perf_counter()
if self._debug:
import linecache

linecache.cache[self.name] = (
len(self.callback.Code),
None,
Expand Down Expand Up @@ -153,7 +161,7 @@ def __init__(
_cb_name: str,
_node_name: str,
_port_name: str,
_libraries: list,
_libraries: dict,
_message: str,
_user="SUPER",
) -> None:
Expand All @@ -164,19 +172,20 @@ def __init__(
self.node_name = _node_name
# self.globals['redis_sub'] = GD_Message('movai_msgs/redis_sub', _type='msg').get()

for lib in _libraries:
try:
mod = importlib.import_module(_libraries[lib].Module)
if _libraries:
for lib in _libraries:
try:
self.globals[lib] = getattr(mod, _libraries[lib].Class)
except TypeError: # Class is not defined
self.globals[lib] = mod
except CancelledError:
raise CancelledError("cancelled task")
except (ImportError, AttributeError, LookupError):
raise ImportError(
f"Import {lib} in callback {_cb_name} of node {_node_name} was not found"
)
mod = importlib.import_module(_libraries[lib].Module)
try:
self.globals[lib] = getattr(mod, _libraries[lib].Class)
except TypeError: # Class is not defined
self.globals[lib] = mod
except CancelledError as exc:
raise CancelledError("cancelled task") from exc
except (ImportError, AttributeError, LookupError) as exc:
raise ImportError(
f"Import {lib} in callback {_cb_name} of node {_node_name} was not found"
) from exc

if GD_Callback._robot is None:
GD_Callback._robot = Robot()
Expand Down Expand Up @@ -249,7 +258,8 @@ def __init__(self, sm_name: str):
super().__init__(_sm_name=sm_id, _node_name=_node_name)

if _user == "SUPER":
logger = Log.get_callback_logger("GD_Callback", self.node_name, self.cb_name)
log = Log.get_logger("GD_Callback")
logger = LogAdapter(log, node=self.node_name, callback=self.cb_name, runtime=True)
self.globals.update(
{
"scopes": scopes,
Expand Down Expand Up @@ -295,7 +305,7 @@ def user_print(self, *args):

def run(self, cb_name, msg):
"""Run another callback from a callback"""
callback = scopes.from_path(cb_name, scope="Callback")
callback = Callback(cb_name)
compiled_code = compile(callback.Code, cb_name, "exec")
user = UserFunctions("", "", "", callback.Py3Lib, callback.Message)

Expand Down
2 changes: 1 addition & 1 deletion gd_node/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from movai_core_shared.envvars import ROS2_PATH

from dal.scopes.message import Message
from dal.new_models import Message

ROS2_ONLY_ATTR = "SLOT_TYPES" # Check in future updates

Expand Down
60 changes: 33 additions & 27 deletions gd_node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

import uvloop

from typing import Dict
from movai_core_shared.logger import Log
from movai_core_shared.consts import MOVAI_INIT

# importing database profile automatically registers the database connections
from dal.movaidb import RedisClient
from dal.models.scopestree import scopes, ScopePropertyNode
from dal.models.var import Var
from dal.new_models import Ports
from dal.new_models.node import PortsInstValue, Node


from gd_node.protocol import Iport, Oport, Transports
Expand Down Expand Up @@ -125,7 +128,7 @@ async def init_transports(
self,
node_name: str,
inst_name: str,
ports_templates: dict,
ports_templates: Dict[str, Ports],
transports: dict,
remaps: list,
):
Expand Down Expand Up @@ -169,7 +172,8 @@ async def init_transports(
# print("ROS1 Node %s registered successfully." % inst_name)

async def init_oports(
self, inst_name: str, ports_templates: dict, ports_inst: dict, flow_name: str
self, inst_name: str, ports_templates: Dict[str, Ports],
ports_inst: Dict[str, PortsInstValue], flow_name: str
):
"""Init all the output ports
Expand All @@ -180,12 +184,12 @@ async def init_oports(

for ports in ports_inst:
template = ports_templates[ports_inst[ports].Template]
for pout in ports_inst[ports].Out:

for pout in ports_inst[ports].Out.model_dump(exclude_none=True):
outvalue = getattr(ports_inst[ports].Out, pout)
transport = template.Out[pout].Transport
protocol = template.Out[pout].Protocol
message = ports_inst[ports].Out[pout].Message
params = ports_inst[ports].Out[pout].Parameter or {}
message = outvalue.Message
params = outvalue.Parameter

for param in params:
params[param] = self.ports_params.get(
Expand Down Expand Up @@ -224,13 +228,17 @@ async def init_iports(
for ports in ports_inst:
template = ports_templates[ports_inst[ports].Template]

for i in ports_inst[ports].In:
for i, v in ports_inst[ports].In:
if i == "in_":
i = "in"
if not v:
continue
transport = template.In[i].Transport
protocol = template.In[i].Protocol
message = ports_inst[ports].In[i].Message
message = v.Message
# place_holder
callback = ports_inst[ports].In[i].Callback or self.__DEFAULT_CALLBACK__
params = ports_inst[ports].In[i].Parameter or {}
callback = v.Callback or self.__DEFAULT_CALLBACK__
params = v.Parameter

for param in params:
params[param] = self.ports_params.get(
Expand Down Expand Up @@ -263,21 +271,14 @@ async def main(self, args, unknown) -> None:
type(self).RUNNING = asyncio.Event()
# connect databases
await self.connect()

# self.robot = Robot()
GD_User.name = self.inst_name
GD_User.template = self.node_name

self.node = scopes.from_path(self.node_name, scope="Node")

# set db client name
# await self.databases.db_global.client_setname(self.robot.RobotName + '_' + self.inst_name)
# await self.databases.db_slave.client_setname(self.inst_name)
# await self.databases.db_local.client_setname(self.inst_name)
self.node = Node(self.node_name)

inst_params = {}
if args.params:
parameters = args.params.split('"', 1)[1].rsplit('"', 1)[0]
p1 = args.params.split('"', 1)[1]
parameters = p1.rsplit('"', 1)[0]
for param in parameters.split(";"):
key, value = param.split(":=")
try:
Expand All @@ -290,7 +291,7 @@ async def main(self, args, unknown) -> None:

# params are available all over the node as gd.params['name']
for param in self.node.Parameter:
value = inst_params.get(param, self.node["Parameter"][param]["Value"])
value = inst_params.get(param, self.node.Parameter[param].Value)
try:
if isinstance(value, ScopePropertyNode):
value = value.value
Expand All @@ -306,7 +307,7 @@ async def main(self, args, unknown) -> None:
node_ports = {}
for ports in self.node.PortsInst:
ports_name = self.node.PortsInst[ports].Template
node_ports[ports_name] = scopes.from_path(ports_name, scope="Ports")
node_ports[ports_name] = Ports(ports_name)

# Transition message
trans_msg = None
Expand All @@ -321,13 +322,15 @@ async def main(self, args, unknown) -> None:
)

# Then we start the oports
await self.init_oports(self.inst_name, node_ports, self.node["PortsInst"], self.flow_name)
await self.init_oports(
self.inst_name, node_ports, self.node.PortsInst, self.flow_name
)

# Init all the Iports
await self.init_iports(
self.inst_name,
node_ports,
self.node["PortsInst"],
self.node.PortsInst,
init=False,
transition_data=trans_msg,
)
Expand All @@ -337,9 +340,10 @@ async def main(self, args, unknown) -> None:
await asyncio.sleep(0.2)

# Then we run the initial callback
await self.init_iports(self.inst_name, node_ports, self.node["PortsInst"], init=True)
await self.init_iports(
self.inst_name, node_ports, self.node.PortsInst, init=True
)
if not GD_User.is_transitioning:

# And finally we enable the iports
for iport in GD_User.iport:
try:
Expand All @@ -354,7 +358,9 @@ async def main(self, args, unknown) -> None:

start_time = time.time() - TIME_0

LOGGER.info('Full time to init the GD_Node "%s": %s' % (self.inst_name, start_time))
LOGGER.info(
'Full time to init the GD_Node "%s": %s' % (self.inst_name, start_time)
)

signal.signal(signal.SIGINT, CoreInterruptHandler)
signal.signal(signal.SIGTERM, CoreInterruptHandler)
Expand Down
4 changes: 2 additions & 2 deletions gd_node/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"""
from typing import Any

from dal.classes.protocols import redissub as RedisSub
from dal.classes.protocols.redissub import VarSubscriber

import gd_node.protocols.http.http_route
import gd_node.protocols.http.web_socket
Expand Down Expand Up @@ -253,7 +253,7 @@ def shutdown(self):
class IportVarSub:
def __init__(self, **kwargs):
name = kwargs["_port_name"]
gd.iport[name] = RedisSub.Var_Subscriber(**kwargs)
gd.iport[name] = VarSubscriber(**kwargs)


class IportHttpRoute:
Expand Down
11 changes: 5 additions & 6 deletions gd_node/protocols/http/middleware.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import asyncio
import re
import requests
import bleach
from typing import List, Union
from aiohttp import web
import urllib.parse
from urllib.parse import unquote

from movai_core_shared.exceptions import InvalidToken, TokenExpired, TokenRevoked
from movai_core_shared.logger import Log

from dal.scopes.flow import Flow
from dal.scopes.node import Node
from dal.new_models.flow import Flow
from dal.new_models.node import Node
from dal.models.remoteuser import RemoteUser
from dal.models.internaluser import InternalUser

Expand Down Expand Up @@ -158,7 +156,7 @@ async def remove_flow_exposed_port_links(request, handler):

if request.match_info.get("name"):
try:
flow_obj = Flow(name=request.match_info.get("name"))
flow_obj = Flow(request.match_info.get("name"))
old_flow_exposed_ports = {**flow_obj.ExposedPorts}
except Exception as e:
LOGGER.warning(
Expand All @@ -181,11 +179,12 @@ async def remove_flow_exposed_port_links(request, handler):
LOGGER.info(f"Deleted exposed ports result: {deleted_exposed_ports}")

# Loop trough all deleted ports and delete Links associated to that exposed port
port_regex = re.compile(r"^.+/")
for node in deleted_exposed_ports:
for deleted_exposed_port in node.values():
node_inst_name = next(iter(deleted_exposed_port))
for port in deleted_exposed_port[node_inst_name]:
port_name = re.search(r"^.+/", port)[0][:-1]
port_name = port_regex.search(port)[0][:-1]
flow_obj.delete_exposed_port_links(node_inst_name, port_name)

if request.get("scope_delete"):
Expand Down
Loading

0 comments on commit 051a1c7

Please sign in to comment.