From 438a11961ba51f93921998ca2e82e55756221179 Mon Sep 17 00:00:00 2001 From: Cyrille Berger Date: Wed, 18 Oct 2023 20:42:28 +0200 Subject: [PATCH] fix bidirectional bridge looping over the messages with fastdds --- src/domain_bridge/domain_bridge.cpp | 210 ++++++++++++++++++++++------ 1 file changed, 165 insertions(+), 45 deletions(-) diff --git a/src/domain_bridge/domain_bridge.cpp b/src/domain_bridge/domain_bridge.cpp index 3b51eb5..58d9d40 100644 --- a/src/domain_bridge/domain_bridge.cpp +++ b/src/domain_bridge/domain_bridge.cpp @@ -49,6 +49,87 @@ namespace domain_bridge { +/// \internal +/** + * A hack, because GenericSubscription doesn't allow to define callback that takes the + * serialized message and the message type info at the same time. See issue #1604 of rclcpp. + */ +class GenericSubscription : public rclcpp::GenericSubscription +{ +public: + template> + GenericSubscription( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::shared_ptr ts_lib, + const std::string & topic_name, + const std::string & topic_type, + const rclcpp::QoS & qos, + std::function, rclcpp::MessageInfo)> callback, + const rclcpp::SubscriptionOptionsWithAllocator & options) + : rclcpp::GenericSubscription( + node_base, ts_lib, topic_name, topic_type, qos, + [](std::shared_ptr) {}, options), callback_(callback) + { + } + + void handle_serialized_message( + const std::shared_ptr & serialized_message, + const rclcpp::MessageInfo & message_info) override + { + callback_(serialized_message, message_info); + } + +private: + std::function, rclcpp::MessageInfo)> callback_; +}; + +template> +std::shared_ptr create_generic_subscription( + rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr topics_interface, + const std::string & topic_name, + const std::string & topic_type, + const rclcpp::QoS & qos, + std::function, rclcpp::MessageInfo)> callback, + const rclcpp::SubscriptionOptionsWithAllocator & options = ( + rclcpp::SubscriptionOptionsWithAllocator() + ) +) +{ + auto ts_lib = rclcpp::get_typesupport_library( + topic_type, "rosidl_typesupport_cpp"); + + auto subscription = std::make_shared( + topics_interface->get_node_base_interface(), + std::move(ts_lib), + topic_name, + topic_type, + qos, + callback, + options); + + topics_interface->add_subscription(subscription, options.callback_group); + + return subscription; +} + +template +std::shared_ptr create_generic_subscription( + rclcpp::Node::SharedPtr node, + const std::string & topic_name, + const std::string & topic_type, + const rclcpp::QoS & qos, + std::function, rclcpp::MessageInfo)> callback, + const rclcpp::SubscriptionOptionsWithAllocator & options) +{ + return create_generic_subscription( + node->get_node_topics_interface(), + rclcpp::extend_name_with_sub_namespace(topic_name, node->get_sub_namespace()), + topic_type, + qos, + std::move(callback), + options + ); +} /// \internal /** @@ -76,6 +157,11 @@ class SerializedPublisher ((*impl_).*publish_method_pointer_)(message); // this is a bit horrible, but it works ... } + std::shared_ptr get_base_publisher() const + { + return impl_; + } + private: std::shared_ptr impl_; using PointerToMemberMethod = @@ -234,6 +320,17 @@ class DomainBridgeImpl return publisher; } + /// Check if there is a pulisher with the given gid + bool has_publisher_with_gid(const rmw_gid_t & _publisher_gid) + { + for (auto it = bridged_topics_.begin(); it != bridged_topics_.end(); ++it) { + if (*it->second.first->get_base_publisher() == _publisher_gid) { + return true; + } + } + return false; + } + std::shared_ptr create_subscription( rclcpp::Node::SharedPtr node, std::shared_ptr publisher, @@ -242,60 +339,83 @@ class DomainBridgeImpl const rclcpp::QoS & qos, rclcpp::SubscriptionOptionsWithAllocator> & options) { - std::function)> callback; + std::function< + void(std::shared_ptr, const rclcpp::MessageInfo &)> callback; switch (options_.mode()) { case DomainBridgeOptions::Mode::Compress: - callback = [ - serializer = rclcpp::Serialization{}, - publisher, - cctx = cctx_.get() - ](std::shared_ptr msg) - { - // Publish message into the other domain - domain_bridge::msg::CompressedMsg compressed_msg; - compressed_msg.data = domain_bridge::compress_message(cctx, std::move(*msg)); - rclcpp::SerializedMessage serialized_compressed_msg; - serializer.serialize_message(&compressed_msg, &serialized_compressed_msg); - publisher->publish(serialized_compressed_msg); - }; + { + std::function< + void(std::shared_ptr)> callback = [ + serializer = rclcpp::Serialization{}, + publisher, + cctx = cctx_.get() + ](std::shared_ptr msg) + { + // Publish message into the other domain + domain_bridge::msg::CompressedMsg compressed_msg; + compressed_msg.data = domain_bridge::compress_message(cctx, std::move(*msg)); + rclcpp::SerializedMessage serialized_compressed_msg; + serializer.serialize_message(&compressed_msg, &serialized_compressed_msg); + publisher->publish(serialized_compressed_msg); + }; + return node->create_generic_subscription( + topic_name, + type, + qos, + callback, + options); + } break; case DomainBridgeOptions::Mode::Decompress: - callback = [ - serializer = rclcpp::Serialization{}, - publisher, - dctx = dctx_.get() - ](std::shared_ptr serialized_compressed_msg) - { - // Publish message into the other domain - domain_bridge::msg::CompressedMsg compressed_msg; - serializer.deserialize_message(serialized_compressed_msg.get(), &compressed_msg); - rclcpp::SerializedMessage msg = domain_bridge::decompress_message( - dctx, std::move(compressed_msg.data)); - publisher->publish(msg); - }; + { + std::function< + void(std::shared_ptr)> callback = [ + serializer = rclcpp::Serialization{}, + publisher, + dctx = dctx_.get() + ](std::shared_ptr< + rclcpp::SerializedMessage> serialized_compressed_msg) + { + // Publish message into the other domain + domain_bridge::msg::CompressedMsg compressed_msg; + serializer.deserialize_message(serialized_compressed_msg.get(), &compressed_msg); + rclcpp::SerializedMessage msg = domain_bridge::decompress_message( + dctx, std::move(compressed_msg.data)); + publisher->publish(msg); + }; + return node->create_subscription( + topic_name, + qos, + callback, + options); + } break; default: // fallthrough case DomainBridgeOptions::Mode::Normal: - callback = [publisher](std::shared_ptr msg) { - // Publish message into the other domain - publisher->publish(*msg); - }; + { + std::function< + void(std::shared_ptr, + const rclcpp::MessageInfo &)> + callback = [publisher, this]( + std::shared_ptr msg, + const rclcpp::MessageInfo & _message_info) { + // Check if the message comes from this bridge + if (!has_publisher_with_gid(_message_info.get_rmw_message_info().publisher_gid)) { + // Publish message into the other domain + publisher->publish(*msg); + } + }; + return create_generic_subscription( + node, + topic_name, + type, + qos, + callback, + options); + } break; } - if (options_.mode() != DomainBridgeOptions::Mode::Decompress) { - // Create subscription - return node->create_generic_subscription( - topic_name, - type, - qos, - callback, - options); - } - return node->create_subscription( - topic_name, - qos, - callback, - options); + return nullptr; } void bridge_topic(