Skip to content

Commit

Permalink
working on tests
Browse files Browse the repository at this point in the history
  • Loading branch information
magni-mar committed Jul 5, 2023
1 parent ae460ef commit 4afdf30
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 195 deletions.
44 changes: 28 additions & 16 deletions exes/mqtt-broadcaster/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
cmake_minimum_required(VERSION 3.21)
project(mqtt_broadcaster)

add_executable(mqtt-broadcaster "src/main.cpp")
add_executable(mqtt-broadcaster-tests "tests/mqtt_broadcaster_tests.cpp")

find_package(Boost 1.81.0 REQUIRED COMPONENTS log program_options)
# find_package(unofficial-libuuid CONFIG REQUIRED)
# find_package(eclipse-paho-mqtt-c CONFIG REQUIRED)

set(COMMON_LIBS
target_include_directories(mqtt-broadcaster
PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/inc
)

target_link_libraries(mqtt-broadcaster
tfc::base
tfc::logger
tfc::stx
Expand All @@ -18,22 +18,34 @@
Boost::boost
Boost::log
Boost::program_options
# eclipse-paho-mqtt-c::paho-mqtt3as-static
# unofficial::UUID::uuid
)
)

set(COMMON_COMPILE_DEFINITIONS PUBLIC ASYNC_MQTT_USE_LOG)
target_compile_definitions(mqtt-broadcaster
PUBLIC
ASYNC_MQTT_USE_LOG
)

