Skip to content

Commit

Permalink
Made thread queue closable
Browse files Browse the repository at this point in the history
  • Loading branch information
fpagliughi committed Jul 10, 2024
1 parent c892569 commit 1b71a16
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 58 deletions.
114 changes: 102 additions & 12 deletions include/mqtt/thread_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@

namespace mqtt {

/**
* Exception that is thrown when operations are performed on a closed
* queue.
*/
class queue_closed : public std::runtime_error
{
public:
queue_closed() : std::runtime_error("queue is closed") {}
};

/////////////////////////////////////////////////////////////////////////////

/**
Expand All @@ -53,6 +63,12 @@ namespace mqtt {
* queue will block until the number of items are removed from the queue to
* bring the size below the new capacity.
* @par
* The queue can be closed. After that, no new items can be placed into it;
* a `put()` calls will fail. Receivers can still continue to get any items
* out of the queue that were added before it was closed. Once there are no
* more items left in the queue after it is closed, it is considered "done".
* Nothing useful can be done with the queue.
* @par
* Note that the queue uses move semantics to place items into the queue and
* remove items from the queue. This means that the type, T, of the data
* held by the queue only needs to follow move semantics; not copy
Expand Down Expand Up @@ -87,7 +103,10 @@ class thread_queue
/** Condition gets signaled then item removed from full queue */
std::condition_variable notFullCond_;
/** The capacity of the queue */
size_type cap_;
size_type cap_{MAX_CAPACITY};
/** Whether the queue is closed */
bool closed_{false};

/** The actual STL container to hold data */
std::queue<T, Container> que_;

Expand All @@ -96,13 +115,25 @@ class thread_queue
/** General purpose guard */
using unique_guard = std::unique_lock<std::mutex>;

/** Throw an excpetion if the queue is closed. */
void check_closed() {
if (closed_) throw queue_closed{};
}

/** Throw an excpetion if the queue is done. */
void check_done() {
if (closed_ && que_.empty()) throw queue_closed{};
}

public:
/**
* Constructs a queue with the maximum capacity.
* This is effectively an unbounded queue.
*/
thread_queue() : cap_(MAX_CAPACITY) {}
thread_queue() {}
/**
* Constructs a queue with the specified capacity.
* This is a bounded queue.
* @param cap The maximum number of items that can be placed in the
* queue. The minimum capacity is 1.
*/
Expand All @@ -113,15 +144,15 @@ class thread_queue
* there are any items in the queue.
*/
bool empty() const {
guard g(lock_);
guard g{lock_};
return que_.empty();
}
/**
* Gets the capacity of the queue.
* @return The maximum number of elements before the queue is full.
*/
size_type capacity() const {
guard g(lock_);
guard g{lock_};
return cap_;
}
/**
Expand All @@ -142,6 +173,46 @@ class thread_queue
guard g(lock_);
return que_.size();
}
/**
* Close the queue.
* Once closed, the queue will not accept any new items, but receievers
* will still be able to get any remaining items out of the queue until
* it is empty.
*/
void close() {
guard g{lock_};
closed_ = true;
}
/*
void close(value_type finalVal) {
unique_guard g(lock_);
if (closed_) return;
que_.emplace(std::move(finalVal));
g.unlock();
notEmptyCond_.notify_one();
}
*/
/**
* Determines if the queue is closed.
* Once closed, the queue will not accept any new items, but receievers
* will still be able to get any remaining items out of the queue until
* it is empty.
* @return @em true if the queue is closed, @false otherwise.
*/
bool closed() const {
guard g{lock_};
return closed_;
}
/**
* Determines if all possible operations are done on the queue. If the
* queue is closed and empty, then no further useful operations can be
* done on it.
* @return @true if the queue is closed and empty, @em false otherwise.
*/
bool done() const {
guard g{lock_};
return closed_ && que_.empty();
}
/**
* Put an item into the queue.
* If the queue is full, this will block the caller until items are
Expand All @@ -150,8 +221,9 @@ class thread_queue
*/
void put(value_type val) {
unique_guard g(lock_);
notFullCond_.wait(g, [this] { return que_.size() < cap_; });
notFullCond_.wait(g, [this] { return que_.size() < cap_ || closed_; });

check_closed();
que_.emplace(std::move(val));
g.unlock();
notEmptyCond_.notify_one();
Expand All @@ -164,6 +236,7 @@ class thread_queue
*/
bool try_put(value_type val) {
unique_guard g(lock_);
check_closed();
if (que_.size() >= cap_)
return false;

Expand All @@ -184,7 +257,10 @@ class thread_queue
template <typename Rep, class Period>
bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
unique_guard g(lock_);
if (!notFullCond_.wait_for(g, relTime, [this] { return que_.size() < cap_; }))
bool to = !notFullCond_.wait_for(g, relTime,
[this] { return que_.size() < cap_ || closed_; });
check_closed();
if (to)
return false;

que_.emplace(std::move(val));
Expand All @@ -207,7 +283,11 @@ class thread_queue
value_type val, const std::chrono::time_point<Clock, Duration>& absTime
) {
unique_guard g(lock_);
if (!notFullCond_.wait_until(g, absTime, [this] { return que_.size() < cap_; }))
bool to = !notFullCond_.wait_until(g, absTime,
[this] { return que_.size() < cap_ || closed_; });

check_closed();
if (to)
return false;

que_.emplace(std::move(val));
Expand All @@ -226,7 +306,8 @@ class thread_queue
return;

unique_guard g(lock_);
notEmptyCond_.wait(g, [this] { return !que_.empty(); });
notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
check_done();

*val = std::move(que_.front());
que_.pop();
Expand All @@ -241,7 +322,8 @@ class thread_queue
*/
value_type get() {
unique_guard g(lock_);
notEmptyCond_.wait(g, [this] { return !que_.empty(); });
notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
check_done();

value_type val = std::move(que_.front());
que_.pop();
Expand All @@ -262,8 +344,11 @@ class thread_queue
return false;

unique_guard g(lock_);
if (que_.empty())
if (que_.empty()) {
if (closed_)
throw queue_closed{};
return false;
}

*val = std::move(que_.front());
que_.pop();
Expand All @@ -287,7 +372,10 @@ class thread_queue
return false;

unique_guard g(lock_);
if (!notEmptyCond_.wait_for(g, relTime, [this] { return !que_.empty(); }))
bool to = !notEmptyCond_.wait_for(g, relTime, [this] { return !que_.empty() || closed_; });

check_done();
if (to)
return false;

*val = std::move(que_.front());
Expand All @@ -314,7 +402,9 @@ class thread_queue
return false;

unique_guard g(lock_);
if (!notEmptyCond_.wait_until(g, absTime, [this] { return !que_.empty(); }))
bool to = !notEmptyCond_.wait_until(g, absTime, [this] { return !que_.empty(); });
check_done();
if (to)
return false;

*val = std::move(que_.front());
Expand Down
42 changes: 21 additions & 21 deletions src/async_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -876,37 +876,37 @@ void async_client::stop_consuming()

const_message_ptr async_client::consume_message()
{
// For backward compatibility we ignore the 'connected' events,
// whereas disconnected/lost return an empty pointer.
while (true) {
auto evt = que_->get();
// For backward compatibility we ignore the 'connected' events,
// whereas disconnected/lost return an empty pointer.
while (true) {
auto evt = que_->get();

if (const auto* pval = std::get_if<const_message_ptr>(&evt))
return *pval;
if (const auto* pval = std::get_if<const_message_ptr>(&evt))
return *pval;

if (!std::holds_alternative<connected_event>(evt))
return const_message_ptr{};
}
if (!std::holds_alternative<connected_event>(evt))
return const_message_ptr{};
}
}

bool async_client::try_consume_message(const_message_ptr* msg)
{
event_type evt;

while (true) {
if (!que_->try_get(&evt))
return false;
while (true) {
if (!que_->try_get(&evt))
return false;

if (const auto* pval = std::get_if<const_message_ptr>(&evt)) {
*msg = std::move(*pval);
break;
}
if (const auto* pval = std::get_if<const_message_ptr>(&evt)) {
*msg = std::move(*pval);
break;
}

if (!std::holds_alternative<connected_event>(evt)) {
*msg = const_message_ptr{};
break;
}
}
if (!std::holds_alternative<connected_event>(evt)) {
*msg = const_message_ptr{};
break;
}
}
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions test/unit/test_ssl_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ TEST_CASE("ssl_options test error handler", "[options]")
{
mqtt::ssl_options opts{orgOpts};

orgOpts.set_error_handler([](const std::string& msg) {
std::cerr << "SSL Error: " << msg << std::endl;
orgOpts.set_error_handler([](const std::string& msg) {
std::cerr << "SSL Error: " << msg << std::endl;
});
}
Loading

0 comments on commit 1b71a16

Please sign in to comment.