Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/llapi check consumer from topic #59

Merged
merged 4 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion src/k2eg/service/pubsub/IPublisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <k2eg/common/types.h>

#include <cstddef>
#include <functional>
#include <map>
#include <memory>
Expand Down Expand Up @@ -46,7 +47,7 @@ typedef std::map<std::string, EventCallback> MapEvtHndlrForReqType;
typedef std::pair<std::string, EventCallback> MapEvtHndlrForReqTypePair;
typedef std::map<std::string,std::string> PublisherHeaders;

/**
/*
Defines the properties of a queue
*/
struct QueueDescription {
Expand All @@ -62,6 +63,35 @@ struct QueueDescription {
};
DEFINE_PTR_TYPES(QueueDescription)

/*
Information about the subscriber of the queue
*/
struct QueueSubscriberInfo{
// kafka client id of the consuming application
std::string client_id;
// group member id assigned to the consumer by the broker
std::string member_id;
// host the consumer is connected from
std::string host;

};
DEFINE_PTR_TYPES(QueueSubscriberInfo)

/*
Information about the subscriber group of the queue
*/
struct QueueSubscriberGroupInfo{
// consumer group name (the group id)
std::string name;
// members currently belonging to this consumer group
std::vector<QueueSubscriberInfoUPtr> subscribers;
};
DEFINE_PTR_TYPES(QueueSubscriberGroupInfo)
/*
Defines the queue metadata information
*/
struct QueueMetadata{
// name of the queue (topic) this metadata refers to
std::string name;
// one entry per consumer group subscribed to the queue
std::vector<QueueSubscriberGroupInfoUPtr> subscriber_groups;
};
DEFINE_PTR_TYPES(QueueMetadata)

class IPublisher {
protected:
MapEvtHndlrForReqType eventCallbackForReqType;
Expand All @@ -77,6 +107,7 @@ class IPublisher {
virtual int setCallBackForReqType(const std::string req_type, EventCallback eventCallback);
virtual int createQueue(const QueueDescription& new_queue) = 0;
virtual int deleteQueue(const std::string& queue_name) = 0;
virtual QueueMetadataUPtr getQueueMetadata(const std::string& queue_name) = 0;
virtual int flush(const int timeo) = 0;
virtual int pushMessage(PublishMessageUniquePtr message, const PublisherHeaders& headers = PublisherHeaders()) = 0;
virtual int pushMessages(PublisherMessageVector& messages, const PublisherHeaders& headers = PublisherHeaders()) = 0;
Expand Down
172 changes: 144 additions & 28 deletions src/k2eg/service/pubsub/impl/kafka/RDKafkaPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
#include <librdkafka/rdkafkacpp.h>

#include <cstddef>
#include <cstring>
#include <iostream>
#include <memory>
#include <string>

#include "k2eg/service/pubsub/IPublisher.h"
#include "librdkafka/rdkafka.h"

using namespace k2eg::service::pubsub;
using namespace k2eg::service::pubsub::impl::kafka;

RDKafkaPublisher::RDKafkaPublisher(ConstPublisherConfigurationUPtr configuration)
Expand Down Expand Up @@ -103,7 +107,7 @@ RDKafkaPublisher::createQueue(const QueueDescription &new_queue) {
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
const rd_kafka_CreateTopics_result_t *res = nullptr;
const rd_kafka_topic_result_t **restopics = nullptr;

std::unique_ptr<rd_kafka_NewTopic_t *, RdKafkaNewTopicArrayDeleter> new_topics_uptr(
static_cast<rd_kafka_NewTopic_t **>(malloc(sizeof(rd_kafka_NewTopic_t *) * 1)), RdKafkaNewTopicArrayDeleter(1));
// define the topic
Expand Down Expand Up @@ -162,10 +166,7 @@ RDKafkaPublisher::createQueue(const QueueDescription &new_queue) {
const rd_kafka_topic_result_t *tres = restopics[i];
auto topic_name = std::string(rd_kafka_topic_result_name(tres));
auto toipc_result_error = rd_kafka_topic_result_error(tres);
if (
toipc_result_error != RD_KAFKA_RESP_ERR_NO_ERROR &&
toipc_result_error != RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS
) {
if (toipc_result_error != RD_KAFKA_RESP_ERR_NO_ERROR && toipc_result_error != RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS) {
auto topic_result_error_string = std::string(rd_kafka_topic_result_error_string(tres));
auto err_name = std::string(rd_kafka_err2name(toipc_result_error));
throw std::runtime_error("Error '" + err_name + "' creating topic " + topic_name + " (" + topic_result_error_string + ")");
Expand Down Expand Up @@ -226,6 +227,144 @@ RDKafkaPublisher::deleteQueue(const std::string &queue_name) {
return 0;
}

/**
 * @brief Collect metadata (name + subscriber groups) for a queue/topic.
 *
 * Performs a metadata request to verify broker reachability, then a
 * ListConsumerGroups admin request whose result is expanded into
 * QueueSubscriberGroupInfo entries via scan_groups().
 *
 * @param queue_name the queue (topic) to describe
 * @return metadata for the queue; on a metadata request error the result is
 *         returned with only the name filled in (best effort)
 * @throws std::runtime_error on admin-option or ListConsumerGroups failures
 */
QueueMetadataUPtr
RDKafkaPublisher::getQueueMetadata(const std::string &queue_name) {
  char                       errstr[512];
  const rd_kafka_metadata_t *metadata = nullptr;
  const int                  tmout    = 30 * 1000;  // request timeout in milliseconds
  QueueMetadataUPtr          result   = std::make_unique<QueueMetadata>();
  result->name                        = queue_name;
  // allocate topic structure (released via RdKafkaTopicDeleter)
  std::unique_ptr<rd_kafka_topic_t, RdKafkaTopicDeleter> topic_uptr(rd_kafka_topic_new(producer.get()->c_ptr(), queue_name.c_str(), NULL),
                                                                    RdKafkaTopicDeleter());

  rd_kafka_resp_err_t err = rd_kafka_metadata(producer.get()->c_ptr(), 0 /*only_given_topics*/, topic_uptr.get(), &metadata, tmout);
  if (err) return result;
  // the metadata structure is owned by the caller and must be released
  // (the previous implementation leaked it)
  rd_kafka_metadata_destroy(metadata);

  // consumer group listing through the admin API
  std::unique_ptr<rd_kafka_queue_t, RdKafkaQueueDeleter>              queue(rd_kafka_queue_new(producer.get()->c_ptr()), RdKafkaQueueDeleter());
  std::unique_ptr<rd_kafka_AdminOptions_t, RdKafkaAdminOptionDeleter> admin_options(
      rd_kafka_AdminOptions_new(producer.get()->c_ptr(), RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS),
      RdKafkaAdminOptionDeleter());

  if (err = rd_kafka_AdminOptions_set_request_timeout(admin_options.get(), tmout, errstr, sizeof(errstr)); err != RD_KAFKA_RESP_ERR_NO_ERROR) {
    throw std::runtime_error("Error creating kafka option request timeout (" + std::string(errstr) + ")");
  }

  rd_kafka_ListConsumerGroups(producer.get()->c_ptr(), admin_options.get(), queue.get());
  // poll indefinitely: bounded in practice by the request timeout set above;
  // the returned event is owned by us and must be destroyed on every path
  std::unique_ptr<rd_kafka_event_t, decltype(&rd_kafka_event_destroy)> event(
      rd_kafka_queue_poll(queue.get(), -1), &rd_kafka_event_destroy);
  if (!event) {
    /* see yield call in stop() signal handler */
    throw std::runtime_error("Null event received");
  } else if (rd_kafka_event_error(event.get())) {
    /* ListConsumerGroups request failed */
    throw std::runtime_error("Error executing ListConsumerGroups (" + std::string(rd_kafka_event_error_string(event.get())) + ")");
  } else {
    /* ListConsumerGroups request succeeded, but individual
     * groups may have errors. */
    const rd_kafka_ListConsumerGroups_result_t *consumer_group_info = rd_kafka_event_ListConsumerGroups_result(event.get());
    scan_groups(consumer_group_info, *result);
  }
  return result;
}

/**
 * @brief Fill @p q_desc_ref with one QueueSubscriberGroupInfo per consumer
 *        group returned by a ListConsumerGroups admin request.
 *
 * Per-group errors reported alongside the listing are ignored (best effort),
 * matching the previous behaviour.
 *
 * @param list       result of a ListConsumerGroups admin operation
 * @param q_desc_ref queue metadata to which the discovered groups are appended
 * @return always 0
 */
int
RDKafkaPublisher::scan_groups(const rd_kafka_ListConsumerGroups_result_t *list, QueueMetadata &q_desc_ref) {
  size_t                                  result_groups_cnt = 0;
  const rd_kafka_ConsumerGroupListing_t **result_groups     = rd_kafka_ListConsumerGroups_result_valid(list, &result_groups_cnt);

  for (size_t i = 0; i < result_groups_cnt; i++) {
    const char *group_id = rd_kafka_ConsumerGroupListing_group_id(result_groups[i]);
    // resolve the member list of each group and attach it to the metadata
    q_desc_ref.subscriber_groups.push_back(get_group_info(group_id));
  }
  return 0;
}

/**
 * @brief Describe a single consumer group and the members subscribed to it.
 *
 * @param group the consumer group id to describe
 * @return group info carrying the group name and one QueueSubscriberInfo per
 *         connected member (client id, member id, client host)
 * @throws std::runtime_error if the group list cannot be acquired or a group
 *         entry carries a broker-side error
 */
QueueSubscriberGroupInfoUPtr
RDKafkaPublisher::get_group_info(const char *group) {
  const rd_kafka_group_list   *grplist = nullptr;
  QueueSubscriberGroupInfoUPtr result  = std::make_unique<QueueSubscriberGroupInfo>();
  rd_kafka_resp_err_t          err     = rd_kafka_list_groups(producer.get()->c_ptr(), group, &grplist, 10000);
  if (err) {
    throw std::runtime_error("Failed to acquire group list (" + std::string(rd_kafka_err2str(err)) + ")");
  }
  // the list is owned by us: released via rd_kafka_group_list_destroy
  std::unique_ptr<const rd_kafka_group_list, RdKafkaGroupListDeleter> grlist_uptr(grplist, RdKafkaGroupListDeleter());
  // set group name
  result->name = group;
  // scan the subscribers of every matching group
  for (int i = 0; i < grlist_uptr->group_cnt; i++) {
    const struct rd_kafka_group_info *gi = &grlist_uptr->groups[i];
    if (gi->err) throw std::runtime_error("Failed to acquire group info (" + std::string(rd_kafka_err2str(gi->err)) + ")");
    for (int j = 0; j < gi->member_cnt; j++) {
      const struct rd_kafka_group_member_info *mi = &gi->members[j];
      result->subscribers.push_back(
          MakeQueueSubscriberInfoUPtr(
              QueueSubscriberInfo{
                  .client_id = mi->client_id,
                  .member_id = mi->member_id,
                  .host      = mi->client_host}));
    }
  }

  if (group && !grlist_uptr->group_cnt) fprintf(stderr, "%% No matching group (%s)\n", group);

  return result;
}

int
RDKafkaPublisher::pushMessage(PublishMessageUniquePtr message, const std::map<std::string, std::string> &headers) {
RdKafka::ErrorCode resp = RdKafka::ERR_NO_ERROR;
Expand Down Expand Up @@ -278,27 +417,4 @@ RDKafkaPublisher::pushMessages(PublisherMessageVector &messages, const std::map<
size_t
RDKafkaPublisher::getQueueMessageSize() {
// TODO: not implemented — always reports an empty internal queue
return 0;
}

/**
 * @brief Wait for up to \p tmout for an admin result event of type \p evtype.
 *
 * Events of any other type (including error events) are destroyed and
 * polling continues; the previous implementation leaked every skipped event.
 *
 * @param q      queue the admin operation result is delivered on
 * @param evtype event type to wait for
 * @param tmout  per-poll timeout in milliseconds
 * @return the matching event (ownership passes to the caller), or nullptr on
 *         poll timeout
 */
rd_kafka_event_t *
RDKafkaPublisher::wait_admin_result(rd_kafka_queue_t *q, rd_kafka_event_type_t evtype, int tmout) {
  rd_kafka_event_t *rkev = nullptr;

  while (true) {
    rkev = rd_kafka_queue_poll(q, tmout);
    if (!rkev) return nullptr;  // timed out waiting for the admin result

    if (rd_kafka_event_type(rkev) == evtype) return rkev;

    // not the event we are waiting for: release it and keep polling
    rd_kafka_event_destroy(rkev);
  }
}
20 changes: 18 additions & 2 deletions src/k2eg/service/pubsub/impl/kafka/RDKafkaPublisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@

// smart pointer delete for rd_kafka_queue_t
namespace k2eg::service::pubsub::impl::kafka {

// smart-pointer deleter for rd_kafka_topic_t handles
struct RdKafkaTopicDeleter {
void
operator()(rd_kafka_topic_t* topic) {
rd_kafka_topic_destroy(topic);
}
};

struct RdKafkaQueueDeleter {
void
operator()(rd_kafka_queue_t* queue) {
Expand All @@ -38,6 +46,13 @@ struct RdKafkaDeleteTopicArrayDeleter {
}
};

// smart-pointer deleter for the group list returned by rd_kafka_list_groups
struct RdKafkaGroupListDeleter {
void
operator()(const rd_kafka_group_list* grlist) {
rd_kafka_group_list_destroy(grlist);
}
};

// smart pointer delete for rd_kafka_NewTopic_t**
struct RdKafkaNewTopicArrayDeleter {
const size_t count;
Expand All @@ -64,8 +79,8 @@ class RDKafkaPublisher : public IPublisher, RDKafkaBase, RdKafka::DeliveryReport
std::thread auto_poll_thread;
std::unique_ptr<RdKafka::Producer> producer;

rd_kafka_event_t* wait_admin_result(rd_kafka_queue_t* q, rd_kafka_event_type_t evtype, int tmout);

int scan_groups(const rd_kafka_ListConsumerGroups_result_t *list, QueueMetadata& q_desc_ref);
QueueSubscriberGroupInfoUPtr get_group_info(const char *group);
protected:
void dr_cb(RdKafka::Message& message);
void autoPoll();
Expand All @@ -77,6 +92,7 @@ class RDKafkaPublisher : public IPublisher, RDKafkaBase, RdKafka::DeliveryReport
virtual ~RDKafkaPublisher();
virtual int createQueue(const QueueDescription& new_queue);
virtual int deleteQueue(const std::string& queue_name);
virtual QueueMetadataUPtr getQueueMetadata(const std::string& queue_name);
virtual void setAutoPoll(bool autopoll);
virtual int flush(const int timeo = 10000);
virtual int pushMessage(PublishMessageUniquePtr message, const PublisherHeaders& headers = PublisherHeaders());
Expand Down
3 changes: 3 additions & 0 deletions test/controller/NodeController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class DummyPublisher : public IPublisher {
return 0;
}
int deleteQueue(const std::string& queue_name){return 0;} // test stub: always reports success
QueueMetadataUPtr getQueueMetadata(const std::string& queue_name){return nullptr;} // test stub: no metadata available
int
flush(const int timeo) {
return 0;
Expand Down Expand Up @@ -125,6 +126,7 @@ class DummyPublisherCounter : public IPublisher {
return 0;
}
int deleteQueue(const std::string& queue_name){return 0;} // test stub: always reports success
QueueMetadataUPtr getQueueMetadata(const std::string& queue_name){return nullptr;} // test stub: no metadata available
int
flush(const int timeo) {
return 0;
Expand Down Expand Up @@ -163,6 +165,7 @@ class DummyPublisherNoSignal : public IPublisher {
return 0;
}
int deleteQueue(const std::string& queue_name){return 0;} // test stub: always reports success
QueueMetadataUPtr getQueueMetadata(const std::string& queue_name){return nullptr;} // test stub: no metadata available
int
flush(const int timeo) {
return 0;
Expand Down
13 changes: 12 additions & 1 deletion test/pubsub/kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,18 @@ TEST(Kafka, CreateTopic) {
.retention_time = 1000*60*60,
.retention_size = 1024*1024*1
}), 0);

SubscriberInterfaceElementVector data;
std::unique_ptr<RDKafkaSubscriber> consumer =
std::make_unique<RDKafkaSubscriber>(std::make_unique<const SubscriberConfiguration>(SubscriberConfiguration{.server_address = "kafka:9092"}));
consumer->addQueue({"new-queue"});
consumer->getMsg(data, 10);
sleep(5);
consumer->getMsg(data, 10);
auto tipic_metadata = producer->getQueueMetadata("new-queue");
consumer.reset();
ASSERT_NE(tipic_metadata, nullptr);
ASSERT_STREQ(tipic_metadata->name.c_str(), "new-queue");
ASSERT_EQ(tipic_metadata->subscriber_groups.size(), 1);
ASSERT_EQ(producer->deleteQueue("new-queue"), 0);
}

Expand Down
Loading