Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cooperative rebalance support #907

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/librdkafka
4 changes: 4 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
consume(): void;

incrementalAssign(assigments: Assignment[]): this;

incrementalUnassign(assignments: Assignment[]): this;

getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;

offsetsStore(topicPartitions: TopicPartitionOffset[]): any;
Expand Down
102 changes: 82 additions & 20 deletions lib/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,49 @@ var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500;
var DEFAULT_CONSUME_TIME_OUT = 1000;
util.inherits(KafkaConsumer, Client);

var eagerRebalanceCallback = function(err, assignment) {
// Create the librdkafka error
err = LibrdKafkaError.create(err);
// Emit the event
this.emit('rebalance', err, assignment);

// That's it
try {
if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
this.assign(assignment);
} else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
this.unassign();
}
} catch (e) {
// Ignore exceptions if we are not connected
if (this.isConnected()) {
this.emit('rebalance.error', e);
}
}
}

var cooperativeRebalanceCallback = function(err, assignment) {
// Create the librdkafka error
err = LibrdKafkaError.create(err);
// Emit the event
this.emit('rebalance', err, assignment);

// That's it
try {
if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
this.incrementalAssign(assignment);
} else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
this.incrementalUnassign(assignment);
}
} catch (e) {
// Ignore exceptions if we are not connected
if (this.isConnected()) {
this.emit('rebalance.error', e);
}
}
}


/**
* KafkaConsumer class for reading messages from Kafka
*
Expand Down Expand Up @@ -52,26 +95,9 @@ function KafkaConsumer(conf, topicConf) {

// If rebalance is undefined we don't want any part of this
if (onRebalance && typeof onRebalance === 'boolean') {
conf.rebalance_cb = function(err, assignment) {
// Create the librdkafka error
err = LibrdKafkaError.create(err);
// Emit the event
self.emit('rebalance', err, assignment);

// That's it
try {
if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
self.assign(assignment);
} else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
self.unassign();
}
} catch (e) {
// Ignore exceptions if we are not connected
if (self.isConnected()) {
self.emit('rebalance.error', e);
}
}
};
conf.rebalance_cb = conf['partition.assignment.strategy'] === 'cooperative-sticky'
thynson marked this conversation as resolved.
Show resolved Hide resolved
? cooperativeRebalanceCallback.bind(this)
: eagerRebalanceCallback.bind(this);
} else if (onRebalance && typeof onRebalance === 'function') {
/*
* Once this is opted in to, that's it. It's going to manually rebalance
Expand Down Expand Up @@ -264,6 +290,19 @@ KafkaConsumer.prototype.assign = function(assignments) {
return this;
};

/**
* Incremental assign the consumer specific partitions and topics
*
* @param {array} assignments - Assignments array. Should contain
* objects with topic and partition set.
* @return {Client} - Returns itself
*/

KafkaConsumer.prototype.incrementalAssign = function(assignments) {
this._client.incrementalAssign(TopicPartition.map(assignments));
return this;
};

/**
* Unassign the consumer from its assigned partitions and topics.
*
Expand All @@ -275,6 +314,18 @@ KafkaConsumer.prototype.unassign = function() {
return this;
};

/**
* Incremental unassign the consumer from specific partitions and topics
*
* @param {array} assignments - Assignments array. Should contain
* objects with topic and partition set.
* @return {Client} - Returns itself
*/

KafkaConsumer.prototype.incrementalUnassign = function(assignments) {
this._client.incrementalUnassign(TopicPartition.map(assignments));
return this;
};

/**
* Get the assignments for the consumer
Expand Down Expand Up @@ -341,6 +392,17 @@ KafkaConsumer.prototype.position = function(toppars) {
return this._errorWrap(this._client.position(toppars), true);
};

/**
* Check whether the consumer considers the current assignment to have been
* lost invountarily.
*
* @throws Throws from native land if
* @returns {boolean} Whether the assignment have been lost or not
*/
KafkaConsumer.prototype.assimentLost = function() {
return this._client.assignmentLost();
}

/**
* Unsubscribe from all currently subscribed topics
*
Expand Down
Loading