Skip to content

Commit

Permalink
Fix get_type_description service bug and add a unit test (#1155)
Browse files Browse the repository at this point in the history
* Fix get_type_description service bug and add a unit test

Signed-off-by: Emerson Knapp <[email protected]>
  • Loading branch information
emersonknapp authored Aug 29, 2023
1 parent 159ced4 commit ef6983e
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 3 deletions.
17 changes: 14 additions & 3 deletions rclpy/rclpy/type_description_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import weakref

from rcl_interfaces.msg import ParameterDescriptor
from rcl_interfaces.msg import ParameterType

Expand Down Expand Up @@ -39,6 +41,7 @@ class TypeDescriptionService:

def __init__(self, node):
"""Initialize the service, if the parameter is set to true."""
self._node_weak_ref = weakref.ref(node)
node_name = node.get_name()
self.service_name = TOPIC_SEPARATOR_STRING.join((node_name, 'get_type_description'))
self._type_description_srv = None
Expand Down Expand Up @@ -68,14 +71,16 @@ def __init__(self, node):
.format(START_TYPE_DESCRIPTION_SERVICE_PARAM))

if self.enabled:
self._start_service(node)
self._start_service()

def destroy(self):
# Required manual destruction because this is not managed by rclpy.Service
if self._type_description_srv is not None:
self._type_description_srv.destroy_when_not_in_use()
self._type_description_srv = None

def _start_service(self, node):
def _start_service(self):
node = self._get_node()
self._type_description_srv = _rclpy.TypeDescriptionService(node.handle)
# Because we are creating our own service wrapper, must manually add the service
# to the appropriate parts of Node because we cannot call create_service.
Expand All @@ -93,4 +98,10 @@ def _start_service(self, node):

def _service_callback(self, request, response):
return self._type_description_srv.handle_request(
request, GetTypeDescription.Response, self._node)
request, GetTypeDescription.Response, self._get_node().handle)

def _get_node(self):
node = self._node_weak_ref()
if node is None:
raise ReferenceError('Expected valid node weak reference')
return node
66 changes: 66 additions & 0 deletions rclpy/test/test_type_description_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2023 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import rclpy
import rclpy.context
from rclpy.executors import SingleThreadedExecutor
from rclpy.qos import qos_profile_services_default
from test_msgs.msg import BasicTypes
from type_description_interfaces.srv import GetTypeDescription


class TestTypeDescriptionService(unittest.TestCase):

def setUp(self):
self.context = rclpy.context.Context()
rclpy.init(context=self.context)

self.test_node = rclpy.create_node(
'test_type_description_service',
namespace='/rclpy',
context=self.context)
self.test_topic = '/rclpy/basic_types'
self.test_pub = self.test_node.create_publisher(
BasicTypes, self.test_topic, 10)

self.get_type_description_client = self.test_node.create_client(
GetTypeDescription, '/rclpy/test_parameter_service/get_type_description',
qos_profile=qos_profile_services_default)

self.executor = SingleThreadedExecutor(context=self.context)
self.executor.add_node(self.test_node)

def tearDown(self):
self.executor.shutdown()
self.test_node.destroy_node()
rclpy.shutdown(context=self.context)

def test_get_type_description(self):
pub_infos = self.test_node.get_publishers_info_by_topic(self.test_topic)
assert len(pub_infos)
type_hash = pub_infos[0].topic_type_hash

request = GetTypeDescription.Request(
type_name='test_msgs/msg/BasicTypes',
type_hash=type_hash,
include_type_sources=True)
future = self.get_type_description_client.call_async(request)
self.executor.spin_until_future_complete(future)
response = future.result()
assert response is not None
assert response.successful
assert response.type_description.type_description.type_name == 'test_msgs/msg/BasicTypes'
assert len(response.type_sources)

0 comments on commit ef6983e

Please sign in to comment.