Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Config option to specify MessageTypesDiscardForQueueing #765

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion config/v16/profile_schemas/Internal.json
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@
"type": "boolean",
"readOnly": true
},
"MessageTypesDiscardForQueueing": {
"$comment": "Comma seperated list of message types that shall not be queued (when offline) even in case QueueAllMessages is true. If QueueAllMessages is false, the configuration of this paramater has no effect.",
"type": "string",
"readOnly": true
},
"MessageQueueSizeThreshold": {
"$comment": "Threshold for the size of in-memory message queues used to buffer messages (and store e.g. while offline). If threshold is exceeded, messages will be dropped according to OCPP specification to avoid memory issues.",
"type": "integer",
Expand All @@ -298,4 +303,4 @@
}
},
"additionalProperties": false
}
}
15 changes: 15 additions & 0 deletions config/v201/component_config/standardized/OCPPCommCtrlr.json
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,21 @@
"description": "When this variable is set to true, the Charging Station will queue all message until they are delivered to the CSMS.",
"type": "boolean"
},
"MessageTypesDiscardForQueueing": {
"variable_name": "MessageTypesDiscardForQueueing",
"characteristics": {
"supportsMonitoring": true,
"dataType": "SequenceList"
},
"attributes": [
{
"type": "Actual",
"mutability": "ReadOnly"
}
],
"description": "Comma seperated list of message types that shall not be queued (when offline) even in case QueueAllMessages is true. If QueueAllMessages is false, the configuration of this paramater has no effect.",
"type": "string"
},
"ResetRetries": {
"variable_name": "ResetRetries",
"characteristics": {
Expand Down
31 changes: 20 additions & 11 deletions include/ocpp/common/message_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <future>
#include <mutex>
#include <queue>
#include <set>
#include <thread>

#include <boost/uuid/uuid.hpp>
Expand All @@ -29,20 +30,28 @@ namespace ocpp {

using QueryExecutionException = common::QueryExecutionException;

struct MessageQueueConfig {
template <typename M> struct MessageQueueConfig {
int transaction_message_attempts;
int transaction_message_retry_interval; // seconds

// threshold for the accumulated sizes of the queues; if the queues exceed this limit,
// messages are potentially dropped in accordance with OCPP 2.0.1. Specification (cf. QueueAllMessages parameter)
int queues_total_size_threshold;

bool queue_all_messages; // cf. OCPP 2.0.1. "QueueAllMessages" in OCPPCommCtrlr
bool queue_all_messages{false}; // cf. OCPP 2.0.1. "QueueAllMessages" in OCPPCommCtrlr
std::set<M> message_types_discard_for_queueing; // allows to discard certain message types for offline queuing (e.g.
// Heartbeat)

int message_timeout_seconds = 30;
int boot_notification_retry_interval_seconds =
60; // interval for BootNotification.req in case response by CSMS is CALLERROR or CSMS does not respond at all
// (within specified MessageTimeout)

/// \brief Returns true if the given \p message_type shall be queued based on the configuration of
/// queue_all_messages and message_types_discard_for_queueing
bool check_queue(const M& message_type) {
return queue_all_messages and !message_types_discard_for_queueing.count(message_type);
};
};

/// \brief Contains a OCPP message in json form with additional information
Expand Down Expand Up @@ -158,7 +167,7 @@ bool allowed_to_send_message(const ControlMessage<M>& message, const DateTime& t
/// \brief contains a message queue that makes sure that OCPPs synchronicity requirements are met
template <typename M> class MessageQueue {
private:
MessageQueueConfig config;
MessageQueueConfig<M> config;
std::shared_ptr<ocpp::common::DatabaseHandlerCommon> database_handler;

std::thread worker_thread;
Expand Down Expand Up @@ -241,7 +250,7 @@ template <typename M> class MessageQueue {
} else {
this->normal_message_queue.push_back(message);
}
if (this->config.queue_all_messages) {
if (this->config.check_queue(message->messageType)) {
ocpp::common::DBTransactionMessage db_message{
message->message, messagetype_to_string(message->messageType), message->message_attempts,
message->timestamp, message->uniqueId()};
Expand Down Expand Up @@ -383,7 +392,7 @@ template <typename M> class MessageQueue {
public:
/// \brief Creates a new MessageQueue object with the provided \p configuration and \p send_callback
MessageQueue(
const std::function<bool(json message)>& send_callback, const MessageQueueConfig& config,
const std::function<bool(json message)>& send_callback, const MessageQueueConfig<M>& config,
const std::vector<M>& external_notify, std::shared_ptr<common::DatabaseHandlerCommon> database_handler,
const std::function<void(const std::string& new_message_id, const std::string& old_message_id)>
start_transaction_message_retry_callback =
Expand All @@ -403,7 +412,7 @@ template <typename M> class MessageQueue {
this->in_flight = nullptr;
}

MessageQueue(const std::function<bool(json message)>& send_callback, const MessageQueueConfig& config,
MessageQueue(const std::function<bool(json message)>& send_callback, const MessageQueueConfig<M>& config,
std::shared_ptr<common::DatabaseHandlerCommon> databaseHandler) :
MessageQueue(send_callback, config, {}, databaseHandler) {
}
Expand Down Expand Up @@ -528,7 +537,7 @@ template <typename M> class MessageQueue {
if (this->in_flight->message.at(CALL_ACTION) == "TransactionEvent") {
this->in_flight->message.at(CALL_PAYLOAD)["offline"] = true;
}
} else if (this->config.queue_all_messages) {
} else if (this->config.check_queue(this->in_flight->messageType)) {
EVLOG_info << "The message in flight will be sent again once the connection can be "
"established again since QueueAllMessages is set to 'true'.";
} else {
Expand Down Expand Up @@ -643,7 +652,7 @@ template <typename M> class MessageQueue {
} else {
// all other messages are allowed to "jump the queue" to improve user experience
// TODO: decide if we only want to allow this for a subset of messages
if (!this->paused || this->resuming || this->config.queue_all_messages ||
if (!this->paused || this->resuming || this->config.check_queue(control_message->messageType) ||
control_message->messageType == M::BootNotification) {
this->add_to_normal_message_queue(control_message);
}
Expand Down Expand Up @@ -705,7 +714,7 @@ template <typename M> class MessageQueue {
} else {
// all other messages are allowed to "jump the queue" to improve user experience
// TODO: decide if we only want to allow this for a subset of messages
if (this->paused && !this->config.queue_all_messages && !this->resuming &&
if (this->paused && !this->config.check_queue(message->messageType) && !this->resuming &&
message->messageType != M::BootNotification) {
// do not add a normal message to the queue if the queue is paused/offline
auto enhanced_message = EnhancedMessage<M>();
Expand Down Expand Up @@ -788,7 +797,7 @@ template <typename M> class MessageQueue {

const auto queue_type =
is_transaction_message(*this->in_flight) ? QueueType::Transaction : QueueType::Normal;
if (is_transaction_message(*this->in_flight) or this->config.queue_all_messages) {
if (is_transaction_message(*this->in_flight) or this->config.check_queue(this->in_flight->messageType)) {
try {
// We only remove the message as soon as a response is received. Otherwise we might miss a message
// if the charging station just boots after sending, but before receiving the result.
Expand Down Expand Up @@ -826,7 +835,7 @@ template <typename M> class MessageQueue {
}

const auto queue_type = is_transaction_message(*this->in_flight) ? QueueType::Transaction : QueueType::Normal;
if (is_transaction_message(*this->in_flight) or this->config.queue_all_messages) {
if (is_transaction_message(*this->in_flight) or this->config.check_queue(this->in_flight->messageType)) {
if (this->in_flight->message_attempts < this->config.transaction_message_attempts) {
EVLOG_warning << "Message shall be persisted and will therefore be sent again";
// Generate a new message ID for the retry
Expand Down
3 changes: 3 additions & 0 deletions include/ocpp/v16/charge_point_configuration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ class ChargePointConfiguration {
std::optional<bool> getQueueAllMessages();
std::optional<KeyValue> getQueueAllMessagesKeyValue();

std::optional<std::string> getMessageTypesDiscardForQueueing();
std::optional<KeyValue> getMessageTypesDiscardForQueueingKeyValue();

std::optional<int> getMessageQueueSizeThreshold();
std::optional<KeyValue> getMessageQueueSizeThresholdKeyValue();

Expand Down
1 change: 1 addition & 0 deletions include/ocpp/v201/ctrlr_component_variables.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ extern const RequiredComponentVariable& NetworkConfigurationPriority;
extern const RequiredComponentVariable& NetworkProfileConnectionAttempts;
extern const RequiredComponentVariable& OfflineThreshold;
extern const ComponentVariable& QueueAllMessages;
extern const ComponentVariable& MessageTypesDiscardForQueueing;
extern const RequiredComponentVariable& ResetRetries;
extern const RequiredComponentVariable& RetryBackOffRandomRange;
extern const RequiredComponentVariable& RetryBackOffRepeatTimes;
Expand Down
23 changes: 23 additions & 0 deletions lib/ocpp/v16/charge_point_configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,26 @@ std::optional<KeyValue> ChargePointConfiguration::getQueueAllMessagesKeyValue()
return queue_all_messages_kv;
}

std::optional<std::string> ChargePointConfiguration::getMessageTypesDiscardForQueueing() {
if (this->config["Internal"].contains("MessageTypesDiscardForQueueing")) {
return this->config["Internal"]["MessageTypesDiscardForQueueing"];
}
return std::nullopt;
}

std::optional<KeyValue> ChargePointConfiguration::getMessageTypesDiscardForQueueingKeyValue() {
std::optional<KeyValue> message_types_discard_for_queueing_kv = std::nullopt;
auto message_types_discard_for_queueing = this->getMessageTypesDiscardForQueueing();
if (message_types_discard_for_queueing.has_value()) {
KeyValue kv;
kv.key = "MessageTypesDiscardForQueueing";
kv.readonly = true;
kv.value.emplace(message_types_discard_for_queueing.value());
message_types_discard_for_queueing_kv.emplace(kv);
}
return message_types_discard_for_queueing_kv;
}

std::optional<int> ChargePointConfiguration::getMessageQueueSizeThreshold() {
std::optional<int> message_queue_size_threshold = std::nullopt;
if (this->config["Internal"].contains("MessageQueueSizeThreshold")) {
Expand Down Expand Up @@ -2862,6 +2882,9 @@ std::optional<KeyValue> ChargePointConfiguration::get(CiString<50> key) {
if (key == "QueueAllMessages") {
return this->getQueueAllMessagesKeyValue();
}
if (key == "MessageTypesDiscardForQueueing") {
return this->getMessageTypesDiscardForQueueingKeyValue();
}
if (key == "MessageQueueSizeThreshold") {
return this->getMessageQueueSizeThresholdKeyValue();
}
Expand Down
22 changes: 20 additions & 2 deletions lib/ocpp/v16/charge_point_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,31 @@ std::unique_ptr<ocpp::MessageQueue<v16::MessageType>> ChargePointImpl::create_me
}
};

std::set<v16::MessageType> message_types_discard_for_queueing;

if (this->configuration->getMessageTypesDiscardForQueueing().has_value()) {
try {
const auto message_types_discard_for_queueing_csl =
ocpp::split_string(this->configuration->getMessageTypesDiscardForQueueing().value(), ',');
std::transform(message_types_discard_for_queueing_csl.begin(), message_types_discard_for_queueing_csl.end(),
std::inserter(message_types_discard_for_queueing, message_types_discard_for_queueing.end()),
[](const std::string element) { return conversions::string_to_messagetype(element); });
} catch (const StringToEnumException& e) {
EVLOG_warning << "Could not convert configured MessageType value of MessageTypesDiscardForQueueing. Please "
"check you configurationMessageTypesDiscardForQueueing: "
<< e.what();
} catch (...) {
EVLOG_warning << "Could not apply MessageTypesDiscardForQueueing configuration";
}
}

return std::make_unique<ocpp::MessageQueue<v16::MessageType>>(
[this](json message) -> bool { return this->websocket->send(message.dump()); },
MessageQueueConfig{
MessageQueueConfig<v16::MessageType>{
this->configuration->getTransactionMessageAttempts(),
this->configuration->getTransactionMessageRetryInterval(),
this->configuration->getMessageQueueSizeThreshold().value_or(DEFAULT_MESSAGE_QUEUE_SIZE_THRESHOLD),
this->configuration->getQueueAllMessages().value_or(false)},
this->configuration->getQueueAllMessages().value_or(false), message_types_discard_for_queueing},
this->external_notify, this->database_handler, start_transaction_message_retry_callback);
}

Expand Down
21 changes: 20 additions & 1 deletion lib/ocpp/v201/charge_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,34 @@ ChargePoint::ChargePoint(const std::map<int32_t, int32_t>& evse_connector_struct

initialize(evse_connector_structure, message_log_path);

std::set<v201::MessageType> message_types_discard_for_queueing;
try {
const auto message_types_discard_for_queueing_csl = ocpp::split_string(
this->device_model
->get_optional_value<std::string>(ControllerComponentVariables::MessageTypesDiscardForQueueing)
.value_or(""),
',');
std::transform(message_types_discard_for_queueing_csl.begin(), message_types_discard_for_queueing_csl.end(),
std::inserter(message_types_discard_for_queueing, message_types_discard_for_queueing.end()),
[](const std::string element) { return conversions::string_to_messagetype(element); });
} catch (const StringToEnumException& e) {
EVLOG_warning << "Could not convert configured MessageType value of MessageTypesDiscardForQueueing. Please "
"check you configuration: "
<< e.what();
} catch (...) {
EVLOG_warning << "Could not apply MessageTypesDiscardForQueueing configuration";
}

this->message_queue = std::make_unique<ocpp::MessageQueue<v201::MessageType>>(
[this](json message) -> bool { return this->connectivity_manager->send_to_websocket(message.dump()); },
MessageQueueConfig{
MessageQueueConfig<v201::MessageType>{
this->device_model->get_value<int>(ControllerComponentVariables::MessageAttempts),
this->device_model->get_value<int>(ControllerComponentVariables::MessageAttemptInterval),
this->device_model->get_optional_value<int>(ControllerComponentVariables::MessageQueueSizeThreshold)
.value_or(DEFAULT_MESSAGE_QUEUE_SIZE_THRESHOLD),
this->device_model->get_optional_value<bool>(ControllerComponentVariables::QueueAllMessages)
.value_or(false),
message_types_discard_for_queueing,
this->device_model->get_value<int>(ControllerComponentVariables::MessageTimeout)},
this->database_handler);
}
Expand Down
7 changes: 7 additions & 0 deletions lib/ocpp/v201/ctrlr_component_variables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,13 @@ const ComponentVariable& QueueAllMessages = {
"QueueAllMessages",
}),
};
const ComponentVariable& MessageTypesDiscardForQueueing = {
ControllerComponents::OCPPCommCtrlr,
std::nullopt,
std::optional<Variable>({
"MessageTypesDiscardForQueueing",
}),
};
const RequiredComponentVariable& ResetRetries = {
ControllerComponents::OCPPCommCtrlr,
std::nullopt,
Expand Down
4 changes: 2 additions & 2 deletions tests/lib/ocpp/common/test_message_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class MessageQueueTest : public ::testing::Test {
int call_count{0};

protected:
MessageQueueConfig config{};
MessageQueueConfig<TestMessageType> config{};
std::shared_ptr<DatabaseHandlerBaseMock> db;
std::mutex call_marker_mutex;
std::condition_variable call_marker_cond_var;
Expand Down Expand Up @@ -225,7 +225,7 @@ class MessageQueueTest : public ::testing::Test {

void SetUp() override {
call_count = 0;
config = MessageQueueConfig{1, 1, 2, false};
config = MessageQueueConfig<TestMessageType>{1, 1, 2, false};
db = std::make_shared<DatabaseHandlerBaseMock>();
init_message_queue();
}
Expand Down
7 changes: 4 additions & 3 deletions tests/lib/ocpp/v201/test_charge_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,14 @@ class ChargePointFixture : public DatabaseTestingUtils {
const auto DEFAULT_MESSAGE_QUEUE_SIZE_THRESHOLD = 2E5;
return std::make_shared<ocpp::MessageQueue<v201::MessageType>>(
[this](json message) -> bool { return false; },
MessageQueueConfig{
MessageQueueConfig<v201::MessageType>{
this->device_model->get_value<int>(ControllerComponentVariables::MessageAttempts),
this->device_model->get_value<int>(ControllerComponentVariables::MessageAttemptInterval),
this->device_model->get_optional_value<int>(ControllerComponentVariables::MessageQueueSizeThreshold)
.value_or(DEFAULT_MESSAGE_QUEUE_SIZE_THRESHOLD),
this->device_model->get_optional_value<bool>(ControllerComponentVariables::QueueAllMessages)
.value_or(false),
{},
this->device_model->get_value<int>(ControllerComponentVariables::MessageTimeout)},
database_handler);
}
Expand Down Expand Up @@ -326,7 +327,7 @@ TEST_F(ChargePointFixture, CreateChargePoint_MissingDeviceModel_ThrowsInvalidArg
auto evse_security = std::make_shared<EvseSecurityMock>();
configure_callbacks_with_mocks();
auto message_queue = std::make_shared<ocpp::MessageQueue<v201::MessageType>>(
[this](json message) -> bool { return false; }, MessageQueueConfig{}, database_handler);
[this](json message) -> bool { return false; }, MessageQueueConfig<v201::MessageType>{}, database_handler);

EXPECT_THROW(ocpp::v201::ChargePoint(evse_connector_structure, nullptr, database_handler, message_queue, "/tmp",
evse_security, callbacks),
Expand All @@ -338,7 +339,7 @@ TEST_F(ChargePointFixture, CreateChargePoint_MissingDatabaseHandler_ThrowsInvali
auto evse_security = std::make_shared<EvseSecurityMock>();
configure_callbacks_with_mocks();
auto message_queue = std::make_shared<ocpp::MessageQueue<v201::MessageType>>(
[this](json message) -> bool { return false; }, MessageQueueConfig{}, nullptr);
[this](json message) -> bool { return false; }, MessageQueueConfig<v201::MessageType>{}, nullptr);

auto database_handler = nullptr;

Expand Down
Loading