Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add c++20 coroutines for detail signal and slot #106

Merged
merged 3 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/compile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
build:
name: 'build and run tests'
runs-on: ubuntu-22.04
container: ghcr.io/skaginn3x/skaginn3x/framework/tfc-toolchain:sha-7315878
container: ghcr.io/skaginn3x/skaginn3x/framework/tfc-toolchain:sha-a9cc12c
strategy:
fail-fast: false
matrix:
Expand All @@ -33,7 +33,7 @@ jobs:
uses: lukka/run-vcpkg@v11
with:
vcpkgDirectory: '/opt/vcpkg'
vcpkgGitCommitId: 'dc6188d0eb3f9fd7b53f4e21b1878ea868e34c02'
vcpkgGitCommitId: 'fbc868ee5e12e3e81f464104be246ec06553c274'
vcpkgJsonGlob: 'vcpkg.json'

- name: Run CMake consuming CMakePreset.json and vcpkg.json by mean of vcpkg.
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ jobs:
build:
name: 'build documentation'
runs-on: ubuntu-latest
container: ghcr.io/skaginn3x/skaginn3x/framework/tfc-toolchain:sha-7315878
container: ghcr.io/skaginn3x/skaginn3x/framework/tfc-toolchain:sha-a9cc12c
steps:
- uses: actions/checkout@v3
- uses: lukka/get-cmake@latest
- name: Restore artifacts, or setup vcpkg (do not install any package)
uses: lukka/run-vcpkg@v11
with:
vcpkgDirectory: '/opt/vcpkg'
vcpkgGitCommitId: 'dc6188d0eb3f9fd7b53f4e21b1878ea868e34c02'
vcpkgGitCommitId: 'fbc868ee5e12e3e81f464104be246ec06553c274'
vcpkgJsonGlob: 'vcpkg.json'

- name: Build
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/tfc-toolchain.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
paths:
- '.github/workflows/tfc-toolchain.yml'
- 'containers/tfc-toolchain.dockerfile'
workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ubuntu-packaging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
build:
name: ubuntu packaging
runs-on: ubuntu-22.04
container: ghcr.io/skaginn3x/skaginn3x/framework/tfc-toolchain:sha-7315878
container: ghcr.io/skaginn3x/skaginn3x/framework/tfc-toolchain:sha-a9cc12c
strategy:
fail-fast: false
matrix:
Expand All @@ -35,7 +35,7 @@ jobs:
uses: lukka/run-vcpkg@v11
with:
vcpkgDirectory: '/opt/vcpkg'
vcpkgGitCommitId: 'dc6188d0eb3f9fd7b53f4e21b1878ea868e34c02'
vcpkgGitCommitId: 'fbc868ee5e12e3e81f464104be246ec06553c274'
vcpkgJsonGlob: 'vcpkg.json'

