Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try reconnecting shut down connections #106

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions include/ur_client_library/comm/persistent_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright 2022, FZI Forschungszentrum Informatik (templating)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once
#include <chrono>
#include <atomic>
#include <thread>
#include "ur_client_library/comm/stream.h"

namespace urcl
{
namespace comm
{
/*! \brief A special URStream that can perform reconnects as needed
*
*/
template <typename T>
class URPersistentStream : public URStream<T>
{
public:
/*!
* \brief Creates a new persistent stream.
*
* As with a normal URStream object, this does not immediately open the socket. This has to be done manually using the
* connect() function.
*
* \param host IP address of the remote host
* \param port Port on which the socket shall be connected
*/
URPersistentStream(const std::string& host, int port)
: URStream<T>(host, port), timeout_(std::chrono::seconds(1)), stop_(false)
{
}

/*!
* \brief Stop all currently running reconnect attempts
*/
~URPersistentStream();

URPersistentStream(const URPersistentStream&) = delete;
URPersistentStream& operator=(const URPersistentStream&) = delete;

/*! \brief Reads a full UR package from the socket, trying to reconnect if required.
*
* This is done by repeatedly reading bytes from the socket until the number of bytes occupied by a single package
* are received. In case of a socket disconnect during package transimission, the already received segment is
* discarded.
*
* \param[out] buf The byte buffer where the content shall be stored
* \param[in] buf_len Maximum number of bytes that can be stored in buf
* \param[out] read Number of bytes actually read from the socket
*
* \returns Success of the operation
*/
bool read(uint8_t* buf, const size_t buf_len, size_t& read);

/*! \brief Write directly to the underlying socket, trying to reconnect if required
*
* If a socket disconnect occurs during transmission, the already sent data is discarded and the complete write
* operation is repeated after the reconnect.
*
* \param[in] buf Byte stream that should be sent
* \param[in] buf_len Number of bytes in buf to be sent
* \param[out] written Number of bytes actually written to the socket
*
* \returns Success of the operation
*/
bool write(const uint8_t* buf, const size_t buf_len, size_t& written);

private:
bool tryReconnect();

std::atomic<std::chrono::seconds> timeout_;
std::atomic<bool> stop_;
};

template <typename T>
URPersistentStream<T>::~URPersistentStream()
{
stop_.store(true);
}

template <typename T>
bool URPersistentStream<T>::read(uint8_t* buf, const size_t buf_len, size_t& read)
{
while (!stop_.load())
{
if (URStream<T>::read(buf, buf_len, read))
{
timeout_.store(std::chrono::seconds(1));
return true;
}

if (!tryReconnect())
{
return false;
}
}

return false;
}

template <typename T>
bool URPersistentStream<T>::write(const uint8_t* buf, const size_t buf_len, size_t& written)
{
while (!stop_.load())
{
if (URStream<T>::write(buf, buf_len, written))
{
timeout_.store(std::chrono::seconds(1));
return true;
}

if (!tryReconnect())
{
return false;
}
}

return false;
}

template <typename T>
bool URPersistentStream<T>::tryReconnect()
{
if (URStream<T>::closed())
{
return false;
}

auto cur_timeout = timeout_.load();
URCL_LOG_WARN("Reconnecting in %ld seconds...", cur_timeout.count());
std::this_thread::sleep_for(cur_timeout);

if (URStream<T>::connect())
{
timeout_.store(std::chrono::seconds(1));
}
else if (cur_timeout <= std::chrono::seconds(60))
{
timeout_.compare_exchange_strong(cur_timeout, 2 * cur_timeout);
}

return true;
}
} // namespace comm
} // namespace urcl
5 changes: 3 additions & 2 deletions include/ur_client_library/ur/ur_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include <functional>

#include "ur_client_library/comm/persistent_stream.h"
#include "ur_client_library/rtde/rtde_client.h"
#include "ur_client_library/control/reverse_interface.h"
#include "ur_client_library/control/trajectory_point_interface.h"
Expand Down Expand Up @@ -321,8 +322,8 @@ class UrDriver
std::unique_ptr<control::ReverseInterface> reverse_interface_;
std::unique_ptr<control::TrajectoryPointInterface> trajectory_interface_;
std::unique_ptr<control::ScriptSender> script_sender_;
std::unique_ptr<comm::URStream<primary_interface::PrimaryPackage>> primary_stream_;
std::unique_ptr<comm::URStream<primary_interface::PrimaryPackage>> secondary_stream_;
std::unique_ptr<comm::URPersistentStream<primary_interface::PrimaryPackage>> primary_stream_;
std::unique_ptr<comm::URPersistentStream<primary_interface::PrimaryPackage>> secondary_stream_;

double servoj_time_;
uint32_t servoj_gain_;
Expand Down
12 changes: 10 additions & 2 deletions src/comm/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,19 @@ bool TCPServer::write(const int fd, const uint8_t* buf, const size_t buf_len, si
// handle partial sends
while (written < buf_len)
{
ssize_t sent = ::send(fd, buf + written, remaining, 0);
ssize_t sent = ::send(fd, buf + written, remaining, MSG_NOSIGNAL);

if (sent <= 0)
{
URCL_LOG_ERROR("Sending data through socket failed.");
if (errno == EPIPE)
{
URCL_LOG_ERROR("Sending data through socket failed because the connection was shut down");
handleDisconnect(fd);
}
else
{
URCL_LOG_ERROR("Sending data through socket failed");
}
return false;
}

Expand Down
12 changes: 10 additions & 2 deletions src/comm/tcp_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,19 @@ bool TCPSocket::write(const uint8_t* buf, const size_t buf_len, size_t& written)
// handle partial sends
while (written < buf_len)
{
ssize_t sent = ::send(socket_fd_, buf + written, remaining, 0);
ssize_t sent = ::send(socket_fd_, buf + written, remaining, MSG_NOSIGNAL);

if (sent <= 0)
{
URCL_LOG_ERROR("Sending data through socket failed.");
if (errno == EPIPE)
{
state_ = SocketState::Disconnected;
URCL_LOG_ERROR("Sending data through socket failed because the connection was shut down");
}
else
{
URCL_LOG_ERROR("Sending data through socket failed");
}
return false;
}

Expand Down
8 changes: 4 additions & 4 deletions src/ur/ur_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ urcl::UrDriver::UrDriver(const std::string& robot_ip, const std::string& script_
URCL_LOG_DEBUG("Initializing RTDE client");
rtde_client_.reset(new rtde_interface::RTDEClient(robot_ip_, notifier_, output_recipe_file, input_recipe_file));

primary_stream_.reset(
new comm::URStream<primary_interface::PrimaryPackage>(robot_ip_, urcl::primary_interface::UR_PRIMARY_PORT));
secondary_stream_.reset(
new comm::URStream<primary_interface::PrimaryPackage>(robot_ip_, urcl::primary_interface::UR_SECONDARY_PORT));
primary_stream_.reset(new comm::URPersistentStream<primary_interface::PrimaryPackage>(
robot_ip_, urcl::primary_interface::UR_PRIMARY_PORT));
secondary_stream_.reset(new comm::URPersistentStream<primary_interface::PrimaryPackage>(
robot_ip_, urcl::primary_interface::UR_SECONDARY_PORT));
secondary_stream_->connect();

non_blocking_read_ = non_blocking_read;
Expand Down