Skip to content

Commit

Permalink
Create MessageQueueMock.
Browse files Browse the repository at this point in the history
By providing and using a MessageQueueInterface
the tests can now use a mock to avoid the cost
of the real message queue's initialization,
allowing tests to run faster and for tests to
be easier to write and maintain.

Signed-off-by: Gianfranco Berardi <[email protected]>
  • Loading branch information
gberardi-pillar committed Oct 11, 2024
1 parent 1733977 commit 558deb8
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 84 deletions.
138 changes: 79 additions & 59 deletions include/ocpp/common/message_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,57 @@ bool allowed_to_send_message(const ControlMessage<M>& message, const DateTime& t
return true;
}

template <typename M> class MessageQueueInterface {
public:
virtual ~MessageQueueInterface() {
}
virtual void start() = 0;
virtual void reset_next_message_to_send() = 0;
virtual void get_persisted_messages_from_db(bool ignore_security_event_notifications = false) = 0;

/// \brief pushes a new \p call message onto the message queue
template <class T> void push(Call<T> call, const bool stall_until_accepted = false) {
json call_json = call;
this->push(call_json, stall_until_accepted);
}
virtual void push(const json& message, const bool stall_until_accepted = false) = 0;
template <class T> void push(CallResult<T> call_result) {
json call_result_json = call_result;
this->push_call_result(call_result_json, call_result.uniqueId);
}
virtual void push_call_result(const json& call_result_json, const MessageId& unique_id) = 0;
virtual void push(CallError call_error) = 0;
template <class T> std::future<EnhancedMessage<M>> push_async(Call<T> call) {
auto message = std::make_shared<ControlMessage<M>>(call);
return push_async_internal(message);
}
virtual std::future<EnhancedMessage<M>> push_async_internal(std::shared_ptr<ControlMessage<M>> message) = 0;
virtual EnhancedMessage<M> receive(std::string_view message) = 0;
virtual void reset_in_flight() = 0;
virtual void handle_call_result(EnhancedMessage<M>& enhanced_message) = 0;
virtual void handle_timeout_or_callerror(const std::optional<EnhancedMessage<M>>& enhanced_message_opt) = 0;
virtual void stop() = 0;
virtual void pause() = 0;
virtual void resume(std::chrono::seconds delay_on_reconnect) = 0;
virtual void set_registration_status_accepted() = 0;
virtual bool is_transaction_message_queue_empty() = 0;
virtual bool contains_transaction_messages(const CiString<36> transaction_id) = 0;
virtual bool contains_stop_transaction_message(const int32_t transaction_id) = 0;
virtual void update_transaction_message_attempts(const int transaction_message_attempts) = 0;
virtual void update_transaction_message_retry_interval(const int transaction_message_retry_interval) = 0;
virtual void update_message_timeout(const int timeout) = 0;
virtual MessageId createMessageId() = 0;
virtual void add_stopped_transaction_id(std::string stop_transaction_message_id, int32_t transaction_id) = 0;
virtual void add_meter_value_message_id(const std::string& start_transaction_message_id,
const std::string& meter_value_message_id) = 0;
virtual void notify_start_transaction_handled(const std::string& start_transaction_message_id,
const int32_t transaction_id) = 0;
virtual M string_to_messagetype(const std::string& s) = 0;
virtual std::string messagetype_to_string(M m) = 0;
};

