Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bidirectional bridge looping over the messages with fastdds #79

Open
wants to merge 1 commit into
base: humble
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 165 additions & 45 deletions src/domain_bridge/domain_bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,87 @@

namespace domain_bridge
{
/// \internal
/**
* A hack, because GenericSubscription doesn't allow to define callback that takes the
* serialized message and the message type info at the same time. See issue #1604 of rclcpp.
*/
class GenericSubscription : public rclcpp::GenericSubscription
{
public:
template<typename AllocatorT = std::allocator<void>>
GenericSubscription(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::shared_ptr<rcpputils::SharedLibrary> ts_lib,
const std::string & topic_name,
const std::string & topic_type,
const rclcpp::QoS & qos,
std::function<void(std::shared_ptr<rclcpp::SerializedMessage>, rclcpp::MessageInfo)> callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options)
: rclcpp::GenericSubscription(
node_base, ts_lib, topic_name, topic_type, qos,
[](std::shared_ptr<rclcpp::SerializedMessage>) {}, options), callback_(callback)
{
}

void handle_serialized_message(
const std::shared_ptr<rclcpp::SerializedMessage> & serialized_message,
const rclcpp::MessageInfo & message_info) override
{
callback_(serialized_message, message_info);
}

private:
std::function<void(std::shared_ptr<rclcpp::SerializedMessage>, rclcpp::MessageInfo)> callback_;
};

template<typename AllocatorT = std::allocator<void>>
std::shared_ptr<GenericSubscription> create_generic_subscription(
rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr topics_interface,
const std::string & topic_name,
const std::string & topic_type,
const rclcpp::QoS & qos,
std::function<void(std::shared_ptr<rclcpp::SerializedMessage>, rclcpp::MessageInfo)> callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>()
)
)
{
auto ts_lib = rclcpp::get_typesupport_library(
topic_type, "rosidl_typesupport_cpp");

auto subscription = std::make_shared<GenericSubscription>(
topics_interface->get_node_base_interface(),
std::move(ts_lib),
topic_name,
topic_type,
qos,
callback,
options);

topics_interface->add_subscription(subscription, options.callback_group);

return subscription;
}

template<typename AllocatorT>
std::shared_ptr<rclcpp::GenericSubscription> create_generic_subscription(
rclcpp::Node::SharedPtr node,
const std::string & topic_name,
const std::string & topic_type,
const rclcpp::QoS & qos,
std::function<void(std::shared_ptr<rclcpp::SerializedMessage>, rclcpp::MessageInfo)> callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options)
{
return create_generic_subscription(
node->get_node_topics_interface(),
rclcpp::extend_name_with_sub_namespace(topic_name, node->get_sub_namespace()),
topic_type,
qos,
std::move(callback),
options
);
}