set(COMMON_INCLUDE_DIRECTORIES
target_include_directories(mqtt-broadcaster-tests
PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/inc
)

foreach (TARGET mqtt-broadcaster mqtt-broadcaster-tests)
target_compile_definitions(${TARGET} ${COMMON_COMPILE_DEFINITIONS})
target_link_libraries(${TARGET} PUBLIC ${COMMON_LIBS})
target_include_directories(${TARGET} ${COMMON_INCLUDE_DIRECTORIES})
endforeach ()
target_link_libraries(mqtt-broadcaster-tests
tfc::base
tfc::logger
tfc::stx
tfc::ipc
tfc::operation_mode
tfc::confman
Boost::boost
Boost::log
Boost::program_options
)

target_compile_definitions(mqtt-broadcaster-tests
PUBLIC
ASYNC_MQTT_USE_LOG
)

include(GNUInstallDirs)
install(
Expand Down
14 changes: 14 additions & 0 deletions exes/mqtt-broadcaster/inc/config.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

#include <tfc/confman.hpp>
#include <tfc/confman/observable.hpp>

// File under /etc/tfc/mqtt-broadcaster/def/mqtt_broadcaster.json which lists the signals that are supposed to be published
// on the mqtt broker. Updating the signals in this file will cause the running program to stop and restart
struct config {
tfc::confman::observable<std::vector<std::string>> _allowed_topics{};
struct glaze {
static constexpr auto value{ glz::object("_allowed_topics", &config::_allowed_topics) };
static constexpr auto name{ "mqtt_broadcaster" };
};
};
127 changes: 71 additions & 56 deletions exes/mqtt-broadcaster/inc/mqtt_broadcaster.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,37 @@
namespace asio = boost::asio;
namespace details = tfc::ipc::details;

// File under /etc/tfc/mqtt-broadcaster/def/mqtt_broadcaster.json which lists the signals that are not supposed to be
// published on the mqtt broker. Updating the signals in this file will cause the running program to stop and restart
struct config {
tfc::confman::observable<std::vector<std::string>> _banned_topics{};
struct glaze {
static constexpr auto value{ glz::object("_banned_topics", &config::_banned_topics) };
static constexpr auto name{ "mqtt_broadcaster" };
};
};

// In order to ease testing, types are injected into the class
template <class ipc_client_type, class mqtt_client_type>
template <class ipc_client_type, class mqtt_client_type, class config_manager>
class mqtt_broadcaster {
public:
// Constructor used for testing, enables injection of config
mqtt_broadcaster(asio::io_context& ctx,
std::string mqtt_address,
std::string mqtt_port,
std::string mqtt_username,
std::string mqtt_password,
ipc_client_type& ipc_client,
std::shared_ptr<mqtt_client_type> mqtt_client,
// TODO: the stub deletes if it is not passed by reference
config_manager& cfg)
: object_path_(tfc::dbus::make_dbus_path("ipc_ruler")), interface_name_(tfc::dbus::make_dbus_name("manager")),
mqtt_host_(std::move(mqtt_address)), mqtt_port_(std::move(mqtt_port)), mqtt_username_(std::move(mqtt_username)),
mqtt_password_(std::move(mqtt_password)), ctx_(ctx), ipc_client_(ipc_client), mqtt_client_(std::move(mqtt_client)),
logger_("mqtt_broadcaster"),
dbus_connection_(std::make_unique<sdbusplus::asio::connection>(ctx, tfc::dbus::sd_bus_open_system())),
signal_updates_(std::make_unique<sdbusplus::bus::match::match>(
*dbus_connection_,
sdbusplus::bus::match::rules::propertiesChanged(object_path_, interface_name_),
std::bind_front(&mqtt_broadcaster::update_signals, this))),
config_(cfg) {

std::cout << "topics: " << config_.string() << std::endl;

asio::co_spawn(mqtt_client_->strand(), initialize(), asio::detached);
}

// Normal constructor
mqtt_broadcaster(asio::io_context& ctx,
std::string mqtt_address,
std::string mqtt_port,
Expand All @@ -49,27 +66,29 @@ class mqtt_broadcaster {
// Initial connect to the broker
if (!connect_active_) {
connect_active_ = true;
logger_.trace("Initial connect to MQTT broker");
co_await asio::co_spawn(mqtt_client_->strand(), connect_to_broker(), asio::use_awaitable);
}

// When a new list of banned string arrives the program is restarted
config_.value()._banned_topics.observe([this](auto& new_conf, [[maybe_unused]] auto& old_conf) {
banned_signals_.clear();
// When a new list of allowed string arrives the program is restarted
config_.value()._allowed_topics.observe([this](auto& new_conf, [[maybe_unused]] auto& old_conf) {
allowed_signals_.clear();
for (auto const& con : new_conf) {
banned_signals_.push_back(con);
allowed_signals_.push_back(con);
}
restart();
});

asio::co_spawn(mqtt_client_->strand(), tfc::base::exit_signals(ctx_), asio::detached);

banned_signals_ = config_.value()._banned_topics.value();
allowed_signals_ = config_.value()._allowed_topics.value();
load_signals();

}

// Connect the mqtt client to the broker. Only a single "thread" can run this at a time. The routine that calls this
// function must set the connect_active_ flag to true before calling this function and the function sets it to false when
// finished.
// Connect the mqtt client to the broker. This function can only be spawned once to avoid multiple spawns to open/close
// the socket at the same time. The routine that calls this function must set the connect_active_ flag to true before
// calling this function and the function sets it to false when finished.
auto connect_to_broker() -> asio::awaitable<void> {
while (true) {
try {
Expand All @@ -86,29 +105,28 @@ class mqtt_broadcaster {

logger_.trace("Sending Connect request to the MQTT broker");

// Connect is initialized with clean start set to false, which means that the broker will keep the session alive even
// if the client disconnect, and it will keep that session alive for UINT_MAX seconds.
auto connect_packet =
async_mqtt::v5::connect_packet{ false,
std::chrono::seconds(100).count(),
async_mqtt::allocate_buffer("cid1"),
async_mqtt::nullopt,
async_mqtt::allocate_buffer(mqtt_username_),
async_mqtt::allocate_buffer(mqtt_password_),
{ async_mqtt::property::session_expiry_interval{ UINT_MAX } } };
// Connect is initialized with clean start set to false, which means that the broker will keep the session alive
// even if the client disconnects, and it will keep that session alive for UINT_MAX seconds.
auto connect_packet = async_mqtt::v5::connect_packet{ false,
std::chrono::seconds(100).count(),
async_mqtt::allocate_buffer("cid1"),
async_mqtt::nullopt,
async_mqtt::allocate_buffer(mqtt_username_),
async_mqtt::allocate_buffer(mqtt_password_),
{ async_mqtt::property::session_expiry_interval{
std::numeric_limits<unsigned int>::max() } } };

co_await mqtt_client_->send(connect_packet, asio::use_awaitable);

logger_.trace("Waiting for MQTT connection acknowledgement");
co_await mqtt_client_->recv(async_mqtt::filter::match, { async_mqtt::control_packet_type::connack },
asio::use_awaitable);

logger_.trace("Acknowledgement received, connection established");
connect_active_ = false;
co_return;
} catch (std::exception& e) {
logger_.error("Error while connecting to MQTT broker: {}", e.what());
} catch (...) {
logger_.error("Unknown error while connecting to MQTT broker");
}

// If the connection failed, then clear the socket and try again after 1 second.
Expand All @@ -120,6 +138,7 @@ class mqtt_broadcaster {

// Function which has oversight over the signals
auto load_signals() -> void {
logger_.info("Loading signals");
// Stop reading the current signals by canceling the slots
for (auto& slot : slots_) {
std::visit([](auto& ptr) { ptr->cancel(); }, slot);
Expand All @@ -135,6 +154,7 @@ class mqtt_broadcaster {
}

auto get_signals() -> void {
logger_.info("Getting signals from IPC client");
ipc_client_.signals([&](const std::vector<tfc::ipc_ruler::signal>& signals) {
for (const tfc::ipc_ruler::signal& signal : signals) {
active_signals_.push_back(signal);
Expand All @@ -144,21 +164,28 @@ class mqtt_broadcaster {
ctx_.run_for(std::chrono::milliseconds(20));
}

// This function removes signals that are banned
// This function filter out signals that are not allowed
auto clean_signals() -> void {
logger_.info("Filtering signals");

for (auto& signal : allowed_signals_) {
logger_.trace("Signal {}", signal);
}

active_signals_.erase(std::remove_if(active_signals_.begin(), active_signals_.end(),
[&](const tfc::ipc_ruler::signal& signal) {
return std::ranges::any_of(banned_signals_.begin(), banned_signals_.end(),
[&](const std::string& banned_string) {
return signal.name.find(banned_string) !=
std::string::npos;
});
return std::ranges::none_of(allowed_signals_.begin(), allowed_signals_.end(),
[&](const std::string& allowed_string) {
return signal.name.find(allowed_string) !=
std::string::npos;
});
}),
active_signals_.end());
}

// This function checks the type of the signal in order to determine how to read from the slot
auto handle_signal(tfc::ipc_ruler::signal& signal) -> void {
logger_.info("Handling signal {}", signal.name);
switch (signal.type) {
case details::type_e::_bool: {
run_slot<tfc::ipc::details::type_bool, bool>(signal);
Expand Down Expand Up @@ -193,7 +220,8 @@ class mqtt_broadcaster {
// This function runs the coroutine for the slot
template <typename slot_type, typename value_type>
auto run_slot(tfc::ipc_ruler::signal& signal) -> void {
slots_.push_back(std::make_shared<details::slot<slot_type>>(ctx_, signal.name));
logger_.info("Running slot for signal {}", signal.name);
slots_.emplace_back(std::make_shared<details::slot<slot_type>>(ctx_, signal.name));
auto& slot = std::get<std::shared_ptr<details::slot<slot_type>>>(slots_.back());
slot->connect(signal.name);
asio::co_spawn(mqtt_client_->strand(), slot_coroutine<slot_type, value_type>(*slot, signal.name), asio::detached);
Expand All @@ -209,21 +237,7 @@ class mqtt_broadcaster {
}

// This function is called when a signal is received
void update_signals([[maybe_unused]] sdbusplus::message::message& msg) noexcept { restart(); }

// This function is called when a value is received and its value needs to be converted to a string
template <typename value_type>
auto convert_to_string(value_type value) -> std::string {
if constexpr (std::is_same_v<value_type, bool>) {
return value ? "true" : "false";
} else if constexpr (std::is_same_v<value_type, std::string>) {
return value;
} else if constexpr (std::is_arithmetic_v<value_type>) {
return std::to_string(value);
} else {
return "unknown";
}
}
void update_signals([[maybe_unused]] sdbusplus::message::message& msg) { restart(); }

// This function runs a coroutine for a slot, when a new value is received a message is sent to the MQTT broker
template <class slot_type, class value_type>
Expand All @@ -234,7 +248,7 @@ class mqtt_broadcaster {
while (!stop_coroutine_) {
std::expected<value_type, std::error_code> msg = co_await slot.async_receive(asio::use_awaitable);
if (msg) {
std::string message_value = convert_to_string(msg.value());
std::string message_value = fmt::format("{}", msg.value());
co_await send_message<value_type>(signal_name, message_value);
}
}
Expand Down Expand Up @@ -288,9 +302,10 @@ class mqtt_broadcaster {
tfc::logger::logger logger_;
std::unique_ptr<sdbusplus::asio::connection, std::function<void(sdbusplus::asio::connection*)>> dbus_connection_;
std::unique_ptr<sdbusplus::bus::match::match, std::function<void(sdbusplus::bus::match::match*)>> signal_updates_;
tfc::confman::config<config> config_;

std::vector<std::string> banned_signals_;
config_manager& config_;

std::vector<std::string> allowed_signals_;
std::vector<tfc::ipc_ruler::signal> active_signals_;
std::vector<std::variant<std::shared_ptr<details::slot<details::type_bool>>,
std::shared_ptr<details::slot<details::type_string>>,
Expand Down
49 changes: 23 additions & 26 deletions exes/mqtt-broadcaster/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,39 @@
#include <boost/program_options.hpp>
#include <tfc/ipc.hpp>
#include "mqtt_broadcaster.hpp"
#include "config.hpp"
#include <tfc/confman.hpp>
#include <tfc/confman/observable.hpp>

auto main(int argc, char* argv[]) -> int {
try {
auto program_description{ tfc::base::default_description() };
auto program_description{ tfc::base::default_description() };

std::string mqtt_host;
std::string mqtt_port;
std::string mqtt_username;
std::string mqtt_password;
std::string mqtt_host;
std::string mqtt_port = "1883";
std::string mqtt_username;
std::string mqtt_password;

program_description.add_options()("mqtt_host", boost::program_options::value<std::string>(&mqtt_host)->required(),
"ip address of mqtt broker")(
"mqtt_port", boost::program_options::value<std::string>(&mqtt_port)->required(), "port of mqtt broker")(
"mqtt_username", boost::program_options::value<std::string>(&mqtt_username), "username of mqtt broker")(
"mqtt_password", boost::program_options::value<std::string>(&mqtt_password), "password of mqtt broker");
program_description.add_options()("mqtt_host", boost::program_options::value<std::string>(&mqtt_host)->required(),
"ip address of mqtt broker")(
"mqtt_port", boost::program_options::value<std::string>(&mqtt_port), "port of mqtt broker")(
"mqtt_username", boost::program_options::value<std::string>(&mqtt_username), "username of mqtt broker")(
"mqtt_password", boost::program_options::value<std::string>(&mqtt_password), "password of mqtt broker");

tfc::base::init(argc, argv, program_description);
tfc::base::init(argc, argv, program_description);

asio::io_context ctx{};
asio::io_context ctx{};

tfc::ipc_ruler::ipc_manager_client ipc_client{ ctx };
tfc::ipc_ruler::ipc_manager_client ipc_client{ ctx };

const std::shared_ptr<async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>> mqtt_client =
std::make_shared<async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>>(
async_mqtt::protocol_version::v5, ctx.get_executor());
const std::shared_ptr<async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>> mqtt_client =
std::make_shared<async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>>(
async_mqtt::protocol_version::v5, ctx.get_executor());

const mqtt_broadcaster<tfc::ipc_ruler::ipc_manager_client&,
async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>>
application(ctx, mqtt_host, mqtt_port, mqtt_username, mqtt_password, ipc_client, mqtt_client);
const mqtt_broadcaster<tfc::ipc_ruler::ipc_manager_client&,
async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>, config, tfc::confman::config>
application(ctx, mqtt_host, mqtt_port, mqtt_username, mqtt_password, ipc_client, mqtt_client);

ctx.run();
ctx.run();

} catch (std::exception& e) {
std::cerr << "Exception caught in main: " << e.what() << "\n";
} catch (...) {
std::cerr << "Unknown exception caught in main\n";
}
return 0;
}
Loading

0 comments on commit 4afdf30

Please sign in to comment.