/// \brief contains a message queue that makes sure that OCPPs synchronicity requirements are met
template <typename M> class MessageQueue {
template <typename M> class MessageQueue : public MessageQueueInterface<M> {
private:
MessageQueueConfig<M> config;
std::shared_ptr<ocpp::common::DatabaseHandlerCommon> database_handler;
Expand Down Expand Up @@ -417,7 +466,7 @@ template <typename M> class MessageQueue {
MessageQueue(send_callback, config, {}, databaseHandler) {
}

void start() {
void start() override {
this->worker_thread = std::thread([this]() {
// TODO(kai): implement message timeout
while (this->running) {
Expand Down Expand Up @@ -577,13 +626,13 @@ template <typename M> class MessageQueue {
}

/// \brief Resets next message to send. Can be used in situation when we dont want to reply to a CALL message
void reset_next_message_to_send() {
void reset_next_message_to_send() override {
std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
this->next_message_to_send.reset();
}

/// \brief Gets all persisted messages of normal message queue and persisted message queue from the database
void get_persisted_messages_from_db(bool ignore_security_event_notifications = false) {
void get_persisted_messages_from_db(bool ignore_security_event_notifications = false) override {
std::vector<QueueType> queue_types = {QueueType::Normal, QueueType::Transaction};
// do for Normal and Transaction queue
for (const auto queue_type : queue_types) {
Expand Down Expand Up @@ -627,16 +676,7 @@ template <typename M> class MessageQueue {
}
}

/// \brief pushes a new \p call message onto the message queue
template <class T> void push(Call<T> call, const bool stall_until_accepted = false) {
if (!running) {
return;
}
json call_json = call;
push(call_json, stall_until_accepted);
}

void push(const json& message, const bool stall_until_accepted = false) {
void push(const json& message, const bool stall_until_accepted = false) override {
if (!running) {
return;
}
Expand All @@ -661,16 +701,16 @@ template <typename M> class MessageQueue {
}

/// \brief Sends a new \p call_result message over the websocket
template <class T> void push(CallResult<T> call_result) {
void push_call_result(const json& call_result_json, const MessageId& unique_id) override {
if (!running) {
return;
}

this->send_callback(call_result);
this->send_callback(call_result_json);
{
std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
if (next_message_to_send.has_value()) {
if (next_message_to_send.value() == call_result.uniqueId) {
if (next_message_to_send.value() == unique_id) {
next_message_to_send.reset();
}
}
Expand All @@ -680,7 +720,7 @@ template <typename M> class MessageQueue {
}

/// \brief Sends a new \p call_error message over the websocket
void push(CallError call_error) {
void push(CallError call_error) override {
if (!running) {
return;
}
Expand All @@ -700,9 +740,7 @@ template <typename M> class MessageQueue {

/// \brief pushes a new \p call message onto the message queue
/// \returns a future from which the CallResult can be extracted
template <class T> std::future<EnhancedMessage<M>> push_async(Call<T> call) {
auto message = std::make_shared<ControlMessage<M>>(call);

std::future<EnhancedMessage<M>> push_async_internal(std::shared_ptr<ControlMessage<M>> message) override {
if (!running) {
auto enhanced_message = EnhancedMessage<M>();
enhanced_message.offline = true;
Expand Down Expand Up @@ -730,7 +768,7 @@ template <typename M> class MessageQueue {
/// \brief Enhances a received \p json_message with additional meta information, checks if it is a valid CallResult
/// with a corresponding Call message on top of the queue
/// \returns the enhanced message
EnhancedMessage<M> receive(std::string_view message) {
EnhancedMessage<M> receive(std::string_view message) override {
EnhancedMessage<M> enhanced_message;

enhanced_message.message = json::parse(message);
Expand Down Expand Up @@ -783,12 +821,12 @@ template <typename M> class MessageQueue {
return enhanced_message;
}

void reset_in_flight() {
void reset_in_flight() override {
this->in_flight = nullptr;
this->in_flight_timeout_timer.stop();
}

void handle_call_result(EnhancedMessage<M>& enhanced_message) {
void handle_call_result(EnhancedMessage<M>& enhanced_message) override {
if (this->in_flight->uniqueId() == enhanced_message.uniqueId) {
enhanced_message.call_message = this->in_flight->message;
enhanced_message.messageType = this->string_to_messagetype(
Expand Down Expand Up @@ -822,7 +860,7 @@ template <typename M> class MessageQueue {
}

/// \brief Handles a message timeout or a CALLERROR. \p enhanced_message_opt is set only in case of CALLERROR
void handle_timeout_or_callerror(const std::optional<EnhancedMessage<M>>& enhanced_message_opt) {
void handle_timeout_or_callerror(const std::optional<EnhancedMessage<M>>& enhanced_message_opt) override {
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
// We got a timeout iff enhanced_message_opt is empty. Otherwise, enhanced_message_opt contains the CallError.
bool timeout = !enhanced_message_opt.has_value();
Expand Down Expand Up @@ -925,7 +963,7 @@ template <typename M> class MessageQueue {
}

/// \brief Stops the message queue
void stop() {
void stop() override {
EVLOG_debug << "stop()";
// stop the running thread
this->running = false;
Expand All @@ -935,7 +973,7 @@ template <typename M> class MessageQueue {
}

/// \brief Pauses the message queue
void pause() {
void pause() override {
EVLOG_debug << "pause()";
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
this->pause_resume_ctr++;
Expand All @@ -947,7 +985,7 @@ template <typename M> class MessageQueue {
}

/// \brief Resumes the message queue
void resume(std::chrono::seconds delay_on_reconnect) {
void resume(std::chrono::seconds delay_on_reconnect) override {
EVLOG_debug << "resume() called";
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
if (!this->paused) {
Expand All @@ -966,77 +1004,59 @@ template <typename M> class MessageQueue {
}
}

void set_registration_status_accepted() {
void set_registration_status_accepted() override {
{
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
this->is_registration_status_accepted = true;
}
this->cv.notify_all();
}

bool is_transaction_message_queue_empty() {
bool is_transaction_message_queue_empty() override {
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
return this->transaction_message_queue.empty();
}

bool contains_transaction_messages(const CiString<36> transaction_id) {
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
for (const auto control_message : this->transaction_message_queue) {
if (control_message->messageType == v201::MessageType::TransactionEvent) {
v201::TransactionEventRequest req = control_message->message.at(CALL_PAYLOAD);
if (req.transactionInfo.transactionId == transaction_id) {
return true;
}
}
}
bool contains_transaction_messages(const CiString<36> transaction_id) override {
return false;
}

bool contains_stop_transaction_message(const int32_t transaction_id) {
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
for (const auto control_message : this->transaction_message_queue) {
if (control_message->messageType == v16::MessageType::StopTransaction) {
v16::StopTransactionRequest req = control_message->message.at(CALL_PAYLOAD);
if (req.transactionId == transaction_id) {
return true;
}
}
}
bool contains_stop_transaction_message(const int32_t transaction_id) override {
return false;
}

/// \brief Set transaction_message_attempts to given \p transaction_message_attempts
void update_transaction_message_attempts(const int transaction_message_attempts) {
void update_transaction_message_attempts(const int transaction_message_attempts) override {
this->config.transaction_message_attempts = transaction_message_attempts;
}

/// \brief Set transaction_message_retry_interval to given \p transaction_message_retry_interval in seconds
void update_transaction_message_retry_interval(const int transaction_message_retry_interval) {
void update_transaction_message_retry_interval(const int transaction_message_retry_interval) override {
this->config.transaction_message_retry_interval = transaction_message_retry_interval;
}

/// \brief Set message_timeout to given \p timeout (in seconds)
void update_message_timeout(const int timeout) {
void update_message_timeout(const int timeout) override {
this->config.message_timeout_seconds = timeout;
}

/// \brief Creates a unique message ID
/// \returns the unique message ID
MessageId createMessageId() {
MessageId createMessageId() override {
std::stringstream s;
s << this->uuid_generator();
return MessageId(s.str());
}

/// \brief Adds the given \p transaction_id to the message_id_transaction_id_map using the key \p
/// stop_transaction_message_id
void add_stopped_transaction_id(std::string stop_transaction_message_id, int32_t transaction_id) {
void add_stopped_transaction_id(std::string stop_transaction_message_id, int32_t transaction_id) override {
EVLOG_debug << "adding " << stop_transaction_message_id << " for transaction " << transaction_id;
this->message_id_transaction_id_map[stop_transaction_message_id] = transaction_id;
}

void add_meter_value_message_id(const std::string& start_transaction_message_id,
const std::string& meter_value_message_id) {
const std::string& meter_value_message_id) override {
if (this->start_transaction_mid_meter_values_mid_map.count(start_transaction_message_id)) {
this->start_transaction_mid_meter_values_mid_map.at(start_transaction_message_id)
.push_back(meter_value_message_id);
Expand All @@ -1048,7 +1068,7 @@ template <typename M> class MessageQueue {
}

void notify_start_transaction_handled(const std::string& start_transaction_message_id,
const int32_t transaction_id) {
const int32_t transaction_id) override {
this->cv.notify_one();

// replace transaction id in meter values if start_transaction_message_id is present in map
Expand All @@ -1069,8 +1089,8 @@ template <typename M> class MessageQueue {
this->start_transaction_mid_meter_values_mid_map.erase(start_transaction_message_id);
}

M string_to_messagetype(const std::string& s);
std::string messagetype_to_string(M m);
M string_to_messagetype(const std::string& s) override;
std::string messagetype_to_string(M m) override;
};

} // namespace ocpp
Expand Down
2 changes: 1 addition & 1 deletion include/ocpp/v16/charge_point_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class ChargePointImpl : ocpp::ChargingStationBase {

std::unique_ptr<Websocket> websocket;
Everest::SteadyTimer websocket_timer;
std::unique_ptr<MessageQueue<v16::MessageType>> message_queue;
std::unique_ptr<MessageQueueInterface<v16::MessageType>> message_queue;
std::map<int32_t, std::shared_ptr<Connector>> connectors;
std::unique_ptr<SmartChargingHandler> smart_charging_handler;
int32_t heartbeat_interval;
Expand Down
7 changes: 4 additions & 3 deletions include/ocpp/v201/charge_point.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa
std::unique_ptr<ConnectivityManager> connectivity_manager;

// utility
std::shared_ptr<MessageQueue<v201::MessageType>> message_queue;
std::shared_ptr<MessageQueueInterface<v201::MessageType>> message_queue;
std::shared_ptr<DatabaseHandler> database_handler;

std::map<int32_t, AvailabilityChange> scheduled_change_availability_requests;
Expand Down Expand Up @@ -750,8 +750,9 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa
/// \param callbacks Callbacks that will be registered for ChargePoint
ChargePoint(const std::map<int32_t, int32_t>& evse_connector_structure, std::shared_ptr<DeviceModel> device_model,
std::shared_ptr<DatabaseHandler> database_handler,
std::shared_ptr<MessageQueue<v201::MessageType>> message_queue, const std::string& message_log_path,
const std::shared_ptr<EvseSecurity> evse_security, const Callbacks& callbacks);
std::shared_ptr<MessageQueueInterface<v201::MessageType>> message_queue,
const std::string& message_log_path, const std::shared_ptr<EvseSecurity> evse_security,
const Callbacks& callbacks);

/// \brief Construct a new ChargePoint object
/// \param evse_connector_structure Map that defines the structure of EVSE and connectors of the chargepoint. The
Expand Down
13 changes: 13 additions & 0 deletions lib/ocpp/v16/message_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,17 @@ template <> std::string MessageQueue<v16::MessageType>::messagetype_to_string(v1
return v16::conversions::messagetype_to_string(m);
}

template <> bool MessageQueue<v16::MessageType>::contains_stop_transaction_message(const int32_t transaction_id) {
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
for (const auto control_message : this->transaction_message_queue) {
if (control_message->messageType == v16::MessageType::StopTransaction) {
v16::StopTransactionRequest req = control_message->message.at(CALL_PAYLOAD);
if (req.transactionId == transaction_id) {
return true;
}
}
}
return false;
}

} // namespace ocpp
2 changes: 1 addition & 1 deletion lib/ocpp/v201/charge_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ static DisplayMessage message_info_to_display_message(const MessageInfo& message

ChargePoint::ChargePoint(const std::map<int32_t, int32_t>& evse_connector_structure,
std::shared_ptr<DeviceModel> device_model, std::shared_ptr<DatabaseHandler> database_handler,
std::shared_ptr<MessageQueue<v201::MessageType>> message_queue,
std::shared_ptr<MessageQueueInterface<v201::MessageType>> message_queue,
const std::string& message_log_path, const std::shared_ptr<EvseSecurity> evse_security,
const Callbacks& callbacks) :
ocpp::ChargingStationBase(evse_security),
Expand Down
13 changes: 13 additions & 0 deletions lib/ocpp/v201/message_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,17 @@ template <> std::string MessageQueue<v201::MessageType>::messagetype_to_string(c
return v201::conversions::messagetype_to_string(m);
}

template <> bool MessageQueue<v201::MessageType>::contains_transaction_messages(const CiString<36> transaction_id) {
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
for (const auto control_message : this->transaction_message_queue) {
if (control_message->messageType == v201::MessageType::TransactionEvent) {
v201::TransactionEventRequest req = control_message->message.at(CALL_PAYLOAD);
if (req.transactionInfo.transactionId == transaction_id) {
return true;
}
}
}
return false;
}

} // namespace ocpp
Loading

0 comments on commit 558deb8

Please sign in to comment.