Skip to content

Commit

Permalink
remove lock mq step in broadcasting mode rebalancing
Browse files Browse the repository at this point in the history
  • Loading branch information
redlsz committed Sep 29, 2024
1 parent 551c8c3 commit f3f7ea0
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, false);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet);
Expand Down Expand Up @@ -477,7 +477,7 @@ private void truncateMessageQueueNotMyTopic() {
}

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
final boolean needLockMq) {
boolean changed = false;

// drop process queues no longer belong me
Expand Down Expand Up @@ -518,7 +518,7 @@ private boolean updateProcessQueueTableInRebalance(final String topic, final Set
List<PullRequest> pullRequestList = new ArrayList<>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
if (needLockMq && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
allMQLocked = false;
continue;
Expand Down

0 comments on commit f3f7ea0

Please sign in to comment.