/// \internal
/**
Expand Down Expand Up @@ -76,6 +157,11 @@ class SerializedPublisher
((*impl_).*publish_method_pointer_)(message); // this is a bit horrible, but it works ...
}

std::shared_ptr<rclcpp::PublisherBase> get_base_publisher() const
{
return impl_;
}

private:
std::shared_ptr<rclcpp::PublisherBase> impl_;
using PointerToMemberMethod =
Expand Down Expand Up @@ -234,6 +320,17 @@ class DomainBridgeImpl
return publisher;
}

/// Check if there is a pulisher with the given gid
bool has_publisher_with_gid(const rmw_gid_t & _publisher_gid)
{
for (auto it = bridged_topics_.begin(); it != bridged_topics_.end(); ++it) {
if (*it->second.first->get_base_publisher() == _publisher_gid) {
Copy link

@AdamGoertz AdamGoertz May 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking into this, we were having problems with this issue and this PR helped us. I noticed one issue when we tried using this; we got a segmentation fault on this line. I haven't looked into the root cause, but adding this patch fixed the issue.

Suggested change
if (*it->second.first->get_base_publisher() == _publisher_gid) {
if (it->second.first && *it->second.first->get_base_publisher() == _publisher_gid) {

return true;
}
}
return false;
}

std::shared_ptr<rclcpp::SubscriptionBase> create_subscription(
rclcpp::Node::SharedPtr node,
std::shared_ptr<SerializedPublisher> publisher,
Expand All @@ -242,60 +339,83 @@ class DomainBridgeImpl
const rclcpp::QoS & qos,
rclcpp::SubscriptionOptionsWithAllocator<std::allocator<void>> & options)
{
std::function<void(std::shared_ptr<rclcpp::SerializedMessage>)> callback;
std::function<
void(std::shared_ptr<rclcpp::SerializedMessage>, const rclcpp::MessageInfo &)> callback;
switch (options_.mode()) {
case DomainBridgeOptions::Mode::Compress:
callback = [
serializer = rclcpp::Serialization<domain_bridge::msg::CompressedMsg>{},
publisher,
cctx = cctx_.get()
](std::shared_ptr<rclcpp::SerializedMessage> msg)
{
// Publish message into the other domain
domain_bridge::msg::CompressedMsg compressed_msg;
compressed_msg.data = domain_bridge::compress_message(cctx, std::move(*msg));
rclcpp::SerializedMessage serialized_compressed_msg;
serializer.serialize_message(&compressed_msg, &serialized_compressed_msg);
publisher->publish(serialized_compressed_msg);
};
{
std::function<
void(std::shared_ptr<rclcpp::SerializedMessage>)> callback = [
serializer = rclcpp::Serialization<domain_bridge::msg::CompressedMsg>{},
publisher,
cctx = cctx_.get()
](std::shared_ptr<rclcpp::SerializedMessage> msg)
{
// Publish message into the other domain
domain_bridge::msg::CompressedMsg compressed_msg;
compressed_msg.data = domain_bridge::compress_message(cctx, std::move(*msg));
rclcpp::SerializedMessage serialized_compressed_msg;
serializer.serialize_message(&compressed_msg, &serialized_compressed_msg);
publisher->publish(serialized_compressed_msg);
};
return node->create_generic_subscription(
topic_name,
type,
qos,
callback,
options);
}
break;
case DomainBridgeOptions::Mode::Decompress:
callback = [
serializer = rclcpp::Serialization<domain_bridge::msg::CompressedMsg>{},
publisher,
dctx = dctx_.get()
](std::shared_ptr<rclcpp::SerializedMessage> serialized_compressed_msg)
{
// Publish message into the other domain
domain_bridge::msg::CompressedMsg compressed_msg;
serializer.deserialize_message(serialized_compressed_msg.get(), &compressed_msg);
rclcpp::SerializedMessage msg = domain_bridge::decompress_message(
dctx, std::move(compressed_msg.data));
publisher->publish(msg);
};
{
std::function<
void(std::shared_ptr<rclcpp::SerializedMessage>)> callback = [
serializer = rclcpp::Serialization<domain_bridge::msg::CompressedMsg>{},
publisher,
dctx = dctx_.get()
](std::shared_ptr<
rclcpp::SerializedMessage> serialized_compressed_msg)
{
// Publish message into the other domain
domain_bridge::msg::CompressedMsg compressed_msg;
serializer.deserialize_message(serialized_compressed_msg.get(), &compressed_msg);
rclcpp::SerializedMessage msg = domain_bridge::decompress_message(
dctx, std::move(compressed_msg.data));
publisher->publish(msg);
};
return node->create_subscription<domain_bridge::msg::CompressedMsg>(
topic_name,
qos,
callback,
options);
}
break;
default: // fallthrough
case DomainBridgeOptions::Mode::Normal:
callback = [publisher](std::shared_ptr<rclcpp::SerializedMessage> msg) {
// Publish message into the other domain
publisher->publish(*msg);
};
{
std::function<
void(std::shared_ptr<rclcpp::SerializedMessage>,
const rclcpp::MessageInfo &)>
callback = [publisher, this](
std::shared_ptr<rclcpp::SerializedMessage> msg,
const rclcpp::MessageInfo & _message_info) {
// Check if the message comes from this bridge
if (!has_publisher_with_gid(_message_info.get_rmw_message_info().publisher_gid)) {
// Publish message into the other domain
publisher->publish(*msg);
}
};
return create_generic_subscription(
node,
topic_name,
type,
qos,
callback,
options);
}
break;
}
if (options_.mode() != DomainBridgeOptions::Mode::Decompress) {
// Create subscription
return node->create_generic_subscription(
topic_name,
type,
qos,
callback,
options);
}
return node->create_subscription<domain_bridge::msg::CompressedMsg>(
topic_name,
qos,
callback,
options);
return nullptr;
}

void bridge_topic(
Expand Down