Skip to content

Commit

Permalink
fix: Added CPP Event prioritization (#197)
Browse files Browse the repository at this point in the history
* fix: Addded prioritize

fix: addded prioritize

* fix: Added vector in the event map

* fix: Prioritize signatures

Prioritize signatures

* fix: Updated event.h to priortize internal events

Updated event.h to priortize internal events

* fix: Removed logs

* fix: Removed logs

* fix: Revoked Package.json changes

* fix: Created internal and external eventMap in Transport.h

Created internal and external eventMap in Transport.h

* fix: Added comments

Added comments

---------

Co-authored-by: kschrief <[email protected]>
  • Loading branch information
kdivya153 and kschrief authored Jul 29, 2024
1 parent 3a05dfa commit 11d25e9
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 83 deletions.
133 changes: 84 additions & 49 deletions languages/cpp/src/shared/src/Event/Event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
namespace FireboltSDK {
Event* Event::_singleton = nullptr;
Event::Event()
: _eventMap()
: _internalEventMap()
, _externalEventMap()
, _adminLock()
, _transport(nullptr)
{
Expand Down Expand Up @@ -87,77 +88,111 @@ namespace FireboltSDK {
}
return result;
}



/* This function combines both internal and external event maps, and iterates over them to find the specified event.
If the event is found, it iterates over its associated callbacks, updating their states and executing them if applicable.
Callbacks in the REVOKED state are removed.
*/
Firebolt::Error Event::Dispatch(const string& eventName, const WPEFramework::Core::ProxyType<WPEFramework::Core::JSONRPC::Message>& jsonResponse) /* override */
{
string response = jsonResponse->Result.Value();
_adminLock.Lock();
EventMap::iterator eventIndex = _eventMap.find(eventName);
if (eventIndex != _eventMap.end()) {
CallbackMap::iterator callbackIndex = eventIndex->second.begin();
while(callbackIndex != eventIndex->second.end()) {
State state;
if (callbackIndex->second.state != State::REVOKED) {
callbackIndex->second.state = State::EXECUTING;
}
state = callbackIndex->second.state;
_adminLock.Unlock();
if (state == State::EXECUTING) {
callbackIndex->second.lambda(callbackIndex->first, callbackIndex->second.userdata, (jsonResponse->Result.Value()));
}
_adminLock.Lock();
if (callbackIndex->second.state == State::REVOKED) {
callbackIndex = eventIndex->second.erase(callbackIndex);
if (eventIndex->second.size() == 0) {
_eventMap.erase(eventIndex);
std::vector<EventMap*> eventMaps = {&_internalEventMap, &_externalEventMap};

// Combine both _internalEventMap and _externalEventMap into a single loop
for (auto eventMap : eventMaps) {
_adminLock.Lock();
EventMap::iterator eventIndex = eventMap->find(eventName);
if (eventIndex != eventMap->end()) {
CallbackMap& callbacks = eventIndex->second;
for (CallbackMap::iterator callbackIndex = callbacks.begin(); callbackIndex != callbacks.end();) {
State state;
if (callbackIndex->second.state != State::REVOKED) {
callbackIndex->second.state = State::EXECUTING;
}
state = callbackIndex->second.state;
_adminLock.Unlock();
if (state == State::EXECUTING) {
callbackIndex->second.lambda(callbackIndex->first, callbackIndex->second.userdata, response);
}
_adminLock.Lock();
if (callbackIndex->second.state == State::REVOKED) {
callbackIndex = callbacks.erase(callbackIndex);
if (callbacks.empty()) {
eventMap->erase(eventIndex); // Erase from the correct eventMap
break; // No need to continue iterating if map is empty
}
} else {
callbackIndex->second.state = State::IDLE;
++callbackIndex;
}
} else {
callbackIndex->second.state = State::IDLE;
callbackIndex++;
}
}
_adminLock.Unlock();
}
_adminLock.Unlock();

return Firebolt::Error::None;;
return Firebolt::Error::None;
}


Firebolt::Error Event::Revoke(const string& eventName, void* usercb)
{
Firebolt::Error status = Firebolt::Error::None;
_adminLock.Lock();
EventMap::iterator eventIndex = _eventMap.begin();
if (eventIndex != _eventMap.end()) {
CallbackMap::iterator callbackIndex = eventIndex->second.find(usercb);
if (callbackIndex->second.state != State::EXECUTING) {

// Combine both _internalEventMap and _externalEventMap into a single loop
std::vector<EventMap*> eventMaps = {&_internalEventMap, &_externalEventMap};

for (auto eventMap : eventMaps) {
_adminLock.Lock(); // Lock inside the loop

// Find the eventIndex for eventName in the current eventMap
EventMap::iterator eventIndex = eventMap->find(eventName);
if (eventIndex != eventMap->end()) {
// Find the callbackIndex for usercb in the current CallbackMap
CallbackMap::iterator callbackIndex = eventIndex->second.find(usercb);
if (callbackIndex != eventIndex->second.end()) {
eventIndex->second.erase(callbackIndex);
// Check if callback is not executing, then erase it
if (callbackIndex->second.state != State::EXECUTING) {
eventIndex->second.erase(callbackIndex);
} else {
// Mark the callback as revoked
callbackIndex->second.state = State::REVOKED;
}

// Check if the CallbackMap is empty after potential erasure
if (eventIndex->second.empty()) {
eventMap->erase(eventIndex);
} else {
// Set status to General error if CallbackMap is not empty
status = Firebolt::Error::General;
}
}
} else {
callbackIndex->second.state = State::REVOKED;
}
if (eventIndex->second.size() == 0) {
_eventMap.erase(eventIndex);
} else {
status = Firebolt::Error::General;
}

_adminLock.Unlock(); // Unlock after processing each eventMap
}
_adminLock.Unlock();

return status;
}

void Event::Clear()
{
EventMap::iterator eventIndex = _eventMap.begin();
while (eventIndex != _eventMap.end()) {
CallbackMap::iterator callbackIndex = eventIndex->second.begin();
while (callbackIndex != eventIndex->second.end()) {
callbackIndex = eventIndex->second.erase(callbackIndex);
// Clear both _internalEventMap and _externalEventMap
std::vector<EventMap*> eventMaps = {&_internalEventMap, &_externalEventMap};

for (auto eventMap : eventMaps) {
_adminLock.Lock(); // Lock before clearing

EventMap::iterator eventIndex = eventMap->begin();
while (eventIndex != eventMap->end()) {
CallbackMap::iterator callbackIndex = eventIndex->second.begin();
while (callbackIndex != eventIndex->second.end()) {
callbackIndex = eventIndex->second.erase(callbackIndex);
}
eventIndex = eventMap->erase(eventIndex);
}
eventIndex = _eventMap.erase(eventIndex);

_adminLock.Unlock(); // Unlock after clearing
}
_adminLock.Unlock();
}

}
}
43 changes: 28 additions & 15 deletions languages/cpp/src/shared/src/Event/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,65 +80,77 @@ namespace FireboltSDK {
}

template <typename RESULT, typename CALLBACK>
Firebolt::Error Subscribe(const string& eventName, JsonObject& jsonParameters, const CALLBACK& callback, void* usercb, const void* userdata)
Firebolt::Error Subscribe(const string& eventName, JsonObject& jsonParameters, const CALLBACK& callback, void* usercb, const void* userdata, bool prioritize = false)
{
Firebolt::Error status = Firebolt::Error::General;

if (_transport != nullptr) {
EventMap& eventMap = prioritize ? _internalEventMap : _externalEventMap;

status = Assign<RESULT, CALLBACK>(eventMap, eventName, callback, usercb, userdata);

status = Assign<RESULT, CALLBACK>(eventName, callback, usercb, userdata);
if (status == Firebolt::Error::None) {
Response response;

WPEFramework::Core::JSON::Variant Listen = true;
jsonParameters.Set(_T("listen"), Listen);
string parameters;
jsonParameters.ToString(parameters);

status = _transport->Subscribe<Response>(eventName, parameters, response);
status = _transport->Subscribe<Response>(eventName, parameters, response, prioritize);

if (status != Firebolt::Error::None) {
Revoke(eventName, usercb);
} else if ((response.Listening.IsSet() == true) &&
(response.Listening.Value() == true)) {
} else if (response.Listening.IsSet() && response.Listening.Value()) {
status = Firebolt::Error::None;
}
}
}
return status;
}

// To prioritize internal and external events and its corresponding callbacks
template <typename RESULT, typename CALLBACK>
Firebolt::Error Prioritize(const string& eventName,JsonObject& jsonParameters, const CALLBACK& callback, void* usercb, const void* userdata)
{
Firebolt::Error status = Firebolt::Error::General;
// Assuming prioritized events also need subscription via transport
status = Subscribe<RESULT, CALLBACK>(eventName, jsonParameters, callback, usercb, userdata, true);
return status;
}


Firebolt::Error Unsubscribe(const string& eventName, void* usercb);

private:
template <typename PARAMETERS, typename CALLBACK>
Firebolt::Error Assign(const string& eventName, const CALLBACK& callback, void* usercb, const void* userdata)
Firebolt::Error Assign(EventMap& eventMap, const string& eventName, const CALLBACK& callback, void* usercb, const void* userdata)
{

Firebolt::Error status = Firebolt::Error::General;
std::function<void(void* usercb, const void* userdata, void* parameters)> actualCallback = callback;
DispatchFunction implementation = [actualCallback](void* usercb, const void* userdata, const string& parameters) -> Firebolt::Error {

WPEFramework::Core::ProxyType<PARAMETERS>* inbound = new WPEFramework::Core::ProxyType<PARAMETERS>();
*inbound = WPEFramework::Core::ProxyType<PARAMETERS>::Create();
(*inbound)->FromString(parameters);
actualCallback(usercb, userdata, static_cast<void*>(inbound));
return (Firebolt::Error::None);
};
CallbackData callbackData = {implementation, userdata, State::IDLE};

_adminLock.Lock();
EventMap::iterator eventIndex = _eventMap.find(eventName);
if (eventIndex != _eventMap.end()) {
EventMap::iterator eventIndex = eventMap.find(eventName);
if (eventIndex != eventMap.end()) {
CallbackMap::iterator callbackIndex = eventIndex->second.find(usercb);

if (callbackIndex == eventIndex->second.end()) {
std::cout << "Registering new callback for event: " << eventName << std::endl;
eventIndex->second.emplace(std::piecewise_construct, std::forward_as_tuple(usercb), std::forward_as_tuple(callbackData));
status = Firebolt::Error::None;
}
} else {

CallbackMap callbackMap;
callbackMap.emplace(std::piecewise_construct, std::forward_as_tuple(usercb), std::forward_as_tuple(callbackData));
_eventMap.emplace(std::piecewise_construct, std::forward_as_tuple(eventName), std::forward_as_tuple(callbackMap));
eventMap.emplace(std::piecewise_construct, std::forward_as_tuple(eventName), std::forward_as_tuple(callbackMap));
status = Firebolt::Error::None;

}
Expand All @@ -153,11 +165,12 @@ namespace FireboltSDK {
Firebolt::Error ValidateResponse(const WPEFramework::Core::ProxyType<WPEFramework::Core::JSONRPC::Message>& jsonResponse, bool& enabled) override;
Firebolt::Error Dispatch(const string& eventName, const WPEFramework::Core::ProxyType<WPEFramework::Core::JSONRPC::Message>& jsonResponse) override;

private:
EventMap _eventMap;
private:
EventMap _internalEventMap;
EventMap _externalEventMap;
WPEFramework::Core::CriticalSection _adminLock;
Transport<WPEFramework::Core::JSON::IElement>* _transport;

static Event* _singleton;
};
}
}
63 changes: 47 additions & 16 deletions languages/cpp/src/shared/src/Transport/Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,13 @@ namespace FireboltSDK {
void Revoke(const string& eventName)
{
_adminLock.Lock();
_eventMap.erase(eventName);

// Remove from internal event map
_internalEventMap.erase(eventName);

// Remove from external event map
_externalEventMap.erase(eventName);

_adminLock.Unlock();
}

Expand Down Expand Up @@ -640,22 +646,31 @@ namespace FireboltSDK {
}

template <typename RESPONSE>
Firebolt::Error Subscribe(const string& eventName, const string& parameters, RESPONSE& response)

Firebolt::Error Subscribe(const string& eventName, const string& parameters, RESPONSE& response, bool updateInternal = false)
{
Entry slot;
uint32_t id = _channel->Sequence();
Firebolt::Error result = Send(eventName, parameters, id);

if (result == Firebolt::Error::None) {
_adminLock.Lock();
_eventMap.emplace(std::piecewise_construct,
std::forward_as_tuple(eventName),
std::forward_as_tuple(~0));

// Choose the map based on updateInternal flag
EventMap& eventMap = updateInternal ? _internalEventMap : _externalEventMap;

// Add to the selected event map
eventMap.emplace(std::piecewise_construct,
std::forward_as_tuple(eventName),
std::forward_as_tuple(id));

_adminLock.Unlock();

result = WaitForEventResponse(id, eventName, response, _waitTime);
result = WaitForEventResponse(id, eventName, response, _waitTime, eventMap);

}

return (result);
return result;
}

Firebolt::Error Unsubscribe(const string& eventName, const string& parameters)
Expand Down Expand Up @@ -692,18 +707,34 @@ namespace FireboltSDK {

private:
friend Channel;

inline bool IsEvent(const uint32_t id, string& eventName)
{
_adminLock.Lock();
for (auto& event : _eventMap) {
if (event.second == id) {
eventName = event.first;
break;

bool eventExist = false;

// List of maps to search
std::vector<EventMap*> maps = {&_internalEventMap, &_externalEventMap};

// Loop through each map
for (const auto* map : maps) {
for (const auto& event : *map) {
if (event.second == id) {
eventName = event.first;
eventExist = true;
break; // Break the inner loop
}
}
if (eventExist) {
break; // Break the outer loop
}
}

_adminLock.Unlock();
return (eventName.empty() != true);
return eventExist;
}

uint64_t Timed()
{
uint64_t result = ~0;
Expand Down Expand Up @@ -802,8 +833,7 @@ namespace FireboltSDK {

return (result);
}



template <typename PARAMETERS>
Firebolt::Error Send(const string& method, const PARAMETERS& parameters, const uint32_t& id)
{
Expand Down Expand Up @@ -844,7 +874,7 @@ namespace FireboltSDK {

static constexpr uint32_t WAITSLOT_TIME = 100;
template <typename RESPONSE>
Firebolt::Error WaitForEventResponse(const uint32_t& id, const string& eventName, RESPONSE& response, const uint32_t waitTime)
Firebolt::Error WaitForEventResponse(const uint32_t& id, const string& eventName, RESPONSE& response, const uint32_t waitTime, EventMap& _eventMap)
{
Firebolt::Error result = Firebolt::Error::Timedout;
_adminLock.Lock();
Expand Down Expand Up @@ -973,7 +1003,8 @@ namespace FireboltSDK {
WPEFramework::Core::ProxyType<Channel> _channel;
IEventHandler* _eventHandler;
PendingMap _pendingQueue;
EventMap _eventMap;
EventMap _internalEventMap;
EventMap _externalEventMap;
uint64_t _scheduledTime;
uint32_t _waitTime;
Listener _listener;
Expand Down
Loading

0 comments on commit 11d25e9

Please sign in to comment.