Skip to content

Commit

Permalink
Added consumer_clear()
Browse files Browse the repository at this point in the history
  • Loading branch information
fpagliughi committed Jul 13, 2024
1 parent 2e68d0f commit e558946
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 1 deletion.
22 changes: 21 additions & 1 deletion include/mqtt/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -747,18 +747,39 @@ class async_client : public virtual iasync_client
) override;
/**
* Start consuming messages.
*
* This initializes the client to receive messages through a queue that
* can be read synchronously.
*
* Normally this should be called _before_ connecting the client to the
* broker, in order to have the consumer queue in place in the event
* that the immediately starts sending messages (such as any retained
* messages) while the client is still in the context of the connect
* call.
*
* This _must_ also be called before calling any 'consume_message' or
* "'consume_event' methods.
*
* Internally, this just creates a thread-safe queue for `mqtt::event`
* objects, then hooks into the message and state-change callback to
* push events into the queue in the order received.
*/
void start_consuming() override;
/**
* Stop consuming messages.
*
* This shuts down the internal callback and closes the internal
* consumer queue. Any remaining messages and events can be read until
* the queue is emptied, but nothing further will be added to it.
* This will also wake up any thread waiting on the queue.
*/
void stop_consuming() override;
/**
* This clears the consumer queue, discarding any pending event.
*/
void clear_consumer() override {
if (que_) que_->clear();
}
/**
* Determines if the consumer queue has been closed.
* Once closed, any events in the queue can still be read, but no new
Expand Down Expand Up @@ -884,7 +905,6 @@ class async_client : public virtual iasync_client
this->try_consume_message_until(&msg, absTime);
return msg;
}

/**
* Read the next message from the queue.
* This blocks until a new message arrives.
Expand Down
4 changes: 4 additions & 0 deletions include/mqtt/iasync_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,10 @@ class iasync_client
* messages.
*/
virtual void stop_consuming() = 0;
/**
* This clears the consumer queue, discarding any pending event.
*/
virtual void clear_consumer() {}
/**
* Determines if the consumer queue has been closed.
* Once closed, any events in the queue can still be read, but no new
Expand Down
11 changes: 11 additions & 0 deletions include/mqtt/thread_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,17 @@ class thread_queue
guard g{lock_};
return is_done();
}
/**
* Clear the contents of the queue.
* This discards all items in the queue.
*/
void clear() {
unique_guard g{lock_};
while (!que_.empty())
que_.pop();
g.unlock();
notFullCond_.notify_all();
}
/**
* Put an item into the queue.
* If the queue is full, this will block the caller until items are
Expand Down

0 comments on commit e558946

Please sign in to comment.