Skip to content

Commit

Permalink
BP-944: check node state before publishing (#84)
Browse files Browse the repository at this point in the history
* BP-944: check node state before publishing

* added gd running check for ros ports

* run black formatter
  • Loading branch information
Mograbi authored Oct 4, 2023
1 parent 9fff5fc commit 44aff17
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 25 deletions.
2 changes: 2 additions & 0 deletions gd_node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ async def init_oports(
"message": message,
"_params": params,
"flow_name": flow_name,
"_gd_node": self,
}

Oport.create(key, **config)
Expand Down Expand Up @@ -252,6 +253,7 @@ async def init_iports(
"_params": params,
"_data": transition_data,
"_update": self.develop,
"_gd_node": self,
}

if (key == MOVAI_INIT) == init:
Expand Down
4 changes: 2 additions & 2 deletions gd_node/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,10 @@ def shutdown(self):


class OportRos1Pub:
def __init__(self, name, topic, message, _params=None, **_ignore):
def __init__(self, name, topic, message, _params=None, _gd_node=None, **_ignore):
if _params is None:
_params = {}
self._instance = ROS1.ROS1_Publisher(topic, message, _params)
self._instance = ROS1.ROS1_Publisher(topic, message, _params, _gd_node)
gd.oport[name] = self._instance

def shutdown(self):
Expand Down
3 changes: 2 additions & 1 deletion gd_node/protocols/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(
_message: str,
_callback: str,
update: bool = True,
_gd_node: Any = None,
**_ignore
) -> None:

Expand All @@ -34,12 +35,12 @@ def __init__(
self.topic = _topic
self.enabled = False
self.start_enabled = True
self._gd_node = _gd_node
self.cb = Callback(_callback, self.node_name, self.port_name, update)

def callback(self, msg: Any) -> None:
"""Executes the callback if port is enabled"""
if self.enabled:
# print("iport trigger", self.port_name)
self.cb.execute(msg)

def unregister(self) -> None:
Expand Down
57 changes: 35 additions & 22 deletions gd_node/protocols/ros1.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,24 @@ def rosnode_cleanup() -> None:

############################## IPORTS ###############################


class ROS1IportBase(BaseIport):
"""
A base class for all of the ROS IPORTS
"""

MAX_RETRIES = 100

def callback(self, msg: Any) -> None:
"""Callback function for all the ROS IPORTS"""
for _ in range(self.MAX_RETRIES):
if self.enabled:
if self._gd_node.RUNNING and self.enabled:
self.cb.execute(msg)
return
# if the port isn't initiated after 10 seconds it means it's disabled
rospy.timer.sleep(0.1)


class ROS1_Subscriber(ROS1IportBase):

"""ROS Subscriber class. Implementation of the rospy.Subscriber
Expand All @@ -152,15 +155,18 @@ def __init__(
_message: str,
_callback: str,
_update: bool,
_gd_node: Any = None,
**_
) -> None:
"""Init"""
super().__init__(_node_name, _port_name, _topic, _message, _callback, _update)
super().__init__(
_node_name, _port_name, _topic, _message, _callback, _update, _gd_node=_gd_node
)
self._gd_node = _gd_node

self.msg = GD_Message(_message, _type="msg").get()
self.sub = rospy.Subscriber(self.topic, self.msg, self.callback)


def unregister(self) -> None:
"""Unregisters the subscriber"""
super().unregister()
Expand Down Expand Up @@ -198,10 +204,13 @@ def __init__(
_message: str,
_callback: str,
_update: bool,
_gd_node: Any = None,
**_ignore
) -> None:
"""Init"""
super().__init__(_node_name, _port_name, _topic, _message, _callback, _update)
super().__init__(
_node_name, _port_name, _topic, _message, _callback, _update, _gd_node=_gd_node
)
_message = _message.rsplit("Request")[0]
self.srv = GD_Message(_message, _type="srv").get()

Expand Down Expand Up @@ -275,10 +284,13 @@ def __init__(
_callback: str,
_params: dict,
_update: bool,
_gd_node: Any = None,
**_ignore
) -> None:
"""Init"""
super().__init__(_node_name, _port_name, _topic, _message, _callback, _update)
super().__init__(
_node_name, _port_name, _topic, _message, _callback, _update, _gd_node=_gd_node
)

self.duration = 1 / float(_params.get("Frequency", 10))
self.oneshot = _params.get("Oneshot", False)
Expand Down Expand Up @@ -316,11 +328,14 @@ def __init__(
_callback: str,
_params: dict,
_update: bool,
_gd_node: Any = None,
**_ignore
) -> None:
globals()["tf"] = importlib.import_module("tf")
globals()["tf2_ros"] = importlib.import_module("tf2_ros")
super().__init__(_node_name, _port_name, _topic, _message, _callback, _update)
super().__init__(
_node_name, _port_name, _topic, _message, _callback, _update, _gd_node=_gd_node
)

self.parent_frame = _params["Parent"]
self.child_frame = _params["Child"]
Expand Down Expand Up @@ -352,6 +367,7 @@ def callback(self, msg: Any) -> None:

####################################################


# This will not be used for now and is not up to date!
class ROS1_ActionServer(ROS1IportBase):
def __init__(
Expand All @@ -362,9 +378,12 @@ def __init__(
_message: str,
_callback: str,
_update: bool,
_gd_node: Any = None,
**_ignore
) -> None:
super().__init__(_node_name, _port_name, _topic, _message, _callback, _update)
super().__init__(
_node_name, _port_name, _topic, _message, _callback, _update, _gd_node=_gd_node
)
reply_key = "reply@" + self.topic

module, msg_name = _message.split("/")
Expand Down Expand Up @@ -428,7 +447,6 @@ class RostopicHzMsg:
"""

def __init__(self, rate=0.0, min=0.0, max=0.0, std_dev=0.0, window=1):

self.rate = rate

self.min = min
Expand Down Expand Up @@ -467,14 +485,16 @@ def __init__(
_callback: str,
_params: dict,
_update: bool,
_gd_node: Any = None,
**_ignore
) -> None:

"""Init"""

import rostopic

super().__init__(_node_name, _port_name, _topic, _message, _callback, _update)
super().__init__(
_node_name, _port_name, _topic, _message, _callback, _update, _gd_node=_gd_node
)

self.msg = rospy.AnyMsg

Expand All @@ -493,47 +513,39 @@ def __init__(
self.timer = rospy.Timer(rospy.Duration(self.duration), self.callback)

def callback(self, msg: Any) -> None:

"""Callback"""

res = self.rostopic_hz.get_hz(self.topic)

if res:

data = RostopicHzMsg(*res)

else:

data = RostopicHzMsg()

super().callback(data)

def unregister(self) -> None:

"""Unregisters the subscriber"""

super().unregister()

self.timer.shutdown()

if self.sub is not None:

self.sub.unregister()

self.sub = None

def register(self) -> None:

"""Registers the subscriber"""

super().register()

if self.sub is None:

self.sub = rospy.Subscriber(self.topic, self.msg, self.callback)

if self.timer._shutdown:

self.timer = rospy.Timer(rospy.Duration(self.duration), self.callback)


Expand All @@ -548,10 +560,11 @@ class ROS1_Publisher:
_message: ROS message
"""

def __init__(self, _topic: str, _message: Any, _params: dict = None) -> None:
def __init__(self, _topic: str, _message: Any, _params: dict = None, _gd_node=None) -> None:
"""Init"""

self.msg = GD_Message(_message).get()
self._gd_node = _gd_node
params = {"queue_size": 1}
if _params is not None:
params.update(_params)
Expand All @@ -563,7 +576,9 @@ def send(self, msg: Any) -> None:
Args:
msg: ROS Message
"""
self.pub.publish(msg)
if not self._gd_node or self._gd_node.RUNNING:
# publish only when Node is actually still running
self.pub.publish(msg)

def unregister(self):
if self.pub:
Expand Down Expand Up @@ -693,13 +708,11 @@ def send(self, msg: dict, topic: str = None, timeout=0.5):
client.update_configuration(msg)

def access_dynclient(self, msg: dict, topic: str = None, timeout=0.5):

topic = self.topic if topic is None else topic

client = self.mapping.get(topic)

if not client:

client = DynClient(topic, timeout=timeout, config_callback=None)

self.mapping[topic] = client
Expand Down

0 comments on commit 44aff17

Please sign in to comment.