Skip to content

Commit

Permalink
Converted the consumer message queue to an event queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
fpagliughi committed Jul 6, 2024
1 parent 5357a28 commit 43e8ef6
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 55 deletions.
2 changes: 2 additions & 0 deletions examples/data_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,10 @@ int main(int argc, char* argv[])
{
string serverURI = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI;

// Create a persistence object
encoded_file_persistence persist{PERSIST_KEY};

// Create a client to use the persistence.
mqtt::async_client cli(serverURI, CLIENT_ID, MAX_BUFFERED_MSGS, &persist);

auto connOpts = mqtt::connect_options_builder()
Expand Down
97 changes: 89 additions & 8 deletions include/mqtt/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "mqtt/callback.h"
#include "mqtt/create_options.h"
#include "mqtt/delivery_token.h"
#include "mqtt/event.h"
#include "mqtt/exception.h"
#include "mqtt/iaction_listener.h"
#include "mqtt/iasync_client.h"
Expand Down Expand Up @@ -110,8 +111,8 @@ class async_client : public virtual iasync_client
public:
/** Smart/shared pointer for an object of this class */
using ptr_t = std::shared_ptr<async_client>;
/** Type for a thread-safe queue to consume messages synchronously */
using consumer_queue_type = std::unique_ptr<thread_queue<const_message_ptr>>;
/** Type for a thread-safe queue to consume events synchronously */
using consumer_queue_type = std::unique_ptr<thread_queue<event_type>>;

/** Handler type for registering an individual message callback */
using message_handler = std::function<void(const_message_ptr)>;
Expand Down Expand Up @@ -761,14 +762,14 @@ class async_client : public virtual iasync_client
* This blocks until a new message arrives.
* @return The message and topic.
*/
const_message_ptr consume_message() override { return que_->get(); }
const_message_ptr consume_message() override;
/**
* Try to read the next message from the queue without blocking.
* @param msg Pointer to the value to receive the message
* @return @em true is a message was read, @em false if no message was
* available.
*/
bool try_consume_message(const_message_ptr* msg) override { return que_->try_get(msg); }
bool try_consume_message(const_message_ptr* msg);
/**
* Waits a limited time for a message to arrive.
* @param msg Pointer to the value to receive the message
Expand All @@ -780,7 +781,15 @@ class async_client : public virtual iasync_client
bool try_consume_message_for(
const_message_ptr* msg, const std::chrono::duration<Rep, Period>& relTime
) {
return que_->try_get_for(msg, relTime);
event_type evt;
if (!que_->try_get_for(&evt, relTime))
return false;

if (const auto* pval = std::get_if<message_arrived_event>(&evt))
*msg = std::move(pval->msg);
else
*msg = const_message_ptr{};
return true;
}
/**
* Waits a limited time for a message to arrive.
Expand All @@ -793,7 +802,7 @@ class async_client : public virtual iasync_client
const std::chrono::duration<Rep, Period>& relTime
) {
const_message_ptr msg;
que_->try_get_for(&msg, relTime);
this->try_consume_message_for(&msg, relTime);
return msg;
}
/**
Expand All @@ -807,7 +816,15 @@ class async_client : public virtual iasync_client
bool try_consume_message_until(
const_message_ptr* msg, const std::chrono::time_point<Clock, Duration>& absTime
) {
return que_->try_get_until(msg, absTime);
event_type evt;
if (!que_->try_get_until(&evt, absTime))
return false;

if (const auto* pval = std::get_if<message_arrived_event>(&evt))
*msg = std::move(pval->msg);
else
*msg = const_message_ptr{};
return true;
}
/**
* Waits until a specific time for a message to appear.
Expand All @@ -819,9 +836,73 @@ class async_client : public virtual iasync_client
const std::chrono::time_point<Clock, Duration>& absTime
) {
const_message_ptr msg;
que_->try_get_until(&msg, absTime);
this->try_consume_message_until(&msg, absTime);
return msg;
}

/**
* Read the next message from the queue.
* This blocks until a new message arrives.
* @return The message and topic.
*/
event_type consume_event() override { return que_->get(); }
/**
* Try to read the next message from the queue without blocking.
* @param msg Pointer to the value to receive the message
* @return @em true is a message was read, @em false if no message was
* available.
*/
bool try_consume_event(event_type* evt) override { return que_->try_get(evt); }
/**
* Waits a limited time for a message to arrive.
* @param msg Pointer to the value to receive the message
* @param relTime The maximum amount of time to wait for a message.
* @return @em true if a message was read, @em false if a timeout
* occurred.
*/
template <typename Rep, class Period>
bool try_consume_event_for(
event_type* evt, const std::chrono::duration<Rep, Period>& relTime
) {
return que_->try_get_for(evt, relTime);
}
/**
* Waits a limited time for a message to arrive.
* @param relTime The maximum amount of time to wait for a message.
* @return A shared pointer to the message that was received. It will be
* empty on timeout.
*/
template <typename Rep, class Period>
event_type try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) {
event_type evt;
que_->try_get_for(&evt, relTime);
return evt;
}
/**
* Waits until a specific time for a message to appear.
* @param msg Pointer to the value to receive the message
* @param absTime The time point to wait until, before timing out.
* @return @em true if a message was read, @em false if a timeout
* occurred.
*/
template <class Clock, class Duration>
bool try_consume_event_until(
event_type* evt, const std::chrono::time_point<Clock, Duration>& absTime
) {
return que_->try_get_until(evt, absTime);
}
/**
* Waits until a specific time for a message to appear.
* @param absTime The time point to wait until, before timing out.
* @return The message, if read, an empty pointer if not.
*/
template <class Clock, class Duration>
event_type try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime
) {
event_type evt;
que_->try_get_until(&evt, absTime);
return evt;
}
};