- name: Run CMake consuming CMakePreset.json and vcpkg.json by mean of vcpkg.
Expand Down
2 changes: 1 addition & 1 deletion cmake/tfc_options.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ if(ENABLE_SANITIZATION)
-fPIC
-fno-omit-frame-pointer
-g )
add_link_options( -lasan -lubsan -ltsan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not figure out why this needs to be removed?

add_link_options( -lasan -lubsan )
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
add_compile_options(
Expand Down
114 changes: 75 additions & 39 deletions libs/ipc/inc/public/tfc/ipc/details/impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

#include <fmt/format.h>
#include <azmq/socket.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/system/error_code.hpp>

#include <tfc/ipc/enums.hpp>
#include <tfc/ipc/packet.hpp>
#include <tfc/logger.hpp>
#include <tfc/progbase.hpp>
#include <tfc/utils/pragmas.hpp>
#include <tfc/utils/socket.hpp>
Expand Down Expand Up @@ -92,33 +92,54 @@ class signal : public transmission_base<type_desc>, public std::enable_shared_fr
/// @return std::error_code, empty if no error.
auto send(value_t const& value) -> std::error_code {
last_value_ = value;
packet_t packet{ .value = last_value_ };
const auto attempted_serialize{ packet_t::serialize(packet) };
if (!attempted_serialize.has_value()) {
return attempted_serialize.error();
std::vector<std::byte> send_buffer{};
if (auto serialize_err{ packet_t::serialize(last_value_, send_buffer) }) {
return serialize_err;
}
const auto serialized = attempted_serialize.value();
std::size_t size = socket_.send(asio::buffer(serialized, serialized.size()));
if (size != serialized.size()) {
// todo: create custom error codes and return here
std::terminate();
std::size_t size = socket_.send(asio::buffer(send_buffer));
if (size != send_buffer.size()) {
return std::make_error_code(std::errc::value_too_large);
}
return {};
}

/// @brief send value to subscriber
/// @tparam completion_token_t a concept of type void(std::error_code, std::size_t)
/// @param value is sent
auto async_send(value_t const& value, std::invocable<std::error_code, std::size_t> auto&& callback) -> void {
template <typename completion_token_t>
auto async_send(value_t const& value, completion_token_t&& token)
-> asio::async_result<std::decay_t<completion_token_t>, void(std::error_code, std::size_t)>::return_type {
last_value_ = value;
packet_t packet{ .value = last_value_ };
const auto attempted_serialize{ packet_t::serialize(packet) };
if (!attempted_serialize.has_value()) {
callback(attempted_serialize.error(), 0);
return;
std::unique_ptr<std::vector<std::byte>> send_buffer{ std::make_unique<std::vector<std::byte>>() };
if (auto serialize_error{ packet_t::serialize(last_value_, *send_buffer) }) {
return asio::async_compose<completion_token_t, void(std::error_code, std::size_t)>(
[serialize_error](auto& self, std::error_code = {}, std::size_t = 0) { self.complete(serialize_error, 0); },
token);
}
auto serialized = std::make_shared<std::vector<std::byte>>(attempted_serialize.value());
socket_.async_send(asio::buffer(serialized->data(), serialized->size()),
[callback, buffer = serialized](std::error_code error, size_t size) { callback(error, size); });

enum struct state_e { write, complete };

auto& socket{ socket_ };
return asio::async_compose<completion_token_t, void(std::error_code, std::size_t)>(
[&socket, buffer = std::move(send_buffer), state = state_e::write](auto& self, std::error_code err = {},
std::size_t bytes_sent = 0) mutable {
if (err) {
self.complete(err, bytes_sent);
return;
}
switch (state) {
case state_e::write: {
state = state_e::complete;
azmq::async_send(socket, asio::buffer(*buffer), std::move(self));
break;
}
case state_e::complete: {
self.complete(err, bytes_sent);
break;
}
}
},
token, socket_);
}

private:
Expand Down Expand Up @@ -181,7 +202,8 @@ template <typename type_desc>
class slot : public transmission_base<type_desc> {
public:
using value_t = typename type_desc::value_t;
using packet_t = packet<value_t, type_desc::value_e>;
static auto constexpr value_e{ type_desc::value_e };
using packet_t = packet<value_t, value_e>;
static auto constexpr direction_v = direction_e::slot;

[[nodiscard]] static auto create(asio::io_context& ctx, std::string_view name) -> std::shared_ptr<slot<type_desc>> {
Expand All @@ -197,12 +219,10 @@ class slot : public transmission_base<type_desc> {
socket_ = azmq::sub_socket(socket_.get_io_context(), true);
boost::system::error_code error_code;
std::string const socket_path{ utils::socket::zmq::ipc_endpoint_str(signal_name) };
socket_.connect(socket_path, error_code);
if (error_code) {
if (socket_.connect(socket_path, error_code)) {
return error_code;
}
socket_.set_option(azmq::socket::subscribe(""), error_code);
if (error_code) {
if (socket_.set_option(azmq::socket::subscribe(""), error_code)) {
return error_code;
}
return {};
Expand All @@ -224,25 +244,41 @@ class slot : public transmission_base<type_desc> {
return packet_t::deserialize(std::span(buffer.data(), bytes_received)).value;
}

/**
* @brief schedule an async_read of the slot
* */
auto async_receive(auto callback) -> void {
socket_.async_receive(
[callback](std::error_code const& err_code, azmq::message& msg, size_t bytes_received) {
if (err_code) {
callback(std::unexpected(err_code));
/// \brief schedule an async_read on the slot
/// \tparam completion_token_t completion token in asio format, example a callback or coroutine handler
/// callback of format: void(std::expected<type_desc::value_t, std::error_code>)
/// coroutine either asio::awaitable<std::expected<value_t, std::error_code>> or
/// asio::experimental::coro<void, std::expected<value_t, std::error_code>>
template <typename completion_token_t>
auto async_receive(completion_token_t&& token)
-> asio::async_result<std::decay_t<completion_token_t>, void(std::expected<value_t, std::error_code>)>::return_type {
enum struct state_e { read, complete };

std::unique_ptr<std::vector<std::byte>> receive_buffer{ std::make_unique<std::vector<std::byte>>() };
receive_buffer->resize(4096, {});
// todo receive header first then value

azmq::sub_socket& socket{ socket_ };
return asio::async_compose<completion_token_t, void(std::expected<value_t, std::error_code>)>(
[&socket, state = state_e::read, buffer = std::move(receive_buffer)](auto& self, std::error_code err = {},
std::size_t bytes_received = 0) mutable {
if (err) {
self.complete(std::unexpected(err));
return;
}
if (bytes_received < packet_t::header_size()) {
// TODO: return some sane code here
// callback(std::unexpected({}));
return;
switch (state) {
case state_e::read: {
state = state_e::complete;
azmq::async_receive(socket, asio::buffer(*buffer), std::move(self));
break;
}
case state_e::complete: {
self.complete(packet_t::deserialize(std::span{ buffer->data(), bytes_received }));
break;
}
}
auto packet = packet_t::deserialize(std::span(static_cast<std::byte const*>(msg.data()), bytes_received));
callback(packet.value);
},
0);
token, socket_);
}

/**
Expand Down
101 changes: 57 additions & 44 deletions libs/ipc/inc/public/tfc/ipc/packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,92 +16,105 @@ namespace tfc::ipc::details {
/// \brief Enum specifying protocol version
/// This can be changed in the future to retain backwards compatibility and still
/// be able to change the protocol structure
enum struct version_e : std::uint8_t { v0 };
enum struct version_e : std::uint8_t { unknown, v0 };

enum struct packt_errors_e {
inconsistent_buffer_size = 1,
template <type_e type_enum>
struct header_t {
static constexpr auto type_v{ type_enum };
version_e version{ version_e::v0 };
type_e type{ type_v };
std::size_t value_size{}; // populated in deserialize
// Todo crc
static constexpr auto size() -> std::size_t { return sizeof(version) + sizeof(type) + sizeof(value_size); }
static void serialize(header_t& header, auto&& buffer) {
std::copy_n(reinterpret_cast<std::byte*>(&header.version), sizeof(version), std::back_inserter(buffer));
std::copy_n(reinterpret_cast<std::byte*>(&header.type), sizeof(type), std::back_inserter(buffer));
std::copy_n(reinterpret_cast<std::byte*>(&header.value_size), sizeof(value_size), std::back_inserter(buffer));
}
static auto deserialize(header_t& result, auto&& buffer_iter) -> std::error_code {
std::copy_n(buffer_iter, sizeof(version), reinterpret_cast<std::byte*>(&result.version));
buffer_iter += sizeof(version);
std::copy_n(buffer_iter, sizeof(type), reinterpret_cast<std::byte*>(&result.type));
buffer_iter += sizeof(type);
std::copy_n(buffer_iter, sizeof(value_size), reinterpret_cast<std::byte*>(&result.value_size));
buffer_iter += sizeof(value_size);

if (result.type != type_v) {
return std::make_error_code(std::errc::wrong_protocol_type);
}
if (result.version != version_e::v0) {
return std::make_error_code(std::errc::wrong_protocol_type);
// TODO: explicit version error
}
return {};
}
};
static_assert(header_t<type_e::unknown>::size() == 10);

/// \brief packet struct to de/serialize data to socket
template <typename value_type, type_e type_enum>
struct packet {
using value_t = value_type;
static constexpr auto type_v{ type_enum };

version_e version{ version_e::v0 };
type_e type{ type_v };
std::size_t value_size{}; // populated in deserialize
// Todo crc
header_t<type_enum> header{};
value_t value{};

static constexpr auto header_size() -> std::size_t { return sizeof(version) + sizeof(type) + sizeof(value_size); }

// value size is populated
static auto serialize(packet& pack) -> std::expected<std::vector<std::byte>, std::error_code> {
static auto serialize(value_t const& value, std::vector<std::byte>& buffer) -> std::error_code {
header_t<type_enum> my_header{};

if constexpr (std::is_fundamental_v<value_t>) {
pack.value_size = sizeof(value_t);
my_header.value_size = sizeof(value_t);
} else {
static_assert(std::is_member_function_pointer_v<decltype(&value_t::size)>, "Serialize for value type not supported");
static_assert(std::is_same_v<decltype(value_t().size()), std::size_t>);
pack.value_size = pack.value.size();
my_header.value_size = value.size();
}

const std::size_t buffer_size{ header_size() + pack.value_size };
std::vector<std::byte> buffer{};
const std::size_t buffer_size{ header_t<type_enum>::size() + my_header.value_size };
buffer.reserve(buffer_size);
std::copy_n(reinterpret_cast<std::byte*>(&pack.version), sizeof(version), std::back_inserter(buffer));
std::copy_n(reinterpret_cast<std::byte*>(&pack.type), sizeof(type), std::back_inserter(buffer));
std::copy_n(reinterpret_cast<std::byte*>(&pack.value_size), sizeof(value_size), std::back_inserter(buffer));
header_t<type_enum>::serialize(my_header, buffer);

if constexpr (std::is_fundamental_v<value_t>) {
std::copy_n(reinterpret_cast<std::byte*>(&pack.value), pack.value_size, std::back_inserter(buffer));
std::copy_n(reinterpret_cast<std::byte const*>(&value), my_header.value_size, std::back_inserter(buffer));
} else {
// has member function data
static_assert(std::is_pointer_v<decltype(pack.value.data())>);
static_assert(std::is_pointer_v<decltype(value.data())>);
// Todo why copy the data instead of creating a view into the data?
// we should use std::span
std::copy_n(reinterpret_cast<std::byte*>(pack.value.data()), pack.value_size, std::back_inserter(buffer));
std::copy_n(reinterpret_cast<std::byte const*>(value.data()), my_header.value_size, std::back_inserter(buffer));
}

if (buffer.size() != buffer_size) {
// todo: Create custom error codes
std::terminate();
// return std::unexpected(packt_errors_e::inconsistent_buffer_size);
return std::make_error_code(std::errc::message_size);
}
return buffer;
return {};
}

static constexpr auto deserialize(std::ranges::view auto buffer) -> packet<value_t, type_v> {
static constexpr auto deserialize(std::ranges::view auto&& buffer) -> std::expected<value_t, std::error_code> {
if (buffer.size() < header_t<type_enum>::size()) {
return std::unexpected(std::make_error_code(std::errc::message_size));
}

packet<value_t, type_v> result{};
auto buffer_iter{ std::begin(buffer) };
std::copy_n(buffer_iter, sizeof(version), reinterpret_cast<std::byte*>(&result.version));
buffer_iter += sizeof(version);
std::copy_n(buffer_iter, sizeof(type), reinterpret_cast<std::byte*>(&result.type));
buffer_iter += sizeof(type);
std::copy_n(buffer_iter, sizeof(value_size), reinterpret_cast<std::byte*>(&result.value_size));
buffer_iter += sizeof(value_size);

if (result.type != type_v) {
// TODO: Return error
}
if (result.version != version_e::v0) {
// TODO: Return error
}
header_t<type_enum>::deserialize(result.header, buffer_iter);

// todo partial buffer?
if (buffer.size() != header_size() + result.value_size) {
throw std::runtime_error("Inconsistent buffer size");
if (buffer.size() != header_t<type_enum>::size() + result.header.value_size) {
return std::unexpected(std::make_error_code(std::errc::message_size));
}

if constexpr (std::is_fundamental_v<value_t>) {
static_assert(sizeof(value_t) <= 8);
std::copy_n(buffer_iter, result.value_size, reinterpret_cast<std::byte*>(&result.value));
std::copy_n(buffer_iter, result.header.value_size, reinterpret_cast<std::byte*>(&result.value));
} else {
// has member function data
result.value.resize(result.value_size);
std::copy_n(buffer_iter, result.value_size, reinterpret_cast<std::byte*>(result.value.data()));
result.value.resize(result.header.value_size);
std::copy_n(buffer_iter, result.header.value_size, reinterpret_cast<std::byte*>(result.value.data()));
}
return result;
return std::move(result.value);
}
};

Expand Down
3 changes: 3 additions & 0 deletions libs/ipc/tests/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ target_link_libraries(ipc_connector_example PRIVATE tfc::ipc)

tfc_add_example(print_signals_and_slots print_signals_and_slots.cpp)
target_link_libraries(print_signals_and_slots PRIVATE tfc::base tfc::ipc)

tfc_add_example_no_test(ipc_coroutines ipc_coroutines.cpp)
target_link_libraries(ipc_coroutines PRIVATE tfc::base tfc::ipc)
Loading
Loading