diff --git a/ur_robot_driver/CMakeLists.txt b/ur_robot_driver/CMakeLists.txt
index 6069c5ace..c5e4cd2e7 100644
--- a/ur_robot_driver/CMakeLists.txt
+++ b/ur_robot_driver/CMakeLists.txt
@@ -177,19 +177,24 @@ if(BUILD_TESTING)
find_package(ur_description REQUIRED)
find_package(ur_msgs REQUIRED)
find_package(launch_testing_ament_cmake)
+ find_package(ament_cmake_pytest)
if(${UR_ROBOT_DRIVER_BUILD_INTEGRATION_TESTS})
- add_launch_test(test/dashboard_client.py
- TIMEOUT
- 180
- )
- add_launch_test(test/robot_driver.py
- TIMEOUT
- 500
- )
add_launch_test(test/urscript_interface.py
TIMEOUT
500
)
+ ament_add_pytest_test(dashboard_client
+ test/dashboard_client.py
+ ENV
+ "PYTEST_ADDOPTS=-s --log-cli-level=INFO"
+ TIMEOUT 180
+ )
+ ament_add_pytest_test(robot_driver
+ test/robot_driver.py
+ ENV
+ "PYTEST_ADDOPTS=-s --log-cli-level=INFO"
+ TIMEOUT 180
+ )
endif()
endif()
diff --git a/ur_robot_driver/package.xml b/ur_robot_driver/package.xml
index 1c35b4dcb..d56eba49f 100644
--- a/ur_robot_driver/package.xml
+++ b/ur_robot_driver/package.xml
@@ -59,7 +59,9 @@
xacro
docker.io
+ launch_pytest
launch_testing_ament_cmake
+ ament_cmake_pytest
ament_cmake
diff --git a/ur_robot_driver/test/__init__.py b/ur_robot_driver/test/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/ur_robot_driver/test/conftest.py b/ur_robot_driver/test/conftest.py
new file mode 100644
index 000000000..a489a3158
--- /dev/null
+++ b/ur_robot_driver/test/conftest.py
@@ -0,0 +1,243 @@
+# Copyright 2023, FZI Forschungszentrum Informatik
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# * Neither the name of the {copyright_holder} nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+import logging
+import time
+
+import pytest
+import rclpy.node
+from controller_manager_msgs.srv import SwitchController
+from rclpy.action import ActionClient
+from std_srvs.srv import Trigger
+from ur_dashboard_msgs.msg import RobotMode
+from ur_dashboard_msgs.srv import (
+ GetLoadedProgram,
+ GetProgramState,
+ GetRobotMode,
+ IsProgramRunning,
+ Load,
+)
+
+TIMEOUT_WAIT_SERVICE = 10
+TIMEOUT_WAIT_ACTION = 10
+# If we download the docker image simultaneously to the tests, it can take quite some time until the
+# dashboard server is reachable and usable.
+TIMEOUT_WAIT_SERVICE_INITIAL = 120
+
+
+def wait_for_service(node, srv_name, srv_type, timeout=TIMEOUT_WAIT_SERVICE):
+ client = node.create_client(srv_type, srv_name)
+
+ logging.info("Waiting for service '%s' with timeout %fs...", srv_name, timeout)
+
+ assert client.wait_for_service(timeout)
+ logging.info(" done")
+
+ return client
+
+
+def call_service(node, client, request):
+ logging.info("Calling service '%s' with request: %s", client.srv_name, request)
+
+ future = client.call_async(request)
+ rclpy.spin_until_future_complete(node, future)
+
+ assert future.result is not None
+ logging.info(" Received result: %s", future.result())
+ return future.result()
+
+
+def wait_for_action(node, action_name, action_type, timeout=TIMEOUT_WAIT_ACTION):
+ client = ActionClient(node, action_type, action_name)
+
+ logging.info("Waiting for action server '%s' with timeout %fs...", action_name, timeout)
+ assert client.wait_for_server(timeout)
+ logging.info(" done")
+
+ return client
+
+
+def call_action(node, client, goal):
+ logging.info("Sending goal to action server '%s': %s", client._action_name, goal)
+ future = client.send_goal_async(goal)
+ rclpy.spin_until_future_complete(node, future)
+
+ assert future.result() is not None
+ logging.info(
+ " Received response: accepted=%r, status=%d",
+ future.result().accepted,
+ future.result().status,
+ )
+
+ return future.result()
+
+
+def get_action_result(node, client, goal_response, timeout):
+ logging.info(
+ "Waiting for result of action call '%s' with timeout %fs...", client._action_name, timeout
+ )
+ future = goal_response.get_result_async()
+ rclpy.spin_until_future_complete(node, future, timeout_sec=timeout)
+
+ assert future.result() is not None
+ logging.info(" Received result: %s", future.result())
+
+ return future.result().result
+
+
+@pytest.fixture(scope="module")
+def rclpy_init():
+ """
+ Initializes and finalizes rclpy.
+
+ This ensures that rclpy.init() has been called before, but is not called more than once.
+ """
+ logging.info("Initializing rclpy")
+ rclpy.init()
+
+ yield
+
+ logging.info("Shutting down rclpy")
+ rclpy.shutdown()
+
+
+@pytest.fixture(scope="class")
+def node(request, rclpy_init):
+ """
+ Creates a node with a given name.
+
+ The name needs to be passed in as a parameter
+ """
+ logging.info("Creating node with name '%s'", request.param)
+ rclpy_node = rclpy.node.Node(request.param)
+
+ yield rclpy_node
+
+ logging.info("Destroying node '%s'", request.param)
+ rclpy_node.destroy_node()
+
+
+class DashboardClient:
+ def __init__(self, node, service_interfaces):
+ self._node = node
+ self._service_interfaces = service_interfaces
+
+ def call_service(self, name, request):
+ return call_service(self._node, self._service_interfaces[name], request)
+
+
+@pytest.fixture(scope="class")
+def dashboard_interface(node):
+ # We wait longer for the first client, as the robot is still starting up
+ power_on_client = wait_for_service(
+ node,
+ "/dashboard_client/power_on",
+ Trigger,
+ timeout=TIMEOUT_WAIT_SERVICE_INITIAL,
+ )
+
+ # Connect to all other expected services
+ dashboard_interfaces = {
+ "power_off": Trigger,
+ "brake_release": Trigger,
+ "unlock_protective_stop": Trigger,
+ "restart_safety": Trigger,
+ "get_robot_mode": GetRobotMode,
+ "load_installation": Load,
+ "load_program": Load,
+ "close_popup": Trigger,
+ "get_loaded_program": GetLoadedProgram,
+ "program_state": GetProgramState,
+ "program_running": IsProgramRunning,
+ "play": Trigger,
+ "stop": Trigger,
+ }
+
+ dashboard_clients = {
+ srv_name: wait_for_service(node, f"/dashboard_client/{srv_name}", srv_type)
+ for (srv_name, srv_type) in dashboard_interfaces.items()
+ }
+
+ # Add first client to dict
+ dashboard_clients["power_on"] = power_on_client
+
+ return DashboardClient(node, dashboard_clients)
+
+
+class ControllerManagerInterface:
+ def __init__(self, node):
+ self._node = node
+ self._switch_controller_client = wait_for_service(
+ node, "/controller_manager/switch_controller", SwitchController
+ )
+
+ def switch_controller(self, activate_controllers=[], deactivate_controllers=[], strict=False):
+ req = SwitchController.Request()
+ req.activate_controllers = activate_controllers
+ req.deactivate_controllers = deactivate_controllers
+ req.strictness = (
+ SwitchController.Request.STRICT if strict else SwitchController.Request.BEST_EFFORT
+ )
+
+ result = call_service(self._node, self._switch_controller_client, req)
+
+ assert result.ok
+ logging.info(" Successfully switched controllers")
+
+
+@pytest.fixture(scope="class")
+def controller_manager_interface(node):
+ return ControllerManagerInterface(node)
+
+
+@pytest.fixture(scope="class")
+def robot_program_running(node, dashboard_interface):
+ """
+ Makes sure the robot program is running.
+
+ This powers the robot on and releases its brakes. It also stops the robot program and resends
+ it. This is necessary as (depending on the exact startup sequence), the program might be sent by
+ headless mode at a time where it is not yet able to execute, leaving it paused.
+ """
+ resend_robot_program_client = wait_for_service(
+ node, "/io_and_status_controller/resend_robot_program", Trigger
+ )
+
+ assert dashboard_interface.call_service("power_on", Trigger.Request()).success
+ assert dashboard_interface.call_service("brake_release", Trigger.Request()).success
+
+ time.sleep(1)
+
+ robot_mode = dashboard_interface.call_service("get_robot_mode", GetRobotMode.Request())
+ assert robot_mode.success
+ assert robot_mode.robot_mode.mode == RobotMode.RUNNING
+
+ assert dashboard_interface.call_service("stop", Trigger.Request()).success
+
+ time.sleep(1)
+
+ assert call_service(node, resend_robot_program_client, Trigger.Request()).success
diff --git a/ur_robot_driver/test/dashboard_client.py b/ur_robot_driver/test/dashboard_client.py
index 917189c72..32cb6dff2 100755
--- a/ur_robot_driver/test/dashboard_client.py
+++ b/ur_robot_driver/test/dashboard_client.py
@@ -26,29 +26,17 @@
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
-
-
+import os
+import sys
import time
-import unittest
import pytest
-import rclpy
-from launch import LaunchDescription
-from launch.actions import DeclareLaunchArgument, ExecuteProcess, IncludeLaunchDescription
-from launch.launch_description_sources import PythonLaunchDescriptionSource
-from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
-from launch_ros.substitutions import FindPackagePrefix, FindPackageShare
-from launch_testing.actions import ReadyToTest
-from rclpy.node import Node
from std_srvs.srv import Trigger
from ur_dashboard_msgs.msg import RobotMode
-from ur_dashboard_msgs.srv import (
- GetLoadedProgram,
- GetProgramState,
- GetRobotMode,
- IsProgramRunning,
- Load,
-)
+from ur_dashboard_msgs.srv import GetRobotMode
+
+sys.path.append(os.path.dirname(__file__))
+from robot_launch_descriptions import * # noqa: E402, F403
TIMEOUT_WAIT_SERVICE = 10
# If we download the docker image simultaneously to the tests, it can take quite some time until the
@@ -56,164 +44,46 @@
TIMEOUT_WAIT_SERVICE_INITIAL = 120
-@pytest.mark.launch_test
-def generate_test_description():
- declared_arguments = []
-
- declared_arguments.append(
- DeclareLaunchArgument(
- "ur_type",
- default_value="ur5e",
- description="Type/series of used UR robot.",
- choices=["ur3", "ur3e", "ur5", "ur5e", "ur10", "ur10e", "ur16e"],
- )
- )
-
- ur_type = LaunchConfiguration("ur_type")
-
- dashboard_client = IncludeLaunchDescription(
- PythonLaunchDescriptionSource(
- PathJoinSubstitution(
- [
- FindPackageShare("ur_robot_driver"),
- "launch",
- "ur_dashboard_client.launch.py",
- ]
- )
- ),
- launch_arguments={
- "robot_ip": "192.168.56.101",
- }.items(),
- )
- ursim = ExecuteProcess(
- cmd=[
- PathJoinSubstitution(
- [
- FindPackagePrefix("ur_robot_driver"),
- "lib",
- "ur_robot_driver",
- "start_ursim.sh",
- ]
- ),
- " ",
- "-m ",
- ur_type,
- ],
- name="start_ursim",
- output="screen",
- )
-
- return LaunchDescription(declared_arguments + [ReadyToTest(), dashboard_client, ursim])
-
-
-class DashboardClientTest(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- # Initialize the ROS context
- rclpy.init()
- cls.node = Node("dashboard_client_test")
- cls.init_robot(cls)
-
- @classmethod
- def tearDownClass(cls):
- # Shutdown the ROS context
- cls.node.destroy_node()
- rclpy.shutdown()
-
- def init_robot(self):
- # We wait longer for the first client, as the robot is still starting up
- power_on_client = waitForService(
- self.node, "/dashboard_client/power_on", Trigger, timeout=TIMEOUT_WAIT_SERVICE_INITIAL
- )
-
- # Connect to all other expected services
- dashboard_interfaces = {
- "power_off": Trigger,
- "brake_release": Trigger,
- "unlock_protective_stop": Trigger,
- "restart_safety": Trigger,
- "get_robot_mode": GetRobotMode,
- "load_installation": Load,
- "load_program": Load,
- "close_popup": Trigger,
- "get_loaded_program": GetLoadedProgram,
- "program_state": GetProgramState,
- "program_running": IsProgramRunning,
- "play": Trigger,
- "stop": Trigger,
- }
- self.dashboard_clients = {
- srv_name: waitForService(self.node, f"/dashboard_client/{srv_name}", srv_type)
- for (srv_name, srv_type) in dashboard_interfaces.items()
- }
-
- # Add first client to dict
- self.dashboard_clients["power_on"] = power_on_client
-
- #
- # Test functions
- #
-
- def test_switch_on(self):
+@pytest.mark.launch(fixture=launch_dashboard_client) # noqa: F405
+@pytest.mark.parametrize("node", [("dashboard_client_test")], indirect=True)
+class TestDashboardClient:
+ def test_switch_on(self, dashboard_interface):
"""Test power on a robot."""
# Wait until the robot is booted completely
end_time = time.time() + 10
mode = RobotMode.DISCONNECTED
while mode != RobotMode.POWER_OFF and time.time() < end_time:
time.sleep(0.1)
- result = self.call_dashboard_service("get_robot_mode", GetRobotMode.Request())
- self.assertTrue(result.success)
+
+ result = dashboard_interface.call_service("get_robot_mode", GetRobotMode.Request())
+ assert result.success
mode = result.robot_mode.mode
# Power on robot
- self.assertTrue(self.call_dashboard_service("power_on", Trigger.Request()).success)
+ assert dashboard_interface.call_service("power_on", Trigger.Request()).success
# Wait until robot mode changes
end_time = time.time() + 10
mode = RobotMode.DISCONNECTED
while mode not in (RobotMode.IDLE, RobotMode.RUNNING) and time.time() < end_time:
time.sleep(0.1)
- result = self.call_dashboard_service("get_robot_mode", GetRobotMode.Request())
- self.assertTrue(result.success)
+
+ result = dashboard_interface.call_service("get_robot_mode", GetRobotMode.Request())
+ assert result.success
mode = result.robot_mode.mode
- self.assertIn(mode, (RobotMode.IDLE, RobotMode.RUNNING))
+ assert mode in (RobotMode.IDLE, RobotMode.RUNNING)
# Release robot brakes
- self.assertTrue(self.call_dashboard_service("brake_release", Trigger.Request()).success)
+ assert dashboard_interface.call_service("brake_release", Trigger.Request()).success
# Wait until robot mode is RUNNING
end_time = time.time() + 10
mode = RobotMode.DISCONNECTED
while mode != RobotMode.RUNNING and time.time() < end_time:
time.sleep(0.1)
- result = self.call_dashboard_service("get_robot_mode", GetRobotMode.Request())
- self.assertTrue(result.success)
+ result = dashboard_interface.call_service("get_robot_mode", GetRobotMode.Request())
+ assert result.success
mode = result.robot_mode.mode
- self.assertEqual(mode, RobotMode.RUNNING)
-
- #
- # Utility functions
- #
-
- def call_dashboard_service(self, srv_name, request):
- self.node.get_logger().info(
- f"Calling dashboard service '{srv_name}' with request {request}"
- )
- future = self.dashboard_clients[srv_name].call_async(request)
- rclpy.spin_until_future_complete(self.node, future)
- if future.result() is not None:
- self.node.get_logger().info(f"Received result {future.result()}")
- return future.result()
- else:
- raise Exception(f"Exception while calling service: {future.exception()}")
-
-
-def waitForService(node, srv_name, srv_type, timeout=TIMEOUT_WAIT_SERVICE):
- client = node.create_client(srv_type, srv_name)
- if client.wait_for_service(timeout) is False:
- raise Exception(f"Could not reach service '{srv_name}' within timeout of {timeout}")
-
- node.get_logger().info(f"Successfully connected to service '{srv_name}'")
- return client
+ assert mode == RobotMode.RUNNING
diff --git a/ur_robot_driver/test/robot_driver.py b/ur_robot_driver/test/robot_driver.py
index 4dfc91404..156857bb1 100755
--- a/ur_robot_driver/test/robot_driver.py
+++ b/ur_robot_driver/test/robot_driver.py
@@ -26,36 +26,25 @@
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
-
-
+import logging
import time
-import unittest
import pytest
import rclpy
from builtin_interfaces.msg import Duration
from control_msgs.action import FollowJointTrajectory
-from controller_manager_msgs.srv import SwitchController
-from launch import LaunchDescription
-from launch.actions import (
- DeclareLaunchArgument,
- ExecuteProcess,
- IncludeLaunchDescription,
- RegisterEventHandler,
-)
-from launch.event_handlers import OnProcessExit
-from launch.launch_description_sources import PythonLaunchDescriptionSource
-from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
-from launch_ros.substitutions import FindPackagePrefix, FindPackageShare
-from launch_testing.actions import ReadyToTest
-from rclpy.action import ActionClient
-from rclpy.node import Node
-from std_srvs.srv import Trigger
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from ur_msgs.msg import IOStates
from ur_msgs.srv import SetIO
-from ur_dashboard_msgs.msg import RobotMode
-from ur_dashboard_msgs.srv import GetRobotMode
+
+from .conftest import (
+ call_action,
+ call_service,
+ get_action_result,
+ wait_for_action,
+ wait_for_service,
+)
+from .robot_launch_descriptions import * # noqa: E402, F403
TIMEOUT_WAIT_SERVICE = 10
TIMEOUT_WAIT_SERVICE_INITIAL = 60
@@ -72,150 +61,17 @@
]
-@pytest.mark.launch_test
-def generate_test_description():
- declared_arguments = []
-
- declared_arguments.append(
- DeclareLaunchArgument(
- "ur_type",
- default_value="ur5e",
- description="Type/series of used UR robot.",
- choices=["ur3", "ur3e", "ur5", "ur5e", "ur10", "ur10e", "ur16e"],
- )
- )
-
- ur_type = LaunchConfiguration("ur_type")
-
- robot_driver = IncludeLaunchDescription(
- PythonLaunchDescriptionSource(
- PathJoinSubstitution(
- [FindPackageShare("ur_robot_driver"), "launch", "ur_control.launch.py"]
- )
- ),
- launch_arguments={
- "robot_ip": "192.168.56.101",
- "ur_type": ur_type,
- "launch_rviz": "false",
- "controller_spawner_timeout": str(TIMEOUT_WAIT_SERVICE_INITIAL),
- "initial_joint_controller": "scaled_joint_trajectory_controller",
- "headless_mode": "true",
- "launch_dashboard_client": "false",
- "start_joint_controller": "false",
- }.items(),
- )
- ursim = ExecuteProcess(
- cmd=[
- PathJoinSubstitution(
- [FindPackagePrefix("ur_robot_driver"), "lib", "ur_robot_driver", "start_ursim.sh"]
- ),
- " ",
- "-m ",
- ur_type,
- ],
- name="start_ursim",
- output="screen",
- )
- wait_dashboard_server = ExecuteProcess(
- cmd=[
- PathJoinSubstitution(
- [FindPackagePrefix("ur_robot_driver"), "bin", "wait_dashboard_server.sh"]
- )
- ],
- name="wait_dashboard_server",
- output="screen",
- )
- driver_starter = RegisterEventHandler(
- OnProcessExit(target_action=wait_dashboard_server, on_exit=robot_driver)
- )
-
- return LaunchDescription(
- declared_arguments + [ReadyToTest(), wait_dashboard_server, ursim, driver_starter]
- )
-
-
-class RobotDriverTest(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- # Initialize the ROS context
- rclpy.init()
- cls.node = Node("robot_driver_test")
- time.sleep(1)
- cls.init_robot(cls)
-
- @classmethod
- def tearDownClass(cls):
- # Shutdown the ROS context
- cls.node.destroy_node()
- rclpy.shutdown()
-
- def init_robot(self):
- # Wait longer for the first service clients:
- # - The robot has to start up
- # - The controller_manager has to start
- # - The controllers need to load and activate
- service_interfaces_initial = {
- "/dashboard_client/power_on": Trigger,
- "/controller_manager/switch_controller": SwitchController,
- "/io_and_status_controller/set_io": SetIO,
- }
- self.service_clients = {
- srv_name: waitForService(
- self.node, srv_name, srv_type, timeout=TIMEOUT_WAIT_SERVICE_INITIAL
- )
- for (srv_name, srv_type) in service_interfaces_initial.items()
- }
-
- # Connect to the rest of the required interfaces
- service_interfaces = {
- "/dashboard_client/brake_release": Trigger,
- "/dashboard_client/stop": Trigger,
- "/dashboard_client/get_robot_mode": GetRobotMode,
- "/controller_manager/switch_controller": SwitchController,
- "/io_and_status_controller/resend_robot_program": Trigger,
- }
- self.service_clients.update(
- {
- srv_name: waitForService(self.node, srv_name, srv_type)
- for (srv_name, srv_type) in service_interfaces.items()
- }
+@pytest.mark.launch(fixture=launch_robot_driver) # noqa: F405
+@pytest.mark.parametrize("node", [("robot_driver_test")], indirect=True)
+class TestRobotDriver:
+ def test_start_scaled_jtc_controller(
+ self, dashboard_interface, controller_manager_interface, robot_program_running
+ ):
+ controller_manager_interface.switch_controller(
+ activate_controllers=["scaled_joint_trajectory_controller"]
)
- action_interfaces = {
- "/scaled_joint_trajectory_controller/follow_joint_trajectory": FollowJointTrajectory
- }
- self.action_clients = {
- action_name: waitForAction(self.node, action_name, action_type)
- for (action_name, action_type) in action_interfaces.items()
- }
-
- def setUp(self):
- # Start robot
- empty_req = Trigger.Request()
- self.call_service("/dashboard_client/power_on", empty_req)
- self.call_service("/dashboard_client/brake_release", empty_req)
- time.sleep(1)
- robot_mode_resp = self.call_service(
- "/dashboard_client/get_robot_mode", GetRobotMode.Request()
- )
- self.assertEqual(robot_mode_resp.robot_mode.mode, RobotMode.RUNNING)
- self.call_service("/dashboard_client/stop", empty_req)
- time.sleep(1)
- self.call_service("/io_and_status_controller/resend_robot_program", empty_req)
-
- #
- # Test functions
- #
-
- def test_start_scaled_jtc_controller(self):
- req = SwitchController.Request()
- req.strictness = SwitchController.Request.BEST_EFFORT
- req.activate_controllers = ["scaled_joint_trajectory_controller"]
- result = self.call_service("/controller_manager/switch_controller", req)
-
- self.assertEqual(result.ok, True)
-
- def test_set_io(self):
+ def test_set_io(self, node, robot_program_running):
"""Test to set an IO and check whether it has been set."""
# Create io callback to verify result
io_msg = None
@@ -224,13 +80,15 @@ def io_msg_cb(msg):
nonlocal io_msg
io_msg = msg
- io_states_sub = self.node.create_subscription(
+ io_states_sub = node.create_subscription(
IOStates,
"/io_and_status_controller/io_states",
io_msg_cb,
rclpy.qos.qos_profile_system_default,
)
+ set_io_client = wait_for_service(node, "/io_and_status_controller/set_io", SetIO)
+
# Set pin 0 to 1.0
test_pin = 0
@@ -239,37 +97,37 @@ def io_msg_cb(msg):
set_io_req.pin = test_pin
set_io_req.state = 1.0
- self.node.get_logger().info(f"Setting pin {test_pin} to {set_io_req.state}")
- self.call_service("/io_and_status_controller/set_io", set_io_req)
+ logging.info("Setting pin %d to %f", set_io_req.pin, set_io_req.state)
+ call_service(node, set_io_client, set_io_req)
# Wait until the pin state has changed
pin_state = False
end_time = time.time() + 5
while not pin_state and time.time() < end_time:
- rclpy.spin_once(self.node, timeout_sec=0.1)
+ rclpy.spin_once(node, timeout_sec=0.1)
if io_msg is not None:
pin_state = io_msg.digital_out_states[test_pin].state
- self.assertEqual(pin_state, set_io_req.state)
+ assert pin_state == set_io_req.state
# Set pin 0 to 0.0
set_io_req.state = 0.0
- self.node.get_logger().info(f"Setting pin {test_pin} to {set_io_req.state}")
- self.call_service("/io_and_status_controller/set_io", set_io_req)
+ logging.info("Setting pin %d to %f", set_io_req.pin, set_io_req.state)
+ call_service(node, set_io_client, set_io_req)
# Wait until the pin state has changed back
end_time = time.time() + 5
while pin_state and time.time() < end_time:
- rclpy.spin_once(self.node, timeout_sec=0.1)
+ rclpy.spin_once(node, timeout_sec=0.1)
if io_msg is not None:
pin_state = io_msg.digital_out_states[test_pin].state
- self.assertEqual(pin_state, set_io_req.state)
+ assert pin_state == set_io_req.state
# Clean up io subscription
- self.node.destroy_subscription(io_states_sub)
+ node.destroy_subscription(io_states_sub)
- def test_trajectory(self):
+ def test_trajectory(self, node, robot_program_running):
"""Test robot movement."""
# Construct test trajectory
test_trajectory = [
@@ -286,24 +144,28 @@ def test_trajectory(self):
],
)
- # Sending trajectory goal
- self.node.get_logger().info("Sending simple goal")
- goal_response = self.call_action(
+ follow_joint_trajectory_client = wait_for_action(
+ node,
"/scaled_joint_trajectory_controller/follow_joint_trajectory",
- FollowJointTrajectory.Goal(trajectory=trajectory),
+ FollowJointTrajectory,
)
- self.assertEqual(goal_response.accepted, True)
- # Verify execution
- result = self.get_result(
- "/scaled_joint_trajectory_controller/follow_joint_trajectory",
- goal_response,
- TIMEOUT_EXECUTE_TRAJECTORY,
+ # Sending trajectory goal
+ logging.info("Sending simple test trajectory")
+ goal_response = call_action(
+ node, follow_joint_trajectory_client, FollowJointTrajectory.Goal(trajectory=trajectory)
+ )
+
+ assert goal_response.accepted
+ logging.info("Goal accepted")
+
+ result = get_action_result(
+ node, follow_joint_trajectory_client, goal_response, TIMEOUT_EXECUTE_TRAJECTORY
)
- self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL)
- self.node.get_logger().info("Received result SUCCESSFUL")
+ assert result.error_code == FollowJointTrajectory.Result.SUCCESSFUL
+ logging.info("Successfully executed simple test trajectory")
- def test_illegal_trajectory(self):
+ def test_illegal_trajectory(self, node):
"""
Test trajectory server.
@@ -323,18 +185,23 @@ def test_illegal_trajectory(self):
],
)
- # Send illegal goal
- self.node.get_logger().info("Sending illegal goal")
- goal_response = self.call_action(
+ follow_joint_trajectory_client = wait_for_action(
+ node,
"/scaled_joint_trajectory_controller/follow_joint_trajectory",
- FollowJointTrajectory.Goal(trajectory=trajectory),
+ FollowJointTrajectory,
+ )
+
+ # Send illegal goal
+ logging.info("Sending illegal test trajectory")
+ goal_response = call_action(
+ node, follow_joint_trajectory_client, FollowJointTrajectory.Goal(trajectory=trajectory)
)
# Verify the failure is correctly detected
- self.assertEqual(goal_response.accepted, False)
- self.node.get_logger().info("Goal response REJECTED")
+ assert not goal_response.accepted
+ logging.info("Correctly rejected illegal trajectory")
- def test_trajectory_scaled(self):
+ def test_trajectory_scaled(self, node):
"""Test robot movement."""
# Construct test trajectory
test_trajectory = [
@@ -350,34 +217,31 @@ def test_trajectory_scaled(self):
],
)
- goal = FollowJointTrajectory.Goal(trajectory=trajectory)
+ follow_joint_trajectory_client = wait_for_action(
+ node,
+ "/scaled_joint_trajectory_controller/follow_joint_trajectory",
+ FollowJointTrajectory,
+ )
# TODO: uncomment when JTC starts taking into account goal_time_tolerance from goal message
# see https://github.com/ros-controls/ros2_controllers/issues/249
# self.node.get_logger().info("Sending scaled goal without time restrictions")
- self.node.get_logger().info("Sending goal for robot to follow")
- goal_response = self.call_action(
- "/scaled_joint_trajectory_controller/follow_joint_trajectory", goal
+ logging.info("Sending trajectory goal")
+ goal_response = call_action(
+ node, follow_joint_trajectory_client, FollowJointTrajectory.Goal(trajectory=trajectory)
)
- self.assertEqual(goal_response.accepted, True)
-
- if goal_response.accepted:
- result = self.get_result(
- "/scaled_joint_trajectory_controller/follow_joint_trajectory",
- goal_response,
- TIMEOUT_EXECUTE_TRAJECTORY,
- )
- self.assertIn(
- result.error_code,
- (
- FollowJointTrajectory.Result.PATH_TOLERANCE_VIOLATED,
- FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED,
- FollowJointTrajectory.Result.SUCCESSFUL,
- ),
- )
-
- self.node.get_logger().info("Received result")
+ assert goal_response.accepted
+ logging.info("Goal accepted")
+
+ result = get_action_result(
+ node, follow_joint_trajectory_client, goal_response, TIMEOUT_EXECUTE_TRAJECTORY
+ )
+ assert result.error_code in (
+ FollowJointTrajectory.Result.PATH_TOLERANCE_VIOLATED,
+ FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED,
+ FollowJointTrajectory.Result.SUCCESSFUL,
+ )
# TODO: uncomment when JTC starts taking into account goal_time_tolerance from goal message
# see https://github.com/ros-controls/ros2_controllers/issues/249
@@ -393,60 +257,3 @@ def test_trajectory_scaled(self):
# result = self.get_result("/scaled_joint_trajectory_controller/follow_joint_trajectory", goal_response, TIMEOUT_EXECUTE_TRAJECTORY)
# self.assertEqual(result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED)
# self.node.get_logger().info("Received result GOAL_TOLERANCE_VIOLATED")
-
- #
- # Utility functions
- #
-
- def call_service(self, srv_name, request):
- self.node.get_logger().info(f"Calling service '{srv_name}' with request {request}")
- future = self.service_clients[srv_name].call_async(request)
- rclpy.spin_until_future_complete(self.node, future)
- if future.result() is not None:
- self.node.get_logger().info(f"Received result {future.result()}")
- return future.result()
- else:
- raise Exception(f"Exception while calling service: {future.exception()}")
-
- def call_action(self, action_name, goal):
- self.node.get_logger().info(f"Sending goal to action server '{action_name}'")
- future = self.action_clients[action_name].send_goal_async(goal)
- rclpy.spin_until_future_complete(self.node, future)
-
- if future.result() is not None:
- return future.result()
- else:
- raise Exception(f"Exception while calling action: {future.exception()}")
-
- def get_result(self, action_name, goal_response, timeout):
- self.node.get_logger().info(
- f"Waiting for result for action server '{action_name}' (timeout: {timeout} seconds)"
- )
- future_res = self.action_clients[action_name]._get_result_async(goal_response)
- rclpy.spin_until_future_complete(self.node, future_res, timeout_sec=timeout)
-
- if future_res.result() is not None:
- self.node.get_logger().info(f"Received result {future_res.result().result}")
- return future_res.result().result
- else:
- raise Exception(f"Exception while calling action: {future_res.exception()}")
-
-
-def waitForService(node, srv_name, srv_type, timeout=TIMEOUT_WAIT_SERVICE):
- client = node.create_client(srv_type, srv_name)
- if client.wait_for_service(timeout) is False:
- raise Exception(f"Could not reach service '{srv_name}' within timeout of {timeout}")
-
- node.get_logger().info(f"Successfully connected to service '{srv_name}'")
- return client
-
-
-def waitForAction(node, action_name, action_type, timeout=TIMEOUT_WAIT_ACTION):
- client = ActionClient(node, action_type, action_name)
- if client.wait_for_server(timeout) is False:
- raise Exception(
- f"Could not reach action server '{action_name}' within timeout of {timeout}"
- )
-
- node.get_logger().info(f"Successfully connected to action '{action_name}'")
- return client
diff --git a/ur_robot_driver/test/robot_launch_descriptions.py b/ur_robot_driver/test/robot_launch_descriptions.py
new file mode 100644
index 000000000..09544a595
--- /dev/null
+++ b/ur_robot_driver/test/robot_launch_descriptions.py
@@ -0,0 +1,153 @@
+# Copyright 2023, FZI Forschungszentrum Informatik
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# * Neither the name of the {copyright_holder} nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+import launch_pytest
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument, ExecuteProcess, IncludeLaunchDescription
+from launch.launch_description_sources import PythonLaunchDescriptionSource
+from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
+from launch_pytest.actions import ReadyToTest
+from launch_ros.substitutions import FindPackagePrefix, FindPackageShare
+
+CONTROLLER_SPAWNER_TIMEOUT = 120
+
+
+@launch_pytest.fixture(scope="class")
+def launch_dashboard_client():
+ """
+ Start a robot arm in ursim and a dashboard client connected to it.
+
+ This is deliberately scoped at class level, as you might want to have tests that require
+ completely resetting the robot afterwards. This can be done by putting these into a separate
+ class.
+ """
+ declared_arguments = []
+
+ declared_arguments.append(
+ DeclareLaunchArgument(
+ "ur_type",
+ default_value="ur5e",
+ description="Type/series of used UR robot.",
+ choices=["ur3", "ur3e", "ur5", "ur5e", "ur10", "ur10e", "ur16e"],
+ )
+ )
+
+ ur_type = LaunchConfiguration("ur_type")
+
+ dashboard_client = IncludeLaunchDescription(
+ PythonLaunchDescriptionSource(
+ PathJoinSubstitution(
+ [
+ FindPackageShare("ur_robot_driver"),
+ "launch",
+ "ur_dashboard_client.launch.py",
+ ]
+ )
+ ),
+ launch_arguments={
+ "robot_ip": "192.168.56.101",
+ }.items(),
+ )
+ ursim = ExecuteProcess(
+ cmd=[
+ PathJoinSubstitution(
+ [
+ FindPackagePrefix("ur_robot_driver"),
+ "lib",
+ "ur_robot_driver",
+ "start_ursim.sh",
+ ]
+ ),
+ " ",
+ "-m ",
+ ur_type,
+ ],
+ name="start_ursim",
+ output="screen",
+ )
+
+ return LaunchDescription(declared_arguments + [ReadyToTest(), dashboard_client, ursim])
+
+
+@launch_pytest.fixture(scope="class")
+def launch_robot_driver():
+ declared_arguments = []
+
+ declared_arguments.append(
+ DeclareLaunchArgument(
+ "ur_type",
+ default_value="ur5e",
+ description="Type/series of used UR robot.",
+ choices=["ur3", "ur3e", "ur5", "ur5e", "ur10", "ur10e", "ur16e"],
+ )
+ )
+
+ ur_type = LaunchConfiguration("ur_type")
+
+ robot_driver = IncludeLaunchDescription(
+ PythonLaunchDescriptionSource(
+ PathJoinSubstitution(
+ [FindPackageShare("ur_robot_driver"), "launch", "ur_control.launch.py"]
+ )
+ ),
+ launch_arguments={
+ "robot_ip": "192.168.56.101",
+ "ur_type": ur_type,
+ "launch_rviz": "false",
+ "controller_spawner_timeout": str(CONTROLLER_SPAWNER_TIMEOUT),
+ "initial_joint_controller": "scaled_joint_trajectory_controller",
+ "headless_mode": "true",
+ "launch_dashboard_client": "false",
+ "start_joint_controller": "false",
+ }.items(),
+ )
+ ursim = ExecuteProcess(
+ cmd=[
+ PathJoinSubstitution(
+ [FindPackagePrefix("ur_robot_driver"), "lib", "ur_robot_driver", "start_ursim.sh"]
+ ),
+ " ",
+ "-m ",
+ ur_type,
+ ],
+ name="start_ursim",
+ output="screen",
+ )
+ # wait_dashboard_server = ExecuteProcess(
+ # cmd=[
+ # PathJoinSubstitution(
+ # [FindPackagePrefix("ur_robot_driver"), "bin", "wait_dashboard_server.sh"]
+ # )
+ # ],
+ # name="wait_dashboard_server",
+ # output="screen",
+ # )
+ # driver_starter = RegisterEventHandler(
+ # OnProcessExit(target_action=wait_dashboard_server, on_exit=robot_driver)
+ # )
+
+ return LaunchDescription(declared_arguments + [ReadyToTest(), robot_driver, ursim])