From f400dcf61b0033c055b27fe82efbc26cdfb2dc0b Mon Sep 17 00:00:00 2001 From: Robert Wilbrandt Date: Tue, 26 Jul 2022 10:20:49 +0200 Subject: [PATCH 1/2] Handle shut down connections in ::send calls --- src/comm/tcp_server.cpp | 12 ++++++++++-- src/comm/tcp_socket.cpp | 12 ++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/comm/tcp_server.cpp b/src/comm/tcp_server.cpp index 8fa7bd81..306049b9 100644 --- a/src/comm/tcp_server.cpp +++ b/src/comm/tcp_server.cpp @@ -321,11 +321,19 @@ bool TCPServer::write(const int fd, const uint8_t* buf, const size_t buf_len, si // handle partial sends while (written < buf_len) { - ssize_t sent = ::send(fd, buf + written, remaining, 0); + ssize_t sent = ::send(fd, buf + written, remaining, MSG_NOSIGNAL); if (sent <= 0) { - URCL_LOG_ERROR("Sending data through socket failed."); + if (errno == EPIPE) + { + URCL_LOG_ERROR("Sending data through socket failed because the connection was shut down"); + handleDisconnect(fd); + } + else + { + URCL_LOG_ERROR("Sending data through socket failed"); + } return false; } diff --git a/src/comm/tcp_socket.cpp b/src/comm/tcp_socket.cpp index 4421be3c..5067eba3 100644 --- a/src/comm/tcp_socket.cpp +++ b/src/comm/tcp_socket.cpp @@ -184,11 +184,19 @@ bool TCPSocket::write(const uint8_t* buf, const size_t buf_len, size_t& written) // handle partial sends while (written < buf_len) { - ssize_t sent = ::send(socket_fd_, buf + written, remaining, 0); + ssize_t sent = ::send(socket_fd_, buf + written, remaining, MSG_NOSIGNAL); if (sent <= 0) { - URCL_LOG_ERROR("Sending data through socket failed."); + if (errno == EPIPE) + { + state_ = SocketState::Disconnected; + URCL_LOG_ERROR("Sending data through socket failed because the connection was shut down"); + } + else + { + URCL_LOG_ERROR("Sending data through socket failed"); + } return false; } From cffe97e15344c101c89ac1f541e0425341af3279 Mon Sep 17 00:00:00 2001 From: Robert Wilbrandt Date: Mon, 1 Aug 2022 15:58:14 +0200 Subject: [PATCH 2/2] Add persistent stream that tries to reconnect sockets --- .../comm/persistent_stream.h | 160 ++++++++++++++++++ include/ur_client_library/ur/ur_driver.h | 5 +- src/ur/ur_driver.cpp | 8 +- 3 files changed, 167 insertions(+), 6 deletions(-) create mode 100644 include/ur_client_library/comm/persistent_stream.h diff --git a/include/ur_client_library/comm/persistent_stream.h b/include/ur_client_library/comm/persistent_stream.h new file mode 100644 index 00000000..5a482917 --- /dev/null +++ b/include/ur_client_library/comm/persistent_stream.h @@ -0,0 +1,160 @@ +/* + * Copyright 2022, FZI Forschungszentrum Informatik (templating) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include "ur_client_library/comm/stream.h" + +namespace urcl +{ +namespace comm +{ +/*! \brief A special URStream that can perform reconnects as needed + * + */ +template +class URPersistentStream : public URStream +{ +public: + /*! + * \brief Creates a new persistent stream. + * + * As with a normal URStream object, this does not immediately open the socket. This has to be done manually using the + * connect() function. + * + * \param host IP address of the remote host + * \param port Port on which the socket shall be connected + */ + URPersistentStream(const std::string& host, int port) + : URStream(host, port), timeout_(std::chrono::seconds(1)), stop_(false) + { + } + + /*! + * \brief Stop all currently running reconnect attempts + */ + ~URPersistentStream(); + + URPersistentStream(const URPersistentStream&) = delete; + URPersistentStream& operator=(const URPersistentStream&) = delete; + + /*! \brief Reads a full UR package from the socket, trying to reconnect if required. + * + * This is done by repeatedly reading bytes from the socket until the number of bytes occupied by a single package + * are received. In case of a socket disconnect during package transimission, the already received segment is + * discarded. + * + * \param[out] buf The byte buffer where the content shall be stored + * \param[in] buf_len Maximum number of bytes that can be stored in buf + * \param[out] read Number of bytes actually read from the socket + * + * \returns Success of the operation + */ + bool read(uint8_t* buf, const size_t buf_len, size_t& read); + + /*! \brief Write directly to the underlying socket, trying to reconnect if required + * + * If a socket disconnect occurs during transmission, the already sent data is discarded and the complete write + * operation is repeated after the reconnect. + * + * \param[in] buf Byte stream that should be sent + * \param[in] buf_len Number of bytes in buf to be sent + * \param[out] written Number of bytes actually written to the socket + * + * \returns Success of the operation + */ + bool write(const uint8_t* buf, const size_t buf_len, size_t& written); + +private: + bool tryReconnect(); + + std::atomic timeout_; + std::atomic stop_; +}; + +template +URPersistentStream::~URPersistentStream() +{ + stop_.store(true); +} + +template +bool URPersistentStream::read(uint8_t* buf, const size_t buf_len, size_t& read) +{ + while (!stop_.load()) + { + if (URStream::read(buf, buf_len, read)) + { + timeout_.store(std::chrono::seconds(1)); + return true; + } + + if (!tryReconnect()) + { + return false; + } + } + + return false; +} + +template +bool URPersistentStream::write(const uint8_t* buf, const size_t buf_len, size_t& written) +{ + while (!stop_.load()) + { + if (URStream::write(buf, buf_len, written)) + { + timeout_.store(std::chrono::seconds(1)); + return true; + } + + if (!tryReconnect()) + { + return false; + } + } + + return false; +} + +template +bool URPersistentStream::tryReconnect() +{ + if (URStream::closed()) + { + return false; + } + + auto cur_timeout = timeout_.load(); + URCL_LOG_WARN("Reconnecting in %ld seconds...", cur_timeout.count()); + std::this_thread::sleep_for(cur_timeout); + + if (URStream::connect()) + { + timeout_.store(std::chrono::seconds(1)); + } + else if (cur_timeout <= std::chrono::seconds(60)) + { + timeout_.compare_exchange_strong(cur_timeout, 2 * cur_timeout); + } + + return true; +} +} // namespace comm +} // namespace urcl diff --git a/include/ur_client_library/ur/ur_driver.h b/include/ur_client_library/ur/ur_driver.h index 8ece337a..02f1ce6e 100644 --- a/include/ur_client_library/ur/ur_driver.h +++ b/include/ur_client_library/ur/ur_driver.h @@ -30,6 +30,7 @@ #include +#include "ur_client_library/comm/persistent_stream.h" #include "ur_client_library/rtde/rtde_client.h" #include "ur_client_library/control/reverse_interface.h" #include "ur_client_library/control/trajectory_point_interface.h" @@ -321,8 +322,8 @@ class UrDriver std::unique_ptr reverse_interface_; std::unique_ptr trajectory_interface_; std::unique_ptr script_sender_; - std::unique_ptr> primary_stream_; - std::unique_ptr> secondary_stream_; + std::unique_ptr> primary_stream_; + std::unique_ptr> secondary_stream_; double servoj_time_; uint32_t servoj_gain_; diff --git a/src/ur/ur_driver.cpp b/src/ur/ur_driver.cpp index 60cf46ba..3144d659 100644 --- a/src/ur/ur_driver.cpp +++ b/src/ur/ur_driver.cpp @@ -65,10 +65,10 @@ urcl::UrDriver::UrDriver(const std::string& robot_ip, const std::string& script_ URCL_LOG_DEBUG("Initializing RTDE client"); rtde_client_.reset(new rtde_interface::RTDEClient(robot_ip_, notifier_, output_recipe_file, input_recipe_file)); - primary_stream_.reset( - new comm::URStream(robot_ip_, urcl::primary_interface::UR_PRIMARY_PORT)); - secondary_stream_.reset( - new comm::URStream(robot_ip_, urcl::primary_interface::UR_SECONDARY_PORT)); + primary_stream_.reset(new comm::URPersistentStream( + robot_ip_, urcl::primary_interface::UR_PRIMARY_PORT)); + secondary_stream_.reset(new comm::URPersistentStream( + robot_ip_, urcl::primary_interface::UR_SECONDARY_PORT)); secondary_stream_->connect(); non_blocking_read_ = non_blocking_read;