diff --git a/config/v16/profile_schemas/Internal.json b/config/v16/profile_schemas/Internal.json index 0a0399823..fa927caca 100644 --- a/config/v16/profile_schemas/Internal.json +++ b/config/v16/profile_schemas/Internal.json @@ -277,6 +277,11 @@ "type": "boolean", "readOnly": true }, + "MessageTypesDiscardForQueueing": { + "$comment": "Comma seperated list of message types that shall not be queued (when offline) even in case QueueAllMessages is true. If QueueAllMessages is false, the configuration of this paramater has no effect.", + "type": "string", + "readOnly": true + }, "MessageQueueSizeThreshold": { "$comment": "Threshold for the size of in-memory message queues used to buffer messages (and store e.g. while offline). If threshold is exceeded, messages will be dropped according to OCPP specification to avoid memory issues.", "type": "integer", @@ -298,4 +303,4 @@ } }, "additionalProperties": false -} \ No newline at end of file +} diff --git a/config/v201/component_config/standardized/OCPPCommCtrlr.json b/config/v201/component_config/standardized/OCPPCommCtrlr.json index 7cdeef36b..ba16724f8 100644 --- a/config/v201/component_config/standardized/OCPPCommCtrlr.json +++ b/config/v201/component_config/standardized/OCPPCommCtrlr.json @@ -209,6 +209,21 @@ "description": "When this variable is set to true, the Charging Station will queue all message until they are delivered to the CSMS.", "type": "boolean" }, + "MessageTypesDiscardForQueueing": { + "variable_name": "MessageTypesDiscardForQueueing", + "characteristics": { + "supportsMonitoring": true, + "dataType": "SequenceList" + }, + "attributes": [ + { + "type": "Actual", + "mutability": "ReadOnly" + } + ], + "description": "Comma seperated list of message types that shall not be queued (when offline) even in case QueueAllMessages is true. If QueueAllMessages is false, the configuration of this paramater has no effect.", + "type": "string" + }, "ResetRetries": { "variable_name": "ResetRetries", "characteristics": { diff --git a/include/ocpp/common/message_queue.hpp b/include/ocpp/common/message_queue.hpp index 86b16923f..b7df66cf4 100644 --- a/include/ocpp/common/message_queue.hpp +++ b/include/ocpp/common/message_queue.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -29,7 +30,7 @@ namespace ocpp { using QueryExecutionException = common::QueryExecutionException; -struct MessageQueueConfig { +template struct MessageQueueConfig { int transaction_message_attempts; int transaction_message_retry_interval; // seconds @@ -37,12 +38,20 @@ struct MessageQueueConfig { // messages are potentially dropped in accordance with OCPP 2.0.1. Specification (cf. QueueAllMessages parameter) int queues_total_size_threshold; - bool queue_all_messages; // cf. OCPP 2.0.1. "QueueAllMessages" in OCPPCommCtrlr + bool queue_all_messages{false}; // cf. OCPP 2.0.1. "QueueAllMessages" in OCPPCommCtrlr + std::set message_types_discard_for_queueing; // allows to discard certain message types for offline queuing (e.g. + // Heartbeat) int message_timeout_seconds = 30; int boot_notification_retry_interval_seconds = 60; // interval for BootNotification.req in case response by CSMS is CALLERROR or CSMS does not respond at all // (within specified MessageTimeout) + + /// \brief Returns true if the given \p message_type shall be queued based on the configuration of + /// queue_all_messages and message_types_discard_for_queueing + bool check_queue(const M& message_type) { + return queue_all_messages and !message_types_discard_for_queueing.count(message_type); + }; }; /// \brief Contains a OCPP message in json form with additional information @@ -158,7 +167,7 @@ bool allowed_to_send_message(const ControlMessage& message, const DateTime& t /// \brief contains a message queue that makes sure that OCPPs synchronicity requirements are met template class MessageQueue { private: - MessageQueueConfig config; + MessageQueueConfig config; std::shared_ptr database_handler; std::thread worker_thread; @@ -241,7 +250,7 @@ template class MessageQueue { } else { this->normal_message_queue.push_back(message); } - if (this->config.queue_all_messages) { + if (this->config.check_queue(message->messageType)) { ocpp::common::DBTransactionMessage db_message{ message->message, messagetype_to_string(message->messageType), message->message_attempts, message->timestamp, message->uniqueId()}; @@ -383,7 +392,7 @@ template class MessageQueue { public: /// \brief Creates a new MessageQueue object with the provided \p configuration and \p send_callback MessageQueue( - const std::function& send_callback, const MessageQueueConfig& config, + const std::function& send_callback, const MessageQueueConfig& config, const std::vector& external_notify, std::shared_ptr database_handler, const std::function start_transaction_message_retry_callback = @@ -403,7 +412,7 @@ template class MessageQueue { this->in_flight = nullptr; } - MessageQueue(const std::function& send_callback, const MessageQueueConfig& config, + MessageQueue(const std::function& send_callback, const MessageQueueConfig& config, std::shared_ptr databaseHandler) : MessageQueue(send_callback, config, {}, databaseHandler) { } @@ -528,7 +537,7 @@ template class MessageQueue { if (this->in_flight->message.at(CALL_ACTION) == "TransactionEvent") { this->in_flight->message.at(CALL_PAYLOAD)["offline"] = true; } - } else if (this->config.queue_all_messages) { + } else if (this->config.check_queue(this->in_flight->messageType)) { EVLOG_info << "The message in flight will be sent again once the connection can be " "established again since QueueAllMessages is set to 'true'."; } else { @@ -643,7 +652,7 @@ template class MessageQueue { } else { // all other messages are allowed to "jump the queue" to improve user experience // TODO: decide if we only want to allow this for a subset of messages - if (!this->paused || this->resuming || this->config.queue_all_messages || + if (!this->paused || this->resuming || this->config.check_queue(control_message->messageType) || control_message->messageType == M::BootNotification) { this->add_to_normal_message_queue(control_message); } @@ -705,7 +714,7 @@ template class MessageQueue { } else { // all other messages are allowed to "jump the queue" to improve user experience // TODO: decide if we only want to allow this for a subset of messages - if (this->paused && !this->config.queue_all_messages && !this->resuming && + if (this->paused && !this->config.check_queue(message->messageType) && !this->resuming && message->messageType != M::BootNotification) { // do not add a normal message to the queue if the queue is paused/offline auto enhanced_message = EnhancedMessage(); @@ -788,7 +797,7 @@ template class MessageQueue { const auto queue_type = is_transaction_message(*this->in_flight) ? QueueType::Transaction : QueueType::Normal; - if (is_transaction_message(*this->in_flight) or this->config.queue_all_messages) { + if (is_transaction_message(*this->in_flight) or this->config.check_queue(this->in_flight->messageType)) { try { // We only remove the message as soon as a response is received. Otherwise we might miss a message // if the charging station just boots after sending, but before receiving the result. @@ -826,7 +835,7 @@ template class MessageQueue { } const auto queue_type = is_transaction_message(*this->in_flight) ? QueueType::Transaction : QueueType::Normal; - if (is_transaction_message(*this->in_flight) or this->config.queue_all_messages) { + if (is_transaction_message(*this->in_flight) or this->config.check_queue(this->in_flight->messageType)) { if (this->in_flight->message_attempts < this->config.transaction_message_attempts) { EVLOG_warning << "Message shall be persisted and will therefore be sent again"; // Generate a new message ID for the retry diff --git a/include/ocpp/v16/charge_point_configuration.hpp b/include/ocpp/v16/charge_point_configuration.hpp index 01a32ee38..845166af3 100644 --- a/include/ocpp/v16/charge_point_configuration.hpp +++ b/include/ocpp/v16/charge_point_configuration.hpp @@ -133,6 +133,9 @@ class ChargePointConfiguration { std::optional getQueueAllMessages(); std::optional getQueueAllMessagesKeyValue(); + std::optional getMessageTypesDiscardForQueueing(); + std::optional getMessageTypesDiscardForQueueingKeyValue(); + std::optional getMessageQueueSizeThreshold(); std::optional getMessageQueueSizeThresholdKeyValue(); diff --git a/include/ocpp/v201/ctrlr_component_variables.hpp b/include/ocpp/v201/ctrlr_component_variables.hpp index 96ab3c7f1..ce1d73c68 100644 --- a/include/ocpp/v201/ctrlr_component_variables.hpp +++ b/include/ocpp/v201/ctrlr_component_variables.hpp @@ -165,6 +165,7 @@ extern const RequiredComponentVariable& NetworkConfigurationPriority; extern const RequiredComponentVariable& NetworkProfileConnectionAttempts; extern const RequiredComponentVariable& OfflineThreshold; extern const ComponentVariable& QueueAllMessages; +extern const ComponentVariable& MessageTypesDiscardForQueueing; extern const RequiredComponentVariable& ResetRetries; extern const RequiredComponentVariable& RetryBackOffRandomRange; extern const RequiredComponentVariable& RetryBackOffRepeatTimes; diff --git a/lib/ocpp/v16/charge_point_configuration.cpp b/lib/ocpp/v16/charge_point_configuration.cpp index 1af9014cd..8975d42a9 100644 --- a/lib/ocpp/v16/charge_point_configuration.cpp +++ b/lib/ocpp/v16/charge_point_configuration.cpp @@ -856,6 +856,26 @@ std::optional ChargePointConfiguration::getQueueAllMessagesKeyValue() return queue_all_messages_kv; } +std::optional ChargePointConfiguration::getMessageTypesDiscardForQueueing() { + if (this->config["Internal"].contains("MessageTypesDiscardForQueueing")) { + return this->config["Internal"]["MessageTypesDiscardForQueueing"]; + } + return std::nullopt; +} + +std::optional ChargePointConfiguration::getMessageTypesDiscardForQueueingKeyValue() { + std::optional message_types_discard_for_queueing_kv = std::nullopt; + auto message_types_discard_for_queueing = this->getMessageTypesDiscardForQueueing(); + if (message_types_discard_for_queueing.has_value()) { + KeyValue kv; + kv.key = "MessageTypesDiscardForQueueing"; + kv.readonly = true; + kv.value.emplace(message_types_discard_for_queueing.value()); + message_types_discard_for_queueing_kv.emplace(kv); + } + return message_types_discard_for_queueing_kv; +} + std::optional ChargePointConfiguration::getMessageQueueSizeThreshold() { std::optional message_queue_size_threshold = std::nullopt; if (this->config["Internal"].contains("MessageQueueSizeThreshold")) { @@ -2862,6 +2882,9 @@ std::optional ChargePointConfiguration::get(CiString<50> key) { if (key == "QueueAllMessages") { return this->getQueueAllMessagesKeyValue(); } + if (key == "MessageTypesDiscardForQueueing") { + return this->getMessageTypesDiscardForQueueingKeyValue(); + } if (key == "MessageQueueSizeThreshold") { return this->getMessageQueueSizeThresholdKeyValue(); } diff --git a/lib/ocpp/v16/charge_point_impl.cpp b/lib/ocpp/v16/charge_point_impl.cpp index adc2f6105..861114365 100644 --- a/lib/ocpp/v16/charge_point_impl.cpp +++ b/lib/ocpp/v16/charge_point_impl.cpp @@ -241,13 +241,31 @@ std::unique_ptr> ChargePointImpl::create_me } }; + std::set message_types_discard_for_queueing; + + if (this->configuration->getMessageTypesDiscardForQueueing().has_value()) { + try { + const auto message_types_discard_for_queueing_csl = + ocpp::split_string(this->configuration->getMessageTypesDiscardForQueueing().value(), ','); + std::transform(message_types_discard_for_queueing_csl.begin(), message_types_discard_for_queueing_csl.end(), + std::inserter(message_types_discard_for_queueing, message_types_discard_for_queueing.end()), + [](const std::string element) { return conversions::string_to_messagetype(element); }); + } catch (const StringToEnumException& e) { + EVLOG_warning << "Could not convert configured MessageType value of MessageTypesDiscardForQueueing. Please " + "check you configurationMessageTypesDiscardForQueueing: " + << e.what(); + } catch (...) { + EVLOG_warning << "Could not apply MessageTypesDiscardForQueueing configuration"; + } + } + return std::make_unique>( [this](json message) -> bool { return this->websocket->send(message.dump()); }, - MessageQueueConfig{ + MessageQueueConfig{ this->configuration->getTransactionMessageAttempts(), this->configuration->getTransactionMessageRetryInterval(), this->configuration->getMessageQueueSizeThreshold().value_or(DEFAULT_MESSAGE_QUEUE_SIZE_THRESHOLD), - this->configuration->getQueueAllMessages().value_or(false)}, + this->configuration->getQueueAllMessages().value_or(false), message_types_discard_for_queueing}, this->external_notify, this->database_handler, start_transaction_message_retry_callback); } diff --git a/lib/ocpp/v201/charge_point.cpp b/lib/ocpp/v201/charge_point.cpp index aed79595e..87b30c1b8 100644 --- a/lib/ocpp/v201/charge_point.cpp +++ b/lib/ocpp/v201/charge_point.cpp @@ -158,15 +158,34 @@ ChargePoint::ChargePoint(const std::map& evse_connector_struct initialize(evse_connector_structure, message_log_path); + std::set message_types_discard_for_queueing; + try { + const auto message_types_discard_for_queueing_csl = ocpp::split_string( + this->device_model + ->get_optional_value(ControllerComponentVariables::MessageTypesDiscardForQueueing) + .value_or(""), + ','); + std::transform(message_types_discard_for_queueing_csl.begin(), message_types_discard_for_queueing_csl.end(), + std::inserter(message_types_discard_for_queueing, message_types_discard_for_queueing.end()), + [](const std::string element) { return conversions::string_to_messagetype(element); }); + } catch (const StringToEnumException& e) { + EVLOG_warning << "Could not convert configured MessageType value of MessageTypesDiscardForQueueing. Please " + "check you configuration: " + << e.what(); + } catch (...) { + EVLOG_warning << "Could not apply MessageTypesDiscardForQueueing configuration"; + } + this->message_queue = std::make_unique>( [this](json message) -> bool { return this->connectivity_manager->send_to_websocket(message.dump()); }, - MessageQueueConfig{ + MessageQueueConfig{ this->device_model->get_value(ControllerComponentVariables::MessageAttempts), this->device_model->get_value(ControllerComponentVariables::MessageAttemptInterval), this->device_model->get_optional_value(ControllerComponentVariables::MessageQueueSizeThreshold) .value_or(DEFAULT_MESSAGE_QUEUE_SIZE_THRESHOLD), this->device_model->get_optional_value(ControllerComponentVariables::QueueAllMessages) .value_or(false), + message_types_discard_for_queueing, this->device_model->get_value(ControllerComponentVariables::MessageTimeout)}, this->database_handler); } diff --git a/lib/ocpp/v201/ctrlr_component_variables.cpp b/lib/ocpp/v201/ctrlr_component_variables.cpp index 1607d6572..3616e30d2 100644 --- a/lib/ocpp/v201/ctrlr_component_variables.cpp +++ b/lib/ocpp/v201/ctrlr_component_variables.cpp @@ -878,6 +878,13 @@ const ComponentVariable& QueueAllMessages = { "QueueAllMessages", }), }; +const ComponentVariable& MessageTypesDiscardForQueueing = { + ControllerComponents::OCPPCommCtrlr, + std::nullopt, + std::optional({ + "MessageTypesDiscardForQueueing", + }), +}; const RequiredComponentVariable& ResetRetries = { ControllerComponents::OCPPCommCtrlr, std::nullopt, diff --git a/tests/lib/ocpp/common/test_message_queue.cpp b/tests/lib/ocpp/common/test_message_queue.cpp index b0a7b258b..4fca812bb 100644 --- a/tests/lib/ocpp/common/test_message_queue.cpp +++ b/tests/lib/ocpp/common/test_message_queue.cpp @@ -154,7 +154,7 @@ class MessageQueueTest : public ::testing::Test { int call_count{0}; protected: - MessageQueueConfig config{}; + MessageQueueConfig config{}; std::shared_ptr db; std::mutex call_marker_mutex; std::condition_variable call_marker_cond_var; @@ -225,7 +225,7 @@ class MessageQueueTest : public ::testing::Test { void SetUp() override { call_count = 0; - config = MessageQueueConfig{1, 1, 2, false}; + config = MessageQueueConfig{1, 1, 2, false}; db = std::make_shared(); init_message_queue(); } diff --git a/tests/lib/ocpp/v201/test_charge_point.cpp b/tests/lib/ocpp/v201/test_charge_point.cpp index 6172041e3..d429f0b09 100644 --- a/tests/lib/ocpp/v201/test_charge_point.cpp +++ b/tests/lib/ocpp/v201/test_charge_point.cpp @@ -190,13 +190,14 @@ class ChargePointFixture : public DatabaseTestingUtils { const auto DEFAULT_MESSAGE_QUEUE_SIZE_THRESHOLD = 2E5; return std::make_shared>( [this](json message) -> bool { return false; }, - MessageQueueConfig{ + MessageQueueConfig{ this->device_model->get_value(ControllerComponentVariables::MessageAttempts), this->device_model->get_value(ControllerComponentVariables::MessageAttemptInterval), this->device_model->get_optional_value(ControllerComponentVariables::MessageQueueSizeThreshold) .value_or(DEFAULT_MESSAGE_QUEUE_SIZE_THRESHOLD), this->device_model->get_optional_value(ControllerComponentVariables::QueueAllMessages) .value_or(false), + {}, this->device_model->get_value(ControllerComponentVariables::MessageTimeout)}, database_handler); } @@ -326,7 +327,7 @@ TEST_F(ChargePointFixture, CreateChargePoint_MissingDeviceModel_ThrowsInvalidArg auto evse_security = std::make_shared(); configure_callbacks_with_mocks(); auto message_queue = std::make_shared>( - [this](json message) -> bool { return false; }, MessageQueueConfig{}, database_handler); + [this](json message) -> bool { return false; }, MessageQueueConfig{}, database_handler); EXPECT_THROW(ocpp::v201::ChargePoint(evse_connector_structure, nullptr, database_handler, message_queue, "/tmp", evse_security, callbacks), @@ -338,7 +339,7 @@ TEST_F(ChargePointFixture, CreateChargePoint_MissingDatabaseHandler_ThrowsInvali auto evse_security = std::make_shared(); configure_callbacks_with_mocks(); auto message_queue = std::make_shared>( - [this](json message) -> bool { return false; }, MessageQueueConfig{}, nullptr); + [this](json message) -> bool { return false; }, MessageQueueConfig{}, nullptr); auto database_handler = nullptr;