/** Smart/shared pointer to an asynchronous MQTT client object */
Expand Down
1 change: 0 additions & 1 deletion include/mqtt/callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ using callback_ptr = callback::ptr_t;
using const_callback_ptr = callback::const_ptr_t;

/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
} // namespace mqtt

#endif // __mqtt_callback_h
2 changes: 1 addition & 1 deletion include/mqtt/create_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace mqtt {

/////////////////////////////////////////////////////////////////////////////

/** An empty type that can be used as a `persistent_type` variant opiton. */
/** An empty type that can be used as a `persistent_type` variant option. */
struct no_persistence
{
};
Expand Down
2 changes: 1 addition & 1 deletion include/mqtt/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,4 @@ class security_exception : public exception
/////////////////////////////////////////////////////////////////////////////
} // namespace mqtt

#endif // __mqtt_token_h
#endif // __mqtt_exception_h
1 change: 0 additions & 1 deletion include/mqtt/iaction_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ using iaction_listener_ptr = iaction_listener::ptr_t;
using const_iaction_listener_ptr = iaction_listener::const_ptr_t;

/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
} // namespace mqtt

#endif // __mqtt_iaction_listener_h
14 changes: 14 additions & 0 deletions include/mqtt/iasync_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "mqtt/connect_options.h"
#include "mqtt/delivery_token.h"
#include "mqtt/disconnect_options.h"
#include "mqtt/event.h"
#include "mqtt/exception.h"
#include "mqtt/iaction_listener.h"
#include "mqtt/iclient_persistence.h"
Expand Down Expand Up @@ -461,6 +462,19 @@ class iasync_client
* available.
*/
virtual bool try_consume_message(const_message_ptr* msg) = 0;
/**
* Read the next event from the queue.
* This blocks until a new message arrives.
* @return The message and topic.
*/
virtual event_type consume_event() = 0;
/**
* Try to read the next message from the queue without blocking.
* @param msg Pointer to the value to receive the message
* @return @em true is a message was read, @em false if no message was
* available.
*/
virtual bool try_consume_event(event_type* evt) = 0;
};

/////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit 43e8ef6

Please sign in to comment.