diff --git a/exes/mqtt-broadcaster/CMakeLists.txt b/exes/mqtt-broadcaster/CMakeLists.txt index 00d5c42b72..62dc4d857e 100644 --- a/exes/mqtt-broadcaster/CMakeLists.txt +++ b/exes/mqtt-broadcaster/CMakeLists.txt @@ -1,14 +1,14 @@ - cmake_minimum_required(VERSION 3.21) - project(mqtt_broadcaster) - add_executable(mqtt-broadcaster "src/main.cpp") add_executable(mqtt-broadcaster-tests "tests/mqtt_broadcaster_tests.cpp") find_package(Boost 1.81.0 REQUIRED COMPONENTS log program_options) - # find_package(unofficial-libuuid CONFIG REQUIRED) - # find_package(eclipse-paho-mqtt-c CONFIG REQUIRED) - set(COMMON_LIBS + target_include_directories(mqtt-broadcaster + PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR}/inc + ) + + target_link_libraries(mqtt-broadcaster tfc::base tfc::logger tfc::stx @@ -18,22 +18,34 @@ Boost::boost Boost::log Boost::program_options - # eclipse-paho-mqtt-c::paho-mqtt3as-static - # unofficial::UUID::uuid - ) + ) - set(COMMON_COMPILE_DEFINITIONS PUBLIC ASYNC_MQTT_USE_LOG) + target_compile_definitions(mqtt-broadcaster + PUBLIC + ASYNC_MQTT_USE_LOG + ) - set(COMMON_INCLUDE_DIRECTORIES + target_include_directories(mqtt-broadcaster-tests PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/inc ) - foreach (TARGET mqtt-broadcaster mqtt-broadcaster-tests) - target_compile_definitions(${TARGET} ${COMMON_COMPILE_DEFINITIONS}) - target_link_libraries(${TARGET} PUBLIC ${COMMON_LIBS}) - target_include_directories(${TARGET} ${COMMON_INCLUDE_DIRECTORIES}) - endforeach () + target_link_libraries(mqtt-broadcaster-tests + tfc::base + tfc::logger + tfc::stx + tfc::ipc + tfc::operation_mode + tfc::confman + Boost::boost + Boost::log + Boost::program_options + ) + + target_compile_definitions(mqtt-broadcaster-tests + PUBLIC + ASYNC_MQTT_USE_LOG + ) include(GNUInstallDirs) install( diff --git a/exes/mqtt-broadcaster/inc/config.hpp b/exes/mqtt-broadcaster/inc/config.hpp new file mode 100644 index 0000000000..53c2d70f6e --- /dev/null +++ b/exes/mqtt-broadcaster/inc/config.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include +#include + +// File under /etc/tfc/mqtt-broadcaster/def/mqtt_broadcaster.json which lists the signals that are supposed to be published +// on the mqtt broker. Updating the signals in this file will cause the running program to stop and restart +struct config { + tfc::confman::observable> _allowed_topics{}; + struct glaze { + static constexpr auto value{ glz::object("_allowed_topics", &config::_allowed_topics) }; + static constexpr auto name{ "mqtt_broadcaster" }; + }; +}; \ No newline at end of file diff --git a/exes/mqtt-broadcaster/inc/mqtt_broadcaster.hpp b/exes/mqtt-broadcaster/inc/mqtt_broadcaster.hpp index 91d5db880e..0ce23aa10a 100644 --- a/exes/mqtt-broadcaster/inc/mqtt_broadcaster.hpp +++ b/exes/mqtt-broadcaster/inc/mqtt_broadcaster.hpp @@ -10,20 +10,37 @@ namespace asio = boost::asio; namespace details = tfc::ipc::details; -// File under /etc/tfc/mqtt-broadcaster/def/mqtt_broadcaster.json which lists the signals that are not supposed to be -// published on the mqtt broker. Updating the signals in this file will cause the running program to stop and restart -struct config { - tfc::confman::observable> _banned_topics{}; - struct glaze { - static constexpr auto value{ glz::object("_banned_topics", &config::_banned_topics) }; - static constexpr auto name{ "mqtt_broadcaster" }; - }; -}; - // In order to ease testing, types are injected into the class -template +template class mqtt_broadcaster { public: + // Constructor used for testing, enables injection of config + mqtt_broadcaster(asio::io_context& ctx, + std::string mqtt_address, + std::string mqtt_port, + std::string mqtt_username, + std::string mqtt_password, + ipc_client_type& ipc_client, + std::shared_ptr mqtt_client, + // TODO: the stub deletes if it is not passed by reference + config_manager& cfg) + : object_path_(tfc::dbus::make_dbus_path("ipc_ruler")), interface_name_(tfc::dbus::make_dbus_name("manager")), + mqtt_host_(std::move(mqtt_address)), mqtt_port_(std::move(mqtt_port)), mqtt_username_(std::move(mqtt_username)), + mqtt_password_(std::move(mqtt_password)), ctx_(ctx), ipc_client_(ipc_client), mqtt_client_(std::move(mqtt_client)), + logger_("mqtt_broadcaster"), + dbus_connection_(std::make_unique(ctx, tfc::dbus::sd_bus_open_system())), + signal_updates_(std::make_unique( + *dbus_connection_, + sdbusplus::bus::match::rules::propertiesChanged(object_path_, interface_name_), + std::bind_front(&mqtt_broadcaster::update_signals, this))), + config_(cfg) { + + std::cout << "topics: " << config_.string() << std::endl; + + asio::co_spawn(mqtt_client_->strand(), initialize(), asio::detached); + } + + // Normal constructor mqtt_broadcaster(asio::io_context& ctx, std::string mqtt_address, std::string mqtt_port, @@ -49,27 +66,29 @@ class mqtt_broadcaster { // Initial connect to the broker if (!connect_active_) { connect_active_ = true; + logger_.trace("Initial connect to MQTT broker"); co_await asio::co_spawn(mqtt_client_->strand(), connect_to_broker(), asio::use_awaitable); } - // When a new list of banned string arrives the program is restarted - config_.value()._banned_topics.observe([this](auto& new_conf, [[maybe_unused]] auto& old_conf) { - banned_signals_.clear(); + // When a new list of allowed string arrives the program is restarted + config_.value()._allowed_topics.observe([this](auto& new_conf, [[maybe_unused]] auto& old_conf) { + allowed_signals_.clear(); for (auto const& con : new_conf) { - banned_signals_.push_back(con); + allowed_signals_.push_back(con); } restart(); }); asio::co_spawn(mqtt_client_->strand(), tfc::base::exit_signals(ctx_), asio::detached); - banned_signals_ = config_.value()._banned_topics.value(); + allowed_signals_ = config_.value()._allowed_topics.value(); load_signals(); + } - // Connect the mqtt client to the broker. Only a single "thread" can run this at a time. The routine that calls this - // function must set the connect_active_ flag to true before calling this function and the function sets it to false when - // finished. + // Connect the mqtt client to the broker. This function can only be spawned once to avoid multiple spawns to open/close + // the socket at the same time. The routine that calls this function must set the connect_active_ flag to true before + // calling this function and the function sets it to false when finished. auto connect_to_broker() -> asio::awaitable { while (true) { try { @@ -86,16 +105,16 @@ class mqtt_broadcaster { logger_.trace("Sending Connect request to the MQTT broker"); - // Connect is initialized with clean start set to false, which means that the broker will keep the session alive even - // if the client disconnect, and it will keep that session alive for UINT_MAX seconds. - auto connect_packet = - async_mqtt::v5::connect_packet{ false, - std::chrono::seconds(100).count(), - async_mqtt::allocate_buffer("cid1"), - async_mqtt::nullopt, - async_mqtt::allocate_buffer(mqtt_username_), - async_mqtt::allocate_buffer(mqtt_password_), - { async_mqtt::property::session_expiry_interval{ UINT_MAX } } }; + // Connect is initialized with clean start set to false, which means that the broker will keep the session alive + // even if the client disconnects, and it will keep that session alive for UINT_MAX seconds. + auto connect_packet = async_mqtt::v5::connect_packet{ false, + std::chrono::seconds(100).count(), + async_mqtt::allocate_buffer("cid1"), + async_mqtt::nullopt, + async_mqtt::allocate_buffer(mqtt_username_), + async_mqtt::allocate_buffer(mqtt_password_), + { async_mqtt::property::session_expiry_interval{ + std::numeric_limits::max() } } }; co_await mqtt_client_->send(connect_packet, asio::use_awaitable); @@ -103,12 +122,11 @@ class mqtt_broadcaster { co_await mqtt_client_->recv(async_mqtt::filter::match, { async_mqtt::control_packet_type::connack }, asio::use_awaitable); + logger_.trace("Acknowledgement received, connection established"); connect_active_ = false; co_return; } catch (std::exception& e) { logger_.error("Error while connecting to MQTT broker: {}", e.what()); - } catch (...) { - logger_.error("Unknown error while connecting to MQTT broker"); } // If the connection failed, then clear the socket and try again after 1 second. @@ -120,6 +138,7 @@ class mqtt_broadcaster { // Function which has oversight over the signals auto load_signals() -> void { + logger_.info("Loading signals"); // Stop reading the current signals by canceling the slots for (auto& slot : slots_) { std::visit([](auto& ptr) { ptr->cancel(); }, slot); @@ -135,6 +154,7 @@ class mqtt_broadcaster { } auto get_signals() -> void { + logger_.info("Getting signals from IPC client"); ipc_client_.signals([&](const std::vector& signals) { for (const tfc::ipc_ruler::signal& signal : signals) { active_signals_.push_back(signal); @@ -144,21 +164,28 @@ class mqtt_broadcaster { ctx_.run_for(std::chrono::milliseconds(20)); } - // This function removes signals that are banned + // This function filter out signals that are not allowed auto clean_signals() -> void { + logger_.info("Filtering signals"); + + for (auto& signal : allowed_signals_) { + logger_.trace("Signal {}", signal); + } + active_signals_.erase(std::remove_if(active_signals_.begin(), active_signals_.end(), [&](const tfc::ipc_ruler::signal& signal) { - return std::ranges::any_of(banned_signals_.begin(), banned_signals_.end(), - [&](const std::string& banned_string) { - return signal.name.find(banned_string) != - std::string::npos; - }); + return std::ranges::none_of(allowed_signals_.begin(), allowed_signals_.end(), + [&](const std::string& allowed_string) { + return signal.name.find(allowed_string) != + std::string::npos; + }); }), active_signals_.end()); } // This function checks the type of the signal in order to determine how to read from the slot auto handle_signal(tfc::ipc_ruler::signal& signal) -> void { + logger_.info("Handling signal {}", signal.name); switch (signal.type) { case details::type_e::_bool: { run_slot(signal); @@ -193,7 +220,8 @@ class mqtt_broadcaster { // This function runs the coroutine for the slot template auto run_slot(tfc::ipc_ruler::signal& signal) -> void { - slots_.push_back(std::make_shared>(ctx_, signal.name)); + logger_.info("Running slot for signal {}", signal.name); + slots_.emplace_back(std::make_shared>(ctx_, signal.name)); auto& slot = std::get>>(slots_.back()); slot->connect(signal.name); asio::co_spawn(mqtt_client_->strand(), slot_coroutine(*slot, signal.name), asio::detached); @@ -209,21 +237,7 @@ class mqtt_broadcaster { } // This function is called when a signal is received - void update_signals([[maybe_unused]] sdbusplus::message::message& msg) noexcept { restart(); } - - // This function is called when a value is received and its value needs to be converted to a string - template - auto convert_to_string(value_type value) -> std::string { - if constexpr (std::is_same_v) { - return value ? "true" : "false"; - } else if constexpr (std::is_same_v) { - return value; - } else if constexpr (std::is_arithmetic_v) { - return std::to_string(value); - } else { - return "unknown"; - } - } + void update_signals([[maybe_unused]] sdbusplus::message::message& msg) { restart(); } // This function runs a coroutine for a slot, when a new value is received a message is sent to the MQTT broker template @@ -234,7 +248,7 @@ class mqtt_broadcaster { while (!stop_coroutine_) { std::expected msg = co_await slot.async_receive(asio::use_awaitable); if (msg) { - std::string message_value = convert_to_string(msg.value()); + std::string message_value = fmt::format("{}", msg.value()); co_await send_message(signal_name, message_value); } } @@ -288,9 +302,10 @@ class mqtt_broadcaster { tfc::logger::logger logger_; std::unique_ptr> dbus_connection_; std::unique_ptr> signal_updates_; - tfc::confman::config config_; - std::vector banned_signals_; + config_manager& config_; + + std::vector allowed_signals_; std::vector active_signals_; std::vector>, std::shared_ptr>, diff --git a/exes/mqtt-broadcaster/src/main.cpp b/exes/mqtt-broadcaster/src/main.cpp index d78e8ccf5b..cd28e8acc4 100644 --- a/exes/mqtt-broadcaster/src/main.cpp +++ b/exes/mqtt-broadcaster/src/main.cpp @@ -2,42 +2,39 @@ #include #include #include "mqtt_broadcaster.hpp" +#include "config.hpp" +#include +#include auto main(int argc, char* argv[]) -> int { - try { - auto program_description{ tfc::base::default_description() }; + auto program_description{ tfc::base::default_description() }; - std::string mqtt_host; - std::string mqtt_port; - std::string mqtt_username; - std::string mqtt_password; + std::string mqtt_host; + std::string mqtt_port = "1883"; + std::string mqtt_username; + std::string mqtt_password; - program_description.add_options()("mqtt_host", boost::program_options::value(&mqtt_host)->required(), - "ip address of mqtt broker")( - "mqtt_port", boost::program_options::value(&mqtt_port)->required(), "port of mqtt broker")( - "mqtt_username", boost::program_options::value(&mqtt_username), "username of mqtt broker")( - "mqtt_password", boost::program_options::value(&mqtt_password), "password of mqtt broker"); + program_description.add_options()("mqtt_host", boost::program_options::value(&mqtt_host)->required(), + "ip address of mqtt broker")( + "mqtt_port", boost::program_options::value(&mqtt_port), "port of mqtt broker")( + "mqtt_username", boost::program_options::value(&mqtt_username), "username of mqtt broker")( + "mqtt_password", boost::program_options::value(&mqtt_password), "password of mqtt broker"); - tfc::base::init(argc, argv, program_description); + tfc::base::init(argc, argv, program_description); - asio::io_context ctx{}; + asio::io_context ctx{}; - tfc::ipc_ruler::ipc_manager_client ipc_client{ ctx }; + tfc::ipc_ruler::ipc_manager_client ipc_client{ ctx }; - const std::shared_ptr> mqtt_client = - std::make_shared>( - async_mqtt::protocol_version::v5, ctx.get_executor()); + const std::shared_ptr> mqtt_client = + std::make_shared>( + async_mqtt::protocol_version::v5, ctx.get_executor()); - const mqtt_broadcaster> - application(ctx, mqtt_host, mqtt_port, mqtt_username, mqtt_password, ipc_client, mqtt_client); + const mqtt_broadcaster, config, tfc::confman::config> + application(ctx, mqtt_host, mqtt_port, mqtt_username, mqtt_password, ipc_client, mqtt_client); - ctx.run(); + ctx.run(); - } catch (std::exception& e) { - std::cerr << "Exception caught in main: " << e.what() << "\n"; - } catch (...) { - std::cerr << "Unknown exception caught in main\n"; - } return 0; } diff --git a/exes/mqtt-broadcaster/tests/mqtt_broadcaster_tests.cpp b/exes/mqtt-broadcaster/tests/mqtt_broadcaster_tests.cpp index 867df525fb..ba09438e6c 100644 --- a/exes/mqtt-broadcaster/tests/mqtt_broadcaster_tests.cpp +++ b/exes/mqtt-broadcaster/tests/mqtt_broadcaster_tests.cpp @@ -1,12 +1,16 @@ #include #include +#include +#include #include +#include +#include "config.hpp" #include "mqtt_broadcaster.hpp" namespace ut = boost::ut; using boost::ut::operator""_test; -const int TIMEOUT_IN_MS = 100; +constexpr std::chrono::duration timeout_duration = std::chrono::milliseconds(25); // This class mocks the response to send for the MQTT client class result { @@ -105,26 +109,23 @@ class mock_mqtt_client { uint16_t packet_id_ = 0; }; -// This function tests the sending of a simple value (bool) to the MQTT client -static auto send_simple_value(std::string mqtt_host, - std::string mqtt_port, - std::string mqtt_username, - std::string mqtt_password) -> void { - boost::asio::io_context ctx{}; - - tfc::ipc_ruler::ipc_manager_client_mock ipc_client; - - tfc::ipc::signal sig(ctx, ipc_client, "test_signal", - "description"); - - const std::shared_ptr mqtt_client = - std::make_shared(async_mqtt::protocol_version::v5, ctx.get_executor()); +template +class file_testable : public tfc::confman::file_storage { +public: + using tfc::confman::file_storage::file_storage; - const mqtt_broadcaster application( - ctx, std::move(mqtt_host), std::move(mqtt_port), std::move(mqtt_username), std::move(mqtt_password), ipc_client, - mqtt_client); + ~file_testable() { + std::error_code ignore{}; + std::filesystem::remove(this->file(), ignore); + } +}; - ctx.run_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); +// This function tests the sending of a simple value (bool) to the MQTT client +static auto send_simple_value(asio::io_context& ctx, + std::shared_ptr mqtt_client, + tfc::ipc::signal& sig) + -> void { + ctx.run_for(timeout_duration); auto last_message = mqtt_client->get_last_message(); @@ -136,7 +137,7 @@ static auto send_simple_value(std::string mqtt_host, << "qos should be: " << last_message.opts().get_qos(); sig.send(true); - ctx.run_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); + ctx.run_for(timeout_duration); last_message = mqtt_client->get_last_message(); ut::expect(last_message.packet_id() == 2) << "packet id should be: " << last_message.packet_id(); @@ -149,25 +150,12 @@ static auto send_simple_value(std::string mqtt_host, // This function tests adding a signal while the program is running to see weather the signal is added correctly and values // are sent from the added signal. -static auto add_signal_in_running(std::string mqtt_host, - std::string mqtt_port, - std::string mqtt_username, - std::string mqtt_password) -> void { - boost::asio::io_context ctx{}; - - tfc::ipc_ruler::ipc_manager_client_mock ipc_client; - - tfc::ipc::signal sig(ctx, ipc_client, "test_signal", - "description"); - - const std::shared_ptr mqtt_client = - std::make_shared(async_mqtt::protocol_version::v5, ctx.get_executor()); - - const mqtt_broadcaster application( - ctx, std::move(mqtt_host), std::move(mqtt_port), std::move(mqtt_username), std::move(mqtt_password), ipc_client, - mqtt_client); - - ctx.run_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); +static auto add_signal_in_running( + asio::io_context& ctx, + std::shared_ptr mqtt_client, + tfc::ipc_ruler::ipc_manager_client_mock ipc_client, + tfc::ipc::signal& sig) -> void { + ctx.run_for(timeout_duration); auto last_message = mqtt_client->get_last_message(); @@ -179,7 +167,7 @@ static auto add_signal_in_running(std::string mqtt_host, << "qos should be: " << last_message.opts().get_qos(); sig.send(true); - ctx.run_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); + ctx.run_for(timeout_duration); last_message = mqtt_client->get_last_message(); ut::expect(last_message.packet_id() == 2) << "packet id should be: " << last_message.packet_id(); @@ -189,16 +177,16 @@ static auto add_signal_in_running(std::string mqtt_host, ut::expect(last_message.opts().get_qos() == async_mqtt::qos::at_least_once) << "qos should be: " << last_message.opts().get_qos(); - ctx.run_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); + ctx.run_for(timeout_duration); tfc::ipc::signal sig2( ctx, ipc_client, "test_signal2", "description2"); - ctx.run_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); + ctx.run_for(timeout_duration); sig2.send(true); - ctx.run_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); + ctx.run_for(timeout_duration); last_message = mqtt_client->get_last_message(); ut::expect(last_message.packet_id() == 2) << "packet id should be: " << last_message.packet_id(); @@ -210,25 +198,11 @@ static auto add_signal_in_running(std::string mqtt_host, } // This function tests if the program works correctly when the MQTT broker goes down and comes back up. -static auto mqtt_broker_goes_down(std::string mqtt_host, - std::string mqtt_port, - std::string mqtt_username, - std::string mqtt_password) -> void { - boost::asio::io_context ctx{}; - - tfc::ipc_ruler::ipc_manager_client_mock ipc_client; - - tfc::ipc::signal sig(ctx, ipc_client, "test_signal", - "description"); - - const std::shared_ptr mqtt_client = - std::make_shared(async_mqtt::protocol_version::v5, ctx.get_executor()); - - const mqtt_broadcaster application( - ctx, std::move(mqtt_host), std::move(mqtt_port), std::move(mqtt_username), std::move(mqtt_password), ipc_client, - mqtt_client); - - ctx.run_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); +static auto mqtt_broker_goes_down( + asio::io_context& ctx, + std::shared_ptr mqtt_client, + tfc::ipc::signal& sig) -> void { + ctx.run_for(timeout_duration); auto last_message = mqtt_client->get_last_message(); @@ -240,7 +214,7 @@ static auto mqtt_broker_goes_down(std::string mqtt_host, << "qos should be: " << last_message.opts().get_qos(); sig.send(true); - ctx.run_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); + ctx.run_for(timeout_duration); last_message = mqtt_client->get_last_message(); @@ -251,11 +225,11 @@ static auto mqtt_broker_goes_down(std::string mqtt_host, ut::expect(last_message.opts().get_qos() == async_mqtt::qos::at_least_once) << "qos should be: " << last_message.opts().get_qos(); - ctx.run_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); + ctx.run_for(timeout_duration); mqtt_client->set_online(false); - ctx.run_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); + ctx.run_for(timeout_duration); sig.send(false); - ctx.run_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); + ctx.run_for(timeout_duration); last_message = mqtt_client->get_last_message(); @@ -267,7 +241,7 @@ static auto mqtt_broker_goes_down(std::string mqtt_host, << "qos is: " << last_message.opts().get_qos() << " when it should be: at_least_once"; mqtt_client->set_online(true); - ctx.run_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); + ctx.run_for(timeout_duration); last_message = mqtt_client->get_last_message(); @@ -284,11 +258,45 @@ static auto mqtt_broker_goes_down(std::string mqtt_host, auto main(int argc, char* argv[]) -> int { tfc::base::init(argc, argv); - "Sending a single value"_test = [] { send_simple_value("localhost", "1883", "", ""); }; + std::string mqtt_host{ "localhost" }; + std::string mqtt_port{ "1883" }; + std::string mqtt_username{ "" }; + std::string mqtt_password{ "" }; + + boost::asio::io_context ctx{}; + + tfc::ipc_ruler::ipc_manager_client_mock ipc_client; + + tfc::ipc::signal sig(ctx, ipc_client, "test_signal", + "description"); + + const std::shared_ptr mqtt_client = + std::make_shared(async_mqtt::protocol_version::v5, ctx.get_executor()); + + tfc::confman::stub_config cfg{ + ctx, "mqtt_broadcaster", config{ ._allowed_topics = tfc::confman::observable>{} } + }; + + cfg.access()._allowed_topics = { "test_signal" }; + + const mqtt_broadcaster> + application(ctx, std::move(mqtt_host), std::move(mqtt_port), std::move(mqtt_username), std::move(mqtt_password), + ipc_client, mqtt_client, cfg); + + "Sending a single value"_test = [&] { + std::cout << "First test\n"; + send_simple_value(ctx, mqtt_client, sig); + }; - "Adding a signal while the program is running"_test = [] { add_signal_in_running("localhost", "1883", "", ""); }; + "Adding a signal while the program is running"_test = [&] { + std::cout << "Second test\n"; + add_signal_in_running(ctx, mqtt_client, ipc_client, sig); + }; - "MQTT broker goes down"_test = [] { mqtt_broker_goes_down("localhost", "1883", "", ""); }; + "MQTT broker goes down"_test = [&] { + std::cout << "Third test\n"; + mqtt_broker_goes_down(ctx, mqtt_client, sig); + }; return 0; } diff --git a/libs/ipc/inc/public/tfc/ipc/details/dbus_server_iface.hpp b/libs/ipc/inc/public/tfc/ipc/details/dbus_server_iface.hpp index 30e3c2d73e..0dc6577348 100644 --- a/libs/ipc/inc/public/tfc/ipc/details/dbus_server_iface.hpp +++ b/libs/ipc/inc/public/tfc/ipc/details/dbus_server_iface.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -24,12 +25,14 @@ namespace tfc::ipc_ruler { using tfc::ipc::details::type_e; +static constexpr std::string_view dbus_name{ "ipc_ruler" }; +static constexpr std::string_view dbus_manager_name{ "manager" }; // service name -static auto ipc_ruler_service_name = "com.skaginn3x.ipc_ruler"; +static constexpr auto const_ipc_ruler_service_name = dbus::const_dbus_name; // object path -static auto ipc_ruler_object_path = "/com/skaginn3x/ipc_ruler"; +static constexpr auto const_ipc_ruler_object_path = dbus::const_dbus_path; // Interface name -static auto ipc_ruler_interface_name = "com.skaginn3x.manager"; +static constexpr auto const_ipc_ruler_interface_name = dbus::const_dbus_name; using dbus_error = tfc::dbus::exception::runtime; @@ -227,8 +230,9 @@ class ipc_manager_server { : ipc_manager_{ std::move(ipc_manager) } { connection_ = std::make_shared(ctx, tfc::dbus::sd_bus_open_system()); object_server_ = std::make_unique(connection_); - connection_->request_name(ipc_ruler_service_name); - dbus_iface_ = object_server_->add_unique_interface(ipc_ruler_object_path, ipc_ruler_interface_name); + connection_->request_name(const_ipc_ruler_service_name.data()); + dbus_iface_ = + object_server_->add_unique_interface(const_ipc_ruler_object_path.data(), const_ipc_ruler_interface_name.data()); ipc_manager_->set_callback([&](std::string_view slot_name, std::string_view signal_name) { auto message = dbus_iface_->new_signal("ConnectionChange"); @@ -284,11 +288,25 @@ class ipc_manager_client { public: explicit ipc_manager_client(boost::asio::io_context& ctx) { connection_ = std::make_unique(ctx, tfc::dbus::sd_bus_open_system()); - match_ = std::make_unique( - *connection_, - fmt::format("sender='{}',interface='{}',path='{}',type='signal'", ipc_ruler_service_name, ipc_ruler_interface_name, - ipc_ruler_object_path), - std::bind_front(&ipc_manager_client::match_callback, this)); + match_ = make_match(); + } + // Todo copy constructors can be implemented but I don't see why we need them + ipc_manager_client(ipc_manager_client const&) = delete; + auto operator=(ipc_manager_client const&) -> ipc_manager_client& = delete; + ipc_manager_client(ipc_manager_client&& to_be_erased) noexcept { + connection_ = std::move(to_be_erased.connection_); + slot_callbacks_ = std::move(to_be_erased.slot_callbacks_); + // It is pretty safe to construct new match here it mostly invokes C api where it does not explicitly throw + // it could throw if we are out of memory but then we are already screwed and the process will terminate. + match_ = make_match(); + } + auto operator=(ipc_manager_client&& to_be_erased) noexcept -> ipc_manager_client& { + connection_ = std::move(to_be_erased.connection_); + slot_callbacks_ = std::move(to_be_erased.slot_callbacks_); + // It is pretty safe to construct new match here it mostly invokes C api where it does not explicitly throw + // it could throw if we are out of memory but then we are already screwed and the process will terminate. + match_ = make_match(); + return *this; } /** @@ -301,8 +319,8 @@ class ipc_manager_client { const std::string_view description, type_e type, std::invocable auto&& handler) -> void { - connection_->async_method_call(std::forward(handler), ipc_ruler_service_name, ipc_ruler_object_path, - ipc_ruler_interface_name, "RegisterSignal", name, description, + connection_->async_method_call(std::forward(handler), ipc_ruler_service_name_, ipc_ruler_object_path_, + ipc_ruler_interface_name_, "RegisterSignal", name, description, static_cast(type)); } @@ -316,8 +334,8 @@ class ipc_manager_client { const std::string_view description, type_e type, std::invocable auto&& handler) -> void { - connection_->async_method_call(std::forward(handler), ipc_ruler_service_name, ipc_ruler_object_path, - ipc_ruler_interface_name, "RegisterSlot", name, description, static_cast(type)); + connection_->async_method_call(std::forward(handler), ipc_ruler_service_name_, ipc_ruler_object_path_, + ipc_ruler_interface_name_, "RegisterSlot", name, description, static_cast(type)); } /** @@ -327,16 +345,16 @@ class ipc_manager_client { * @param handler a function like object that is called back with a vector of signals */ auto signals(std::invocable&> auto&& handler) -> void { - sdbusplus::asio::getProperty(*connection_, ipc_ruler_service_name, ipc_ruler_object_path, - ipc_ruler_interface_name, "Signals", - [handler = std::forward(handler)]( + sdbusplus::asio::getProperty(*connection_, ipc_ruler_service_name_, ipc_ruler_object_path_, + ipc_ruler_interface_name_, "Signals", + [captured_handler = std::forward(handler)]( const boost::system::error_code& error, const std::string& response) { if (error) { return; } auto signals = glz::read_json>(response); if (signals) { - handler(signals.value()); + captured_handler(signals.value()); } }); } @@ -348,16 +366,16 @@ class ipc_manager_client { * @param handler a function like object that is called back with a vector of slots */ auto slots(std::invocable&> auto&& handler) -> void { - sdbusplus::asio::getProperty(*connection_, ipc_ruler_service_name, ipc_ruler_object_path, - ipc_ruler_interface_name, "Slots", - [handler = std::forward(handler)]( + sdbusplus::asio::getProperty(*connection_, ipc_ruler_service_name_, ipc_ruler_object_path_, + ipc_ruler_interface_name_, "Slots", + [captured_handler = std::forward(handler)]( const boost::system::error_code& error, const std::string& response) { if (error) { return; } auto slots = glz::read_json>(response); if (slots) { - handler(slots.value()); + captured_handler(slots.value()); } }); } @@ -368,7 +386,7 @@ class ipc_manager_client { */ auto connections(std::invocable>&> auto&& handler) -> void { sdbusplus::asio::getProperty( - *connection_, ipc_ruler_service_name, ipc_ruler_object_path, ipc_ruler_interface_name, "Connections", + *connection_, ipc_ruler_service_name_, ipc_ruler_object_path_, ipc_ruler_interface_name_, "Connections", [handler](const boost::system::error_code& error, const std::string& response) { if (error) { return; @@ -391,8 +409,8 @@ class ipc_manager_client { auto connect(const std::string& slot_name, const std::string& signal_name, std::invocable auto&& handler) -> void { - connection_->async_method_call(std::forward(handler), ipc_ruler_service_name, ipc_ruler_object_path, - ipc_ruler_interface_name, "Connect", slot_name, signal_name); + connection_->async_method_call(std::forward(handler), ipc_ruler_service_name_, ipc_ruler_object_path_, + ipc_ruler_interface_name_, "Connect", slot_name, signal_name); } /** @@ -403,8 +421,8 @@ class ipc_manager_client { */ template auto disconnect(const std::string& slot_name, message_handler&& handler) -> void { - connection_->async_method_call(std::forward(handler), ipc_ruler_service_name, ipc_ruler_object_path, - ipc_ruler_interface_name, "disconnect", slot_name); + connection_->async_method_call(std::forward(handler), ipc_ruler_service_name_, ipc_ruler_object_path_, + ipc_ruler_interface_name_, "Disconnect", slot_name); } /** @@ -421,6 +439,13 @@ class ipc_manager_client { } private: + auto make_match() -> std::unique_ptr { + return std::make_unique( + *connection_, + fmt::format("sender='{}',interface='{}',path='{}',type='signal'", ipc_ruler_service_name_, ipc_ruler_interface_name_, + ipc_ruler_object_path_), + std::bind_front(&ipc_manager_client::match_callback, this)); + } auto match_callback(sdbusplus::message_t& msg) -> void { auto container = msg.unpack>(); std::string const slot_name = std::get<0>(container); @@ -430,6 +455,9 @@ class ipc_manager_client { std::invoke(iterator->second, signal_name); } } + const std::string ipc_ruler_service_name_{ const_ipc_ruler_service_name }; + const std::string ipc_ruler_interface_name_{ const_ipc_ruler_interface_name }; + const std::string ipc_ruler_object_path_{ const_ipc_ruler_object_path }; std::unique_ptr connection_; std::unique_ptr match_;