diff --git a/gd_node/callback.py b/gd_node/callback.py index 40d9588..9106af9 100644 --- a/gd_node/callback.py +++ b/gd_node/callback.py @@ -12,34 +12,35 @@ import time from typing import Any from os import getenv +import re + from asyncio import CancelledError from movai_core_shared.logger import Log, LogAdapter from movai_core_shared.exceptions import DoesNotExist, TransitionException -# Imports from DAL - from dal.movaidb import MovaiDB -from dal.models.callback import Callback + +from dal.new_models.flow.container import Container +from dal.new_models.flow.nodeinst import NodeInst +from dal.new_models.callback import Callback +from dal.new_models.configuration import Configuration +from dal.new_models.message import Message +from dal.new_models.ports import Ports from dal.models.lock import Lock -from dal.models.container import Container -from dal.models.nodeinst import NodeInst -from dal.scopes.package import Package -from dal.models.ports import Ports +from dal.models.scopestree import scopes from dal.models.var import Var -from dal.models.scopestree import ScopesTree, scopes -from dal.scopes.configuration import Configuration +from dal.scopes.package import Package from dal.scopes.fleetrobot import FleetRobot -from dal.scopes.message import Message from dal.scopes.robot import Robot from dal.scopes.statemachine import StateMachine, SMVars + from gd_node.user import GD_User as gd try: - from movai_core_enterprise.message_client_handlers.alerts import Alerts from movai_core_enterprise.models.annotation import Annotation from movai_core_enterprise.models.graphicasset import GraphicAsset @@ -78,8 +79,11 @@ def __init__( self.node_name = _node_name self.port_name = _port_name self.updated_globals = {} - - self.callback = ScopesTree().from_path(_cb_name, scope="Callback") + if "Callback/" in _cb_name: + cb_name = re.search(r".*Callback/([^/]+)", _cb_name).group(1) + self.callback = Callback(cb_name) + else: + self.callback = Callback(_cb_name) self.compiled_code = compile(self.callback.Code, _cb_name, "exec") self.user = UserFunctions( @@ -90,16 +94,21 @@ def __init__( self.callback.Message, ) self.count = 0 - self._debug = eval(getenv("DEBUG_CB", "False")) + def debug_callback(self): + if self._debug and self.callback.name in self.debug_callbacks: + import debugpy + debugpy.listen(5678) + debugpy.wait_for_client() + debugpy.breakpoint() + def execute(self, msg: Any = None) -> None: """Executes the code Args: msg: Message received in the callback """ - self.user.globals.update({"msg": msg}) self.user.globals.update({"count": self.count}) globais = copy.copy(self.user.globals) @@ -123,7 +132,6 @@ def start(self, code, globais): t_init = time.perf_counter() if self._debug: import linecache - linecache.cache[self.name] = ( len(self.callback.Code), None, @@ -153,7 +161,7 @@ def __init__( _cb_name: str, _node_name: str, _port_name: str, - _libraries: list, + _libraries: dict, _message: str, _user="SUPER", ) -> None: @@ -164,19 +172,20 @@ def __init__( self.node_name = _node_name # self.globals['redis_sub'] = GD_Message('movai_msgs/redis_sub', _type='msg').get() - for lib in _libraries: - try: - mod = importlib.import_module(_libraries[lib].Module) + if _libraries: + for lib in _libraries: try: - self.globals[lib] = getattr(mod, _libraries[lib].Class) - except TypeError: # Class is not defined - self.globals[lib] = mod - except CancelledError: - raise CancelledError("cancelled task") - except (ImportError, AttributeError, LookupError): - raise ImportError( - f"Import {lib} in callback {_cb_name} of node {_node_name} was not found" - ) + mod = importlib.import_module(_libraries[lib].Module) + try: + self.globals[lib] = getattr(mod, _libraries[lib].Class) + except TypeError: # Class is not defined + self.globals[lib] = mod + except CancelledError as exc: + raise CancelledError("cancelled task") from exc + except (ImportError, AttributeError, LookupError) as exc: + raise ImportError( + f"Import {lib} in callback {_cb_name} of node {_node_name} was not found" + ) from exc if GD_Callback._robot is None: GD_Callback._robot = Robot() @@ -249,7 +258,8 @@ def __init__(self, sm_name: str): super().__init__(_sm_name=sm_id, _node_name=_node_name) if _user == "SUPER": - logger = Log.get_callback_logger("GD_Callback", self.node_name, self.cb_name) + log = Log.get_logger("GD_Callback") + logger = LogAdapter(log, node=self.node_name, callback=self.cb_name, runtime=True) self.globals.update( { "scopes": scopes, @@ -295,7 +305,7 @@ def user_print(self, *args): def run(self, cb_name, msg): """Run another callback from a callback""" - callback = scopes.from_path(cb_name, scope="Callback") + callback = Callback(cb_name) compiled_code = compile(callback.Code, cb_name, "exec") user = UserFunctions("", "", "", callback.Py3Lib, callback.Message) diff --git a/gd_node/message.py b/gd_node/message.py index 8b951d3..f4b8946 100644 --- a/gd_node/message.py +++ b/gd_node/message.py @@ -14,7 +14,7 @@ from movai_core_shared.envvars import ROS2_PATH -from dal.scopes.message import Message +from dal.new_models import Message ROS2_ONLY_ATTR = "SLOT_TYPES" # Check in future updates diff --git a/gd_node/node.py b/gd_node/node.py index d3b52ad..72d8473 100755 --- a/gd_node/node.py +++ b/gd_node/node.py @@ -16,6 +16,7 @@ import uvloop +from typing import Dict from movai_core_shared.logger import Log from movai_core_shared.consts import MOVAI_INIT @@ -23,6 +24,8 @@ from dal.movaidb import RedisClient from dal.models.scopestree import scopes, ScopePropertyNode from dal.models.var import Var +from dal.new_models import Ports +from dal.new_models.node import PortsInstValue, Node from gd_node.protocol import Iport, Oport, Transports @@ -125,7 +128,7 @@ async def init_transports( self, node_name: str, inst_name: str, - ports_templates: dict, + ports_templates: Dict[str, Ports], transports: dict, remaps: list, ): @@ -169,7 +172,8 @@ async def init_transports( # print("ROS1 Node %s registered successfully." % inst_name) async def init_oports( - self, inst_name: str, ports_templates: dict, ports_inst: dict, flow_name: str + self, inst_name: str, ports_templates: Dict[str, Ports], + ports_inst: Dict[str, PortsInstValue], flow_name: str ): """Init all the output ports @@ -180,12 +184,12 @@ async def init_oports( for ports in ports_inst: template = ports_templates[ports_inst[ports].Template] - for pout in ports_inst[ports].Out: - + for pout in ports_inst[ports].Out.model_dump(exclude_none=True): + outvalue = getattr(ports_inst[ports].Out, pout) transport = template.Out[pout].Transport protocol = template.Out[pout].Protocol - message = ports_inst[ports].Out[pout].Message - params = ports_inst[ports].Out[pout].Parameter or {} + message = outvalue.Message + params = outvalue.Parameter for param in params: params[param] = self.ports_params.get( @@ -224,13 +228,17 @@ async def init_iports( for ports in ports_inst: template = ports_templates[ports_inst[ports].Template] - for i in ports_inst[ports].In: + for i, v in ports_inst[ports].In: + if i == "in_": + i = "in" + if not v: + continue transport = template.In[i].Transport protocol = template.In[i].Protocol - message = ports_inst[ports].In[i].Message + message = v.Message # place_holder - callback = ports_inst[ports].In[i].Callback or self.__DEFAULT_CALLBACK__ - params = ports_inst[ports].In[i].Parameter or {} + callback = v.Callback or self.__DEFAULT_CALLBACK__ + params = v.Parameter for param in params: params[param] = self.ports_params.get( @@ -263,21 +271,14 @@ async def main(self, args, unknown) -> None: type(self).RUNNING = asyncio.Event() # connect databases await self.connect() - - # self.robot = Robot() GD_User.name = self.inst_name GD_User.template = self.node_name - - self.node = scopes.from_path(self.node_name, scope="Node") - - # set db client name - # await self.databases.db_global.client_setname(self.robot.RobotName + '_' + self.inst_name) - # await self.databases.db_slave.client_setname(self.inst_name) - # await self.databases.db_local.client_setname(self.inst_name) + self.node = Node(self.node_name) inst_params = {} if args.params: - parameters = args.params.split('"', 1)[1].rsplit('"', 1)[0] + p1 = args.params.split('"', 1)[1] + parameters = p1.rsplit('"', 1)[0] for param in parameters.split(";"): key, value = param.split(":=") try: @@ -290,7 +291,7 @@ async def main(self, args, unknown) -> None: # params are available all over the node as gd.params['name'] for param in self.node.Parameter: - value = inst_params.get(param, self.node["Parameter"][param]["Value"]) + value = inst_params.get(param, self.node.Parameter[param].Value) try: if isinstance(value, ScopePropertyNode): value = value.value @@ -306,7 +307,7 @@ async def main(self, args, unknown) -> None: node_ports = {} for ports in self.node.PortsInst: ports_name = self.node.PortsInst[ports].Template - node_ports[ports_name] = scopes.from_path(ports_name, scope="Ports") + node_ports[ports_name] = Ports(ports_name) # Transition message trans_msg = None @@ -321,13 +322,15 @@ async def main(self, args, unknown) -> None: ) # Then we start the oports - await self.init_oports(self.inst_name, node_ports, self.node["PortsInst"], self.flow_name) + await self.init_oports( + self.inst_name, node_ports, self.node.PortsInst, self.flow_name + ) # Init all the Iports await self.init_iports( self.inst_name, node_ports, - self.node["PortsInst"], + self.node.PortsInst, init=False, transition_data=trans_msg, ) @@ -337,9 +340,10 @@ async def main(self, args, unknown) -> None: await asyncio.sleep(0.2) # Then we run the initial callback - await self.init_iports(self.inst_name, node_ports, self.node["PortsInst"], init=True) + await self.init_iports( + self.inst_name, node_ports, self.node.PortsInst, init=True + ) if not GD_User.is_transitioning: - # And finally we enable the iports for iport in GD_User.iport: try: @@ -354,7 +358,9 @@ async def main(self, args, unknown) -> None: start_time = time.time() - TIME_0 - LOGGER.info('Full time to init the GD_Node "%s": %s' % (self.inst_name, start_time)) + LOGGER.info( + 'Full time to init the GD_Node "%s": %s' % (self.inst_name, start_time) + ) signal.signal(signal.SIGINT, CoreInterruptHandler) signal.signal(signal.SIGTERM, CoreInterruptHandler) diff --git a/gd_node/protocol.py b/gd_node/protocol.py index 679d260..1d527dd 100755 --- a/gd_node/protocol.py +++ b/gd_node/protocol.py @@ -12,7 +12,7 @@ """ from typing import Any -from dal.classes.protocols import redissub as RedisSub +from dal.classes.protocols.redissub import VarSubscriber import gd_node.protocols.http.http_route import gd_node.protocols.http.web_socket @@ -253,7 +253,7 @@ def shutdown(self): class IportVarSub: def __init__(self, **kwargs): name = kwargs["_port_name"] - gd.iport[name] = RedisSub.Var_Subscriber(**kwargs) + gd.iport[name] = VarSubscriber(**kwargs) class IportHttpRoute: diff --git a/gd_node/protocols/http/middleware.py b/gd_node/protocols/http/middleware.py index 9a3a681..3407727 100644 --- a/gd_node/protocols/http/middleware.py +++ b/gd_node/protocols/http/middleware.py @@ -1,17 +1,15 @@ -import asyncio import re import requests import bleach from typing import List, Union from aiohttp import web import urllib.parse -from urllib.parse import unquote from movai_core_shared.exceptions import InvalidToken, TokenExpired, TokenRevoked from movai_core_shared.logger import Log -from dal.scopes.flow import Flow -from dal.scopes.node import Node +from dal.new_models.flow import Flow +from dal.new_models.node import Node from dal.models.remoteuser import RemoteUser from dal.models.internaluser import InternalUser @@ -158,7 +156,7 @@ async def remove_flow_exposed_port_links(request, handler): if request.match_info.get("name"): try: - flow_obj = Flow(name=request.match_info.get("name")) + flow_obj = Flow(request.match_info.get("name")) old_flow_exposed_ports = {**flow_obj.ExposedPorts} except Exception as e: LOGGER.warning( @@ -181,11 +179,12 @@ async def remove_flow_exposed_port_links(request, handler): LOGGER.info(f"Deleted exposed ports result: {deleted_exposed_ports}") # Loop trough all deleted ports and delete Links associated to that exposed port + port_regex = re.compile(r"^.+/") for node in deleted_exposed_ports: for deleted_exposed_port in node.values(): node_inst_name = next(iter(deleted_exposed_port)) for port in deleted_exposed_port[node_inst_name]: - port_name = re.search(r"^.+/", port)[0][:-1] + port_name = port_regex.search(port)[0][:-1] flow_obj.delete_exposed_port_links(node_inst_name, port_name) if request.get("scope_delete"): diff --git a/gd_node/protocols/movai.py b/gd_node/protocols/movai.py index c827e67..a039010 100644 --- a/gd_node/protocols/movai.py +++ b/gd_node/protocols/movai.py @@ -330,17 +330,15 @@ def __init__( ): """Init""" super().__init__(_node_name, _port_name, _topic, _message, _callback, _update) - self.stack = _params.get("Namespace", "") - - self.loop = asyncio.get_event_loop() - self.loop.create_task(self.register_sub()) + asyncio.create_task(self.register_sub()) async def register_sub(self) -> None: """Subscribe to key.""" pattern = {"Var": {"context": {"ID": {self.stack + "_RX": {"Parameter": "**"}}}}} databases = await RedisClient().get_client() - await MovaiDB("local", loop=self.loop, databases=databases).subscribe_channel( + loop = asyncio.get_event_loop() + await MovaiDB("local", loop=loop, databases=databases).subscribe_channel( pattern, self.callback )