diff --git a/.github/workflows/compile.yml b/.github/workflows/compile.yml index 6c1505eb16..f3cba3e7a2 100644 --- a/.github/workflows/compile.yml +++ b/.github/workflows/compile.yml @@ -17,7 +17,7 @@ jobs: build: name: 'build and run tests' runs-on: ubuntu-22.04 - container: ghcr.io/skaginn3x/skaginn3x/framework/tfc-toolchain:sha-7315878 + container: ghcr.io/skaginn3x/skaginn3x/framework/tfc-toolchain:sha-a9cc12c strategy: fail-fast: false matrix: @@ -33,7 +33,7 @@ jobs: uses: lukka/run-vcpkg@v11 with: vcpkgDirectory: '/opt/vcpkg' - vcpkgGitCommitId: 'dc6188d0eb3f9fd7b53f4e21b1878ea868e34c02' + vcpkgGitCommitId: 'fbc868ee5e12e3e81f464104be246ec06553c274' vcpkgJsonGlob: 'vcpkg.json' - name: Run CMake consuming CMakePreset.json and vcpkg.json by mean of vcpkg. diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 1fe56a5ccf..b7e6d985b3 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -10,7 +10,7 @@ jobs: build: name: 'build documentation' runs-on: ubuntu-latest - container: ghcr.io/skaginn3x/skaginn3x/framework/tfc-toolchain:sha-7315878 + container: ghcr.io/skaginn3x/skaginn3x/framework/tfc-toolchain:sha-a9cc12c steps: - uses: actions/checkout@v3 - uses: lukka/get-cmake@latest @@ -18,7 +18,7 @@ jobs: uses: lukka/run-vcpkg@v11 with: vcpkgDirectory: '/opt/vcpkg' - vcpkgGitCommitId: 'dc6188d0eb3f9fd7b53f4e21b1878ea868e34c02' + vcpkgGitCommitId: 'fbc868ee5e12e3e81f464104be246ec06553c274' vcpkgJsonGlob: 'vcpkg.json' - name: Build diff --git a/.github/workflows/tfc-toolchain.yml b/.github/workflows/tfc-toolchain.yml index 7093f1933d..a61ff1248d 100644 --- a/.github/workflows/tfc-toolchain.yml +++ b/.github/workflows/tfc-toolchain.yml @@ -4,6 +4,7 @@ on: paths: - '.github/workflows/tfc-toolchain.yml' - 'containers/tfc-toolchain.dockerfile' + workflow_dispatch: concurrency: group: ${{ github.workflow }}-${{ github.ref }} diff --git a/.github/workflows/ubuntu-packaging.yml b/.github/workflows/ubuntu-packaging.yml index 8d32343775..b211315975 100644 --- a/.github/workflows/ubuntu-packaging.yml +++ b/.github/workflows/ubuntu-packaging.yml @@ -17,7 +17,7 @@ jobs: build: name: ubuntu packaging runs-on: ubuntu-22.04 - container: ghcr.io/skaginn3x/skaginn3x/framework/tfc-toolchain:sha-7315878 + container: ghcr.io/skaginn3x/skaginn3x/framework/tfc-toolchain:sha-a9cc12c strategy: fail-fast: false matrix: @@ -35,7 +35,7 @@ jobs: uses: lukka/run-vcpkg@v11 with: vcpkgDirectory: '/opt/vcpkg' - vcpkgGitCommitId: 'dc6188d0eb3f9fd7b53f4e21b1878ea868e34c02' + vcpkgGitCommitId: 'fbc868ee5e12e3e81f464104be246ec06553c274' vcpkgJsonGlob: 'vcpkg.json' - name: Run CMake consuming CMakePreset.json and vcpkg.json by mean of vcpkg. diff --git a/cmake/tfc_options.cmake b/cmake/tfc_options.cmake index 626088f123..c97731877e 100644 --- a/cmake/tfc_options.cmake +++ b/cmake/tfc_options.cmake @@ -58,7 +58,7 @@ if(ENABLE_SANITIZATION) -fPIC -fno-omit-frame-pointer -g ) - add_link_options( -lasan -lubsan -ltsan) + add_link_options( -lasan -lubsan ) endif() if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") add_compile_options( diff --git a/libs/ipc/inc/public/tfc/ipc/details/impl.hpp b/libs/ipc/inc/public/tfc/ipc/details/impl.hpp index 3001a82ded..325a601669 100644 --- a/libs/ipc/inc/public/tfc/ipc/details/impl.hpp +++ b/libs/ipc/inc/public/tfc/ipc/details/impl.hpp @@ -12,13 +12,13 @@ #include #include +#include #include #include #include #include #include -#include #include #include #include @@ -92,33 +92,54 @@ class signal : public transmission_base, public std::enable_shared_fr /// @return std::error_code, empty if no error. auto send(value_t const& value) -> std::error_code { last_value_ = value; - packet_t packet{ .value = last_value_ }; - const auto attempted_serialize{ packet_t::serialize(packet) }; - if (!attempted_serialize.has_value()) { - return attempted_serialize.error(); + std::vector send_buffer{}; + if (auto serialize_err{ packet_t::serialize(last_value_, send_buffer) }) { + return serialize_err; } - const auto serialized = attempted_serialize.value(); - std::size_t size = socket_.send(asio::buffer(serialized, serialized.size())); - if (size != serialized.size()) { - // todo: create custom error codes and return here - std::terminate(); + std::size_t size = socket_.send(asio::buffer(send_buffer)); + if (size != send_buffer.size()) { + return std::make_error_code(std::errc::value_too_large); } return {}; } /// @brief send value to subscriber + /// @tparam completion_token_t a concept of type void(std::error_code, std::size_t) /// @param value is sent - auto async_send(value_t const& value, std::invocable auto&& callback) -> void { + template + auto async_send(value_t const& value, completion_token_t&& token) + -> asio::async_result, void(std::error_code, std::size_t)>::return_type { last_value_ = value; - packet_t packet{ .value = last_value_ }; - const auto attempted_serialize{ packet_t::serialize(packet) }; - if (!attempted_serialize.has_value()) { - callback(attempted_serialize.error(), 0); - return; + std::unique_ptr> send_buffer{ std::make_unique>() }; + if (auto serialize_error{ packet_t::serialize(last_value_, *send_buffer) }) { + return asio::async_compose( + [serialize_error](auto& self, std::error_code = {}, std::size_t = 0) { self.complete(serialize_error, 0); }, + token); } - auto serialized = std::make_shared>(attempted_serialize.value()); - socket_.async_send(asio::buffer(serialized->data(), serialized->size()), - [callback, buffer = serialized](std::error_code error, size_t size) { callback(error, size); }); + + enum struct state_e { write, complete }; + + auto& socket{ socket_ }; + return asio::async_compose( + [&socket, buffer = std::move(send_buffer), state = state_e::write](auto& self, std::error_code err = {}, + std::size_t bytes_sent = 0) mutable { + if (err) { + self.complete(err, bytes_sent); + return; + } + switch (state) { + case state_e::write: { + state = state_e::complete; + azmq::async_send(socket, asio::buffer(*buffer), std::move(self)); + break; + } + case state_e::complete: { + self.complete(err, bytes_sent); + break; + } + } + }, + token, socket_); } private: @@ -181,7 +202,8 @@ template class slot : public transmission_base { public: using value_t = typename type_desc::value_t; - using packet_t = packet; + static auto constexpr value_e{ type_desc::value_e }; + using packet_t = packet; static auto constexpr direction_v = direction_e::slot; [[nodiscard]] static auto create(asio::io_context& ctx, std::string_view name) -> std::shared_ptr> { @@ -197,12 +219,10 @@ class slot : public transmission_base { socket_ = azmq::sub_socket(socket_.get_io_context(), true); boost::system::error_code error_code; std::string const socket_path{ utils::socket::zmq::ipc_endpoint_str(signal_name) }; - socket_.connect(socket_path, error_code); - if (error_code) { + if (socket_.connect(socket_path, error_code)) { return error_code; } - socket_.set_option(azmq::socket::subscribe(""), error_code); - if (error_code) { + if (socket_.set_option(azmq::socket::subscribe(""), error_code)) { return error_code; } return {}; @@ -224,25 +244,41 @@ class slot : public transmission_base { return packet_t::deserialize(std::span(buffer.data(), bytes_received)).value; } - /** - * @brief schedule an async_read of the slot - * */ - auto async_receive(auto callback) -> void { - socket_.async_receive( - [callback](std::error_code const& err_code, azmq::message& msg, size_t bytes_received) { - if (err_code) { - callback(std::unexpected(err_code)); + /// \brief schedule an async_read on the slot + /// \tparam completion_token_t completion token in asio format, example a callback or coroutine handler + /// callback of format: void(std::expected) + /// coroutine either asio::awaitable> or + /// asio::experimental::coro> + template + auto async_receive(completion_token_t&& token) + -> asio::async_result, void(std::expected)>::return_type { + enum struct state_e { read, complete }; + + std::unique_ptr> receive_buffer{ std::make_unique>() }; + receive_buffer->resize(4096, {}); + // todo receive header first then value + + azmq::sub_socket& socket{ socket_ }; + return asio::async_compose)>( + [&socket, state = state_e::read, buffer = std::move(receive_buffer)](auto& self, std::error_code err = {}, + std::size_t bytes_received = 0) mutable { + if (err) { + self.complete(std::unexpected(err)); return; } - if (bytes_received < packet_t::header_size()) { - // TODO: return some sane code here - // callback(std::unexpected({})); - return; + switch (state) { + case state_e::read: { + state = state_e::complete; + azmq::async_receive(socket, asio::buffer(*buffer), std::move(self)); + break; + } + case state_e::complete: { + self.complete(packet_t::deserialize(std::span{ buffer->data(), bytes_received })); + break; + } } - auto packet = packet_t::deserialize(std::span(static_cast(msg.data()), bytes_received)); - callback(packet.value); }, - 0); + token, socket_); } /** diff --git a/libs/ipc/inc/public/tfc/ipc/packet.hpp b/libs/ipc/inc/public/tfc/ipc/packet.hpp index 1e0e404a79..c2c5430e97 100644 --- a/libs/ipc/inc/public/tfc/ipc/packet.hpp +++ b/libs/ipc/inc/public/tfc/ipc/packet.hpp @@ -16,11 +16,40 @@ namespace tfc::ipc::details { /// \brief Enum specifying protocol version /// This can be changed in the future to retain backwards compatibility and still /// be able to change the protocol structure -enum struct version_e : std::uint8_t { v0 }; +enum struct version_e : std::uint8_t { unknown, v0 }; -enum struct packt_errors_e { - inconsistent_buffer_size = 1, +template +struct header_t { + static constexpr auto type_v{ type_enum }; + version_e version{ version_e::v0 }; + type_e type{ type_v }; + std::size_t value_size{}; // populated in deserialize + // Todo crc + static constexpr auto size() -> std::size_t { return sizeof(version) + sizeof(type) + sizeof(value_size); } + static void serialize(header_t& header, auto&& buffer) { + std::copy_n(reinterpret_cast(&header.version), sizeof(version), std::back_inserter(buffer)); + std::copy_n(reinterpret_cast(&header.type), sizeof(type), std::back_inserter(buffer)); + std::copy_n(reinterpret_cast(&header.value_size), sizeof(value_size), std::back_inserter(buffer)); + } + static auto deserialize(header_t& result, auto&& buffer_iter) -> std::error_code { + std::copy_n(buffer_iter, sizeof(version), reinterpret_cast(&result.version)); + buffer_iter += sizeof(version); + std::copy_n(buffer_iter, sizeof(type), reinterpret_cast(&result.type)); + buffer_iter += sizeof(type); + std::copy_n(buffer_iter, sizeof(value_size), reinterpret_cast(&result.value_size)); + buffer_iter += sizeof(value_size); + + if (result.type != type_v) { + return std::make_error_code(std::errc::wrong_protocol_type); + } + if (result.version != version_e::v0) { + return std::make_error_code(std::errc::wrong_protocol_type); + // TODO: explicit version error + } + return {}; + } }; +static_assert(header_t::size() == 10); /// \brief packet struct to de/serialize data to socket template @@ -28,80 +57,64 @@ struct packet { using value_t = value_type; static constexpr auto type_v{ type_enum }; - version_e version{ version_e::v0 }; - type_e type{ type_v }; - std::size_t value_size{}; // populated in deserialize - // Todo crc + header_t header{}; value_t value{}; - static constexpr auto header_size() -> std::size_t { return sizeof(version) + sizeof(type) + sizeof(value_size); } - // value size is populated - static auto serialize(packet& pack) -> std::expected, std::error_code> { + static auto serialize(value_t const& value, std::vector& buffer) -> std::error_code { + header_t my_header{}; + if constexpr (std::is_fundamental_v) { - pack.value_size = sizeof(value_t); + my_header.value_size = sizeof(value_t); } else { static_assert(std::is_member_function_pointer_v, "Serialize for value type not supported"); static_assert(std::is_same_v); - pack.value_size = pack.value.size(); + my_header.value_size = value.size(); } - const std::size_t buffer_size{ header_size() + pack.value_size }; - std::vector buffer{}; + const std::size_t buffer_size{ header_t::size() + my_header.value_size }; buffer.reserve(buffer_size); - std::copy_n(reinterpret_cast(&pack.version), sizeof(version), std::back_inserter(buffer)); - std::copy_n(reinterpret_cast(&pack.type), sizeof(type), std::back_inserter(buffer)); - std::copy_n(reinterpret_cast(&pack.value_size), sizeof(value_size), std::back_inserter(buffer)); + header_t::serialize(my_header, buffer); if constexpr (std::is_fundamental_v) { - std::copy_n(reinterpret_cast(&pack.value), pack.value_size, std::back_inserter(buffer)); + std::copy_n(reinterpret_cast(&value), my_header.value_size, std::back_inserter(buffer)); } else { // has member function data - static_assert(std::is_pointer_v); + static_assert(std::is_pointer_v); // Todo why copy the data instead of creating a view into the data? // we should use std::span - std::copy_n(reinterpret_cast(pack.value.data()), pack.value_size, std::back_inserter(buffer)); + std::copy_n(reinterpret_cast(value.data()), my_header.value_size, std::back_inserter(buffer)); } if (buffer.size() != buffer_size) { - // todo: Create custom error codes - std::terminate(); - // return std::unexpected(packt_errors_e::inconsistent_buffer_size); + return std::make_error_code(std::errc::message_size); } - return buffer; + return {}; } - static constexpr auto deserialize(std::ranges::view auto buffer) -> packet { + static constexpr auto deserialize(std::ranges::view auto&& buffer) -> std::expected { + if (buffer.size() < header_t::size()) { + return std::unexpected(std::make_error_code(std::errc::message_size)); + } + packet result{}; auto buffer_iter{ std::begin(buffer) }; - std::copy_n(buffer_iter, sizeof(version), reinterpret_cast(&result.version)); - buffer_iter += sizeof(version); - std::copy_n(buffer_iter, sizeof(type), reinterpret_cast(&result.type)); - buffer_iter += sizeof(type); - std::copy_n(buffer_iter, sizeof(value_size), reinterpret_cast(&result.value_size)); - buffer_iter += sizeof(value_size); - - if (result.type != type_v) { - // TODO: Return error - } - if (result.version != version_e::v0) { - // TODO: Return error - } + header_t::deserialize(result.header, buffer_iter); // todo partial buffer? - if (buffer.size() != header_size() + result.value_size) { - throw std::runtime_error("Inconsistent buffer size"); + if (buffer.size() != header_t::size() + result.header.value_size) { + return std::unexpected(std::make_error_code(std::errc::message_size)); } if constexpr (std::is_fundamental_v) { static_assert(sizeof(value_t) <= 8); - std::copy_n(buffer_iter, result.value_size, reinterpret_cast(&result.value)); + std::copy_n(buffer_iter, result.header.value_size, reinterpret_cast(&result.value)); } else { // has member function data - result.value.resize(result.value_size); - std::copy_n(buffer_iter, result.value_size, reinterpret_cast(result.value.data())); + result.value.resize(result.header.value_size); + std::copy_n(buffer_iter, result.header.value_size, reinterpret_cast(result.value.data())); } - return result; + return std::move(result.value); } }; diff --git a/libs/ipc/tests/examples/CMakeLists.txt b/libs/ipc/tests/examples/CMakeLists.txt index 198075cec2..73e6adb713 100644 --- a/libs/ipc/tests/examples/CMakeLists.txt +++ b/libs/ipc/tests/examples/CMakeLists.txt @@ -3,3 +3,6 @@ target_link_libraries(ipc_connector_example PRIVATE tfc::ipc) tfc_add_example(print_signals_and_slots print_signals_and_slots.cpp) target_link_libraries(print_signals_and_slots PRIVATE tfc::base tfc::ipc) + +tfc_add_example_no_test(ipc_coroutines ipc_coroutines.cpp) +target_link_libraries(ipc_coroutines PRIVATE tfc::base tfc::ipc) diff --git a/libs/ipc/tests/examples/ipc_coroutines.cpp b/libs/ipc/tests/examples/ipc_coroutines.cpp new file mode 100644 index 0000000000..abc36a7709 --- /dev/null +++ b/libs/ipc/tests/examples/ipc_coroutines.cpp @@ -0,0 +1,86 @@ +#include +#include +#include +#include + +#include +#include + +namespace asio = boost::asio; + +namespace { +auto timer_coro(asio::steady_timer& timer, std::shared_ptr> signal) + -> asio::awaitable { + bool send_value{ true }; + while (true) { + timer.expires_from_now(std::chrono::seconds{ 3 }); + co_await timer.async_wait(asio::use_awaitable); + co_await signal->async_send(send_value, asio::use_awaitable); + send_value = !send_value; + } +} + +auto slot_coro(tfc::ipc::details::slot& slot) -> asio::awaitable { + while (true) { + std::expected msg = co_await slot.async_receive(asio::use_awaitable); + if (msg) { + fmt::print("message={}\n", msg.value()); + } else { + fmt::print("error={}\n", msg.error().message()); + } + } +} + +asio::experimental::coro work(asio::io_context& ctx) { + asio::steady_timer timer{ ctx }; + timer.expires_from_now(std::chrono::seconds{ 3 }); + co_await timer.async_wait(asio::experimental::use_coro); + co_yield 42; + timer.expires_from_now(std::chrono::seconds{ 3 }); + co_await timer.async_wait(asio::experimental::use_coro); + co_yield 31; + co_return false; // done +} + +asio::experimental::coro testme(asio::io_context& ctx) { + asio::steady_timer timer{ ctx }; + auto work_awaitable = work(ctx); + while (true) { + auto return_val = co_await work_awaitable; + if (auto* foo = std::get_if(&return_val)) { + fmt::print("experimental value is: {}\n", *foo); + } else { // work done + fmt::print("Work done\n"); + break; + } + } + co_return 1337; +} + +} // namespace + +auto main(int argc, char** argv) -> int { + tfc::base::init(argc, argv); + + asio::io_context ctx{}; + + tfc::ipc::details::slot slot{ ctx, "my_name" }; + + asio::co_spawn(ctx, slot_coro(slot), asio::detached); + + auto signal{ tfc::ipc::details::signal::create(ctx, "your_name") }; + + slot.connect(signal.value()->name_w_type()); + + asio::steady_timer timer{ ctx }; + + asio::co_spawn(ctx, timer_coro(timer, signal.value()), asio::detached); + + asio::experimental::co_spawn(testme(ctx), [](std::exception_ptr const&, int return_val) { + fmt::print("Coroutine is finished with return value: {}\n", return_val); + }); + + ctx.run(); + + return EXIT_SUCCESS; +} diff --git a/libs/ipc/tests/ipc_manager_test.cpp b/libs/ipc/tests/ipc_manager_test.cpp index a05c3ca019..672398a20f 100644 --- a/libs/ipc/tests/ipc_manager_test.cpp +++ b/libs/ipc/tests/ipc_manager_test.cpp @@ -304,7 +304,7 @@ auto main(int argc, char** argv) -> int { } }); isolated_ctx.run_for(std::chrono::seconds(3)); - ut::expect(invocation == test_values.size()); + ut::expect(invocation == test_values.size()) << "got invoked: " << invocation; }; "Test ipc communication connection and disconnection with mocking int"_test = []() { asio::io_context isolated_ctx{}; diff --git a/libs/ipc/tests/ipc_test.cpp b/libs/ipc/tests/ipc_test.cpp index 5a18165a99..f7c95664b1 100644 --- a/libs/ipc/tests/ipc_test.cpp +++ b/libs/ipc/tests/ipc_test.cpp @@ -14,6 +14,8 @@ auto main(int, char**) -> int { using boost::ut::expect; using boost::ut::bdd::given; using boost::ut::bdd::when; + using boost::ut::operator>>; + using boost::ut::fatal; using tfc::ipc::details::packet; using tfc::ipc::details::type_e; @@ -21,17 +23,13 @@ auto main(int, char**) -> int { constexpr auto deserialize_serialize{ [](auto&& pack) { using pack_t = std::remove_cvref_t; using packet_t = packet; - const auto serialized = packet_t::serialize(pack); - expect(serialized.has_value()); - const auto& serialized_value = serialized.value(); - auto supposed_packet = packet_t::deserialize(std::span(std::cbegin(serialized_value), std::cend(serialized_value))); - expect(pack.version == supposed_packet.version); - expect(pack.type == supposed_packet.type); - expect(pack.type == pack_t::type_v); - expect(pack.value_size == supposed_packet.value_size); - expect(pack.value == supposed_packet.value); - - expect(pack == supposed_packet); + std::vector serialized{}; + auto err{ packet_t::serialize(pack.value, serialized) }; + expect(!err >> fatal); + auto supposed_value = packet_t::deserialize(std::span(std::cbegin(serialized), std::cend(serialized))); + + expect(supposed_value.has_value() >> fatal); + expect(pack.value == supposed_value.value()); } }; given("serialization") = [&deserialize_serialize] { when("bool=true") = [&deserialize_serialize] { deserialize_serialize(packet{ .value = true }); }; @@ -53,18 +51,16 @@ auto main(int, char**) -> int { using pack_t = std::remove_cvref_t; using packet_t = packet; - const auto serialized = packet_t::serialize(pack); - expect(serialized.has_value()); - const auto& serialized_value = serialized.value(); - auto supposed_packet = packet_t::deserialize(std::span(std::cbegin(serialized_value), std::cend(serialized_value))); - expect(pack.version == supposed_packet.version); - expect(pack.type == supposed_packet.type); - expect(pack.type == pack_t::type_v); - expect(pack.value_size == supposed_packet.value_size); + std::vector serialized{}; + auto err{ packet_t::serialize(pack.value, serialized) }; + expect(!err >> fatal); + auto supposed_value = packet_t::deserialize(std::span(std::cbegin(serialized), std::cend(serialized))); + + expect(supposed_value.has_value() >> fatal); // unsafe equal comparison of float // expect(pack.value == supposed_packet.value); // expect(pack == supposed_packet); - expect(pack.value > 4.2 && pack.value < 4.22); + expect(supposed_value.value() > 4.2 && supposed_value.value() < 4.22); }; when("string=hello world from another world") = [&deserialize_serialize] { deserialize_serialize(packet{ .value = "hello world from another world" }); diff --git a/vcpkg-configuration.json b/vcpkg-configuration.json index f6fc4ca8e1..d1afbfd492 100644 --- a/vcpkg-configuration.json +++ b/vcpkg-configuration.json @@ -2,14 +2,15 @@ "default-registry": { "kind": "git", "repository": "https://github.com/microsoft/vcpkg", - "baseline": "dc6188d0eb3f9fd7b53f4e21b1878ea868e34c02" + "baseline": "fbc868ee5e12e3e81f464104be246ec06553c274" }, "registries": [ { "kind": "git", "repository": "https://github.com/skaginn3x/vcpkg-registry", - "baseline": "2ddb70b48f0cebc94c605fa96f0ccd458f403ed0", + "baseline": "0a5015490425b3b31044f21a357cc4861e987806", "packages": [ + "azmq", "libcap", "libsystemd", "libxcrypt",