From 5b255dc3afd8dddd9af3546cca631e04787a604e Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Thu, 8 Aug 2024 11:34:24 +0800 Subject: [PATCH 1/6] =?UTF-8?q?cq=E5=8F=8C=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rocketmq/broker/BrokerController.java | 5 + .../broker/offset/ConsumerOffsetManager.java | 20 +++ .../offset/RocksDBConsumerOffsetManager.java | 70 +++++++- .../processor/AdminBrokerProcessor.java | 98 ++++++++++++ .../RocksDBSubscriptionGroupManager.java | 22 +-- .../SubscriptionGroupManager.java | 20 +++ .../topic/RocksDBTopicConfigManager.java | 18 ++- .../broker/topic/TopicConfigManager.java | 20 +++ .../RocksdbTransferOffsetAndCqTest.java | 151 ++++++++++++++++++ .../rocketmq/client/impl/MQClientAPIImpl.java | 12 ++ .../common/config/AbstractRocksDBStorage.java | 1 + .../remoting/protocol/RequestCode.java | 1 + .../body/DiffConsumeQueueResponseBody.java | 35 ++++ .../rocketmq/store/DefaultMessageStore.java | 42 ++++- .../rocketmq/store/RocksDBMessageStore.java | 28 ++++ .../store/config/MessageStoreConfig.java | 20 +++ .../apache/rocketmq/store/queue/CqUnit.java | 1 + .../store/queue/RocksDBConsumeQueue.java | 2 +- .../tools/admin/DefaultMQAdminExt.java | 7 + .../tools/admin/DefaultMQAdminExtImpl.java | 6 + .../rocketmq/tools/admin/MQAdminExt.java | 3 + .../ExportMetadataInRocksDBCommand.java | 17 +- .../queue/DiffConsumeQueueCommand.java | 101 ++++++++++++ 23 files changed, 672 insertions(+), 28 deletions(-) create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/DiffConsumeQueueResponseBody.java create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/queue/DiffConsumeQueueCommand.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 145a9522306..e9276e15778 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -789,6 +789,9 @@ public boolean initializeMessageStore() { defaultMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable()); } else { defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable()); + if (messageStoreConfig.isRocksdbCQWriteEnable()) { + defaultMessageStore.enableRocksdbCQWrite(); + } } if (messageStoreConfig.isEnableDLegerCommitLog()) { @@ -815,6 +818,8 @@ public boolean initializeMessageStore() { } catch (IOException e) { result = false; LOG.error("BrokerController#initialize: unexpected error occurs", e); + } catch (Exception e) { + throw new RuntimeException(e); } return result; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index 21f20dde325..d3874d91d51 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -31,6 +31,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -373,6 +374,25 @@ public void setDataVersion(DataVersion dataVersion) { this.dataVersion = dataVersion; } + public boolean loadDataVersion() { + String fileName = null; + try { + fileName = this.configFilePath(); + String jsonString = MixAll.file2String(fileName); + if (jsonString != null) { + ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class); + if (obj != null) { + this.dataVersion = obj.dataVersion; + } + LOG.info("load consumer offset dataVersion success, " + fileName + " " + jsonString); + } + return true; + } catch (Exception e) { + LOG.error("load consumer offset dataVersion failed " + fileName, e); + return false; + } + } + public void removeOffset(final String group) { Iterator>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java index de293fc4992..6cadf86417a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java @@ -23,7 +23,12 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.RocksDBConfigManager; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.DataConverter; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.DataVersion; import org.rocksdb.WriteBatch; import com.alibaba.fastjson.JSON; @@ -31,11 +36,13 @@ public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager { + protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + protected RocksDBConfigManager rocksDBConfigManager; public RocksDBConsumerOffsetManager(BrokerController brokerController) { super(brokerController); - this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); + this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); } @Override @@ -43,9 +50,47 @@ public boolean load() { if (!rocksDBConfigManager.init()) { return false; } - return this.rocksDBConfigManager.loadData(this::decodeOffset); + if (!loadDataVersion() || !loadConsumerOffset()) { + return false; + } + + return true; + } + + public boolean loadConsumerOffset() { + return this.rocksDBConfigManager.loadData(this::decodeOffset) && merge(); + } + + private boolean merge() { + if (!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) { + log.info("the switch transferOffsetJsonToRocksdb is off, no merge offset operation is needed."); + return true; + } + if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { + log.info("consumerOffset json file is not exist, so skip merge"); + return true; + } + if (!super.loadDataVersion()) { + log.error("load json consumerOffset dataVersion error, startup will exit"); + return false; + } + + final DataVersion dataVersion = super.getDataVersion(); + final DataVersion kvDataVersion = this.getDataVersion(); + if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) { + if (!super.load()) { + log.error("load json consumerOffset info failed, startup will exit"); + return false; + } + this.persist(); + this.getDataVersion().assignNewOne(dataVersion); + updateDataVersion(); + } + log.info("update offset from json, dataVersion:{}, offsetTable: {} ", this.getDataVersion(), JSON.toJSONString(this.getOffsetTable())); + return true; } + @Override public boolean stop() { return this.rocksDBConfigManager.stop(); @@ -69,8 +114,7 @@ protected void decodeOffset(final byte[] key, final byte[] body) { LOG.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable()); } - @Override - public String configFilePath() { + public String rocksdbConfigFilePath() { return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "consumerOffsets" + File.separator; } @@ -103,4 +147,22 @@ private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupN byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible); writeBatch.put(keyBytes, valueBytes); } + + @Override + public boolean loadDataVersion() { + return this.rocksDBConfigManager.loadDataVersion(); + } + + @Override + public DataVersion getDataVersion() { + return rocksDBConfigManager.getKvDataVersion(); + } + + public void updateDataVersion() { + try { + rocksDBConfigManager.updateKvDataVersion(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 3039cf5c97c..d0e31a5cecb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -18,6 +18,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -32,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -69,6 +71,7 @@ import org.apache.rocketmq.common.LockCallback; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UnlockCallback; @@ -209,6 +212,7 @@ import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.RocksDBMessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.queue.ConsumeQueueInterface; @@ -339,6 +343,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return fetchAllConsumeStatsInBroker(ctx, request); case RequestCode.QUERY_CONSUME_QUEUE: return queryConsumeQueue(ctx, request); + case RequestCode.DIFF_CONSUME_QUEUE: + return this.diffConsumeQueue(ctx, request); case RequestCode.UPDATE_AND_GET_GROUP_FORBIDDEN: return this.updateAndGetGroupForbidden(ctx, request); case RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG: @@ -458,6 +464,79 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re return response; } + private RemotingCommand diffConsumeQueue(ChannelHandlerContext ctx, RemotingCommand request) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "diff success, very good!"))); + + DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore(); + RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore(); + + if (!messageStore.getMessageStoreConfig().isRocksdbCQWriteEnable()) { + response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "RocksdbCQWriteEnable is false, diffConsumeQueue is invalid"))); + return response; + } + + ConcurrentMap> cqTable = messageStore.getConsumeQueueTable(); + Random random = new Random(); + for (Map.Entry> topicToCqListEntry : cqTable.entrySet()) { + String topic = topicToCqListEntry.getKey(); + ConcurrentMap queueIdToCqMap = topicToCqListEntry.getValue(); + for (Map.Entry queueIdToCqEntry : queueIdToCqMap.entrySet()) { + Integer queueId = queueIdToCqEntry.getKey(); + + ConsumeQueueInterface jsonCq = queueIdToCqEntry.getValue(); + ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId); + + CqUnit kvCqEarliestUnit = kvCq.getEarliestUnit(); + CqUnit kvCqLatestUnit = kvCq.getLatestUnit(); + CqUnit jsonCqEarliestUnit = jsonCq.getEarliestUnit(); + CqUnit jsonCqLatestUnit = jsonCq.getLatestUnit(); + LOGGER.info("diffConsumeQueue topic:{}, queue:{}, kvCqEarliestUnit:{}, jsonCqEarliestUnit:{}, kvCqLatestUnit:{}, jsonCqLatestUnit:{}", + topic, queueId, kvCqEarliestUnit, jsonCqEarliestUnit, kvCqLatestUnit, jsonCqLatestUnit); + + long jsonOffset = jsonCq.getMaxOffsetInQueue() - 1; + int sampleCount = 10; + + Set sampledOffsets = new HashSet<>(); + + if (jsonOffset > 100) { + // Randomly sample 10 offsets from the last 100 entries + long startOffset = jsonOffset - 100; + while (sampledOffsets.size() < sampleCount) { + long randomOffset = startOffset + random.nextInt(100); + sampledOffsets.add(randomOffset); + } + } else if (jsonOffset > 10) { + // Take the last 10 entries + long startOffset = jsonOffset - 10; + for (long i = startOffset; i < jsonOffset; i++) { + sampledOffsets.add(i); + } + } else { + // Take all available entries if less than 10 + for (long i = 0; i < jsonOffset; i++) { + sampledOffsets.add(i); + } + } + + for (long currentOffset : sampledOffsets) { + Pair kvCqUnitTime = kvCq.getCqUnitAndStoreTime(currentOffset); + Pair jsonCqUnitTime = jsonCq.getCqUnitAndStoreTime(currentOffset); + if (!checkCqUnitEqual(kvCqUnitTime.getObject1(), jsonCqUnitTime.getObject1())) { + String diffInfo = String.format("Difference found at topic:%s, queue:%s, offset:%s - kvCqUnit:%s, jsonCqUnit:%s", + topic, queueId, currentOffset, kvCqUnitTime.getObject1(), jsonCqUnitTime.getObject1()); + LOGGER.error(diffInfo); + response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffInfo))); + return response; + } + } + } + + } + return response; + } + @Override public boolean rejectRequest() { return false; @@ -3305,4 +3384,23 @@ private boolean validateBlackListConfigExist(Properties properties) { } return false; } + + private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) { + if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) { + return false; + } + if (cqUnit1.getSize() != cqUnit2.getSize()) { + return false; + } + if (cqUnit1.getPos() != cqUnit2.getPos()) { + return false; + } + if (cqUnit1.getBatchNum() != cqUnit2.getBatchNum()) { + return false; + } + if (cqUnit1.getTagsCode() != cqUnit2.getTagsCode()) { + return false; + } + return true; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java index 7df72dbe686..be8f9a2e6b5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java @@ -79,28 +79,30 @@ public boolean loadForbidden(BiConsumer biConsumer) { private boolean merge() { if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) { - log.info("The switch is off, no merge operation is needed."); + log.info("the switch transferMetadataJsonToRocksdb is off, no merge subGroup operation is needed."); return true; } if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { - log.info("json file and json back file not exist, so skip merge"); + log.info("subGroup json file is not exist, so skip merge"); return true; } - - if (!super.load()) { - log.error("load group and forbidden info from json file error, startup will exit"); + if (!super.loadDataVersion()) { + log.error("load json subGroup dataVersion error, startup will exit"); return false; } - - final ConcurrentMap groupTable = this.getSubscriptionGroupTable(); - final ConcurrentMap> forbiddenTable = this.getForbiddenTable(); final DataVersion dataVersion = super.getDataVersion(); final DataVersion kvDataVersion = this.getDataVersion(); if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) { + if (!super.load()) { + log.error("load group and forbidden info from json file error, startup will exit"); + return false; + } + final ConcurrentMap groupTable = this.getSubscriptionGroupTable(); for (Map.Entry entry : groupTable.entrySet()) { putSubscriptionGroupConfig(entry.getValue()); log.info("import subscription config to rocksdb, group={}", entry.getValue()); } + final ConcurrentMap> forbiddenTable = this.getForbiddenTable(); for (Map.Entry> entry : forbiddenTable.entrySet()) { try { this.rocksDBConfigManager.updateForbidden(entry.getKey(), JSON.toJSONString(entry.getValue())); @@ -110,8 +112,10 @@ private boolean merge() { return false; } } - this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion); + this.getDataVersion().assignNewOne(dataVersion); updateDataVersion(); + } else { + log.info("dataVersion is not greater than kvDataVersion, no need to merge group metaData, dataVersion={}, kvDataVersion={}", dataVersion, kvDataVersion); } log.info("finish marge subscription config from json file and merge to rocksdb"); this.persist(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index f2a7e0482b1..2ab2a9c4e5d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -334,6 +334,26 @@ public DataVersion getDataVersion() { return dataVersion; } + public boolean loadDataVersion() { + String fileName = null; + try { + fileName = this.configFilePath(); + String jsonString = MixAll.file2String(fileName); + if (jsonString != null) { + SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class); + if (obj != null) { + this.dataVersion.assignNewOne(obj.dataVersion); + this.printLoadDataWhenFirstBoot(obj); + } + log.info("load subGroup dataVersion success " + fileName + " " + obj.dataVersion); + } + return true; + } catch (Exception e) { + log.error("load subGroup dataVersion failed" + fileName , e); + return false; + } + } + public void deleteSubscriptionGroupConfig(final String groupName) { SubscriptionGroupConfig old = removeSubscriptionGroupConfig(groupName); this.forbiddenTable.remove(groupName); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java index 2a89dd7e024..dfd5dc71e54 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java @@ -60,29 +60,35 @@ public boolean loadDataVersion() { private boolean merge() { if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) { - log.info("The switch is off, no merge operation is needed."); + log.info("the switch transferMetadataJsonToRocksdb is off, no merge topic operation is needed."); return true; } if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { - log.info("json file and json back file not exist, so skip merge"); + log.info("topic json file is not exist, so skip merge"); return true; } - if (!super.load()) { - log.error("load topic config from json file error, startup will exit"); + if (!super.loadDataVersion()) { + log.error("load json topic dataVersion error, startup will exit"); return false; } - final ConcurrentMap topicConfigTable = this.getTopicConfigTable(); final DataVersion dataVersion = super.getDataVersion(); final DataVersion kvDataVersion = this.getDataVersion(); if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) { + if (!super.load()) { + log.error("load topic config from json file error, startup will exit"); + return false; + } + final ConcurrentMap topicConfigTable = this.getTopicConfigTable(); for (Map.Entry entry : topicConfigTable.entrySet()) { putTopicConfig(entry.getValue()); log.info("import topic config to rocksdb, topic={}", entry.getValue()); } - this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion); + this.getDataVersion().assignNewOne(dataVersion); updateDataVersion(); + } else { + log.info("dataVersion is not greater than kvDataVersion, no need to merge topic metaData, dataVersion={}, kvDataVersion={}", dataVersion, kvDataVersion); } log.info("finish read topic config from json file and merge to rocksdb"); this.persist(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index eab2896b001..314569a4d6f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -637,6 +637,26 @@ public String encode() { return encode(false); } + public boolean loadDataVersion() { + String fileName = null; + try { + fileName = this.configFilePath(); + String jsonString = MixAll.file2String(fileName); + if (jsonString != null) { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = + TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class); + if (topicConfigSerializeWrapper != null) { + this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion()); + } + log.info("load topic metadata dataVersion success" + fileName + " " + topicConfigSerializeWrapper.getDataVersion()); + } + return true; + } catch (Exception e) { + log.error("load topic metadata dataVersion failed" + fileName , e); + return false; + } + } + @Override public String configFilePath() { return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java new file mode 100644 index 00000000000..c268dafc604 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.offset; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.commons.collections.MapUtils; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.DispatchRequest; +import org.apache.rocketmq.store.RocksDBMessageStore; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.queue.ConsumeQueueInterface; +import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface; +import org.apache.rocketmq.store.queue.CqUnit; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.rocksdb.RocksDBException; + +@RunWith(MockitoJUnitRunner.class) +public class RocksdbTransferOffsetAndCqTest { + + private final String basePath = Paths.get(System.getProperty("user.home"), + "unit-test-store", UUID.randomUUID().toString().substring(0, 16).toUpperCase()).toString(); + + private final String topic = "topic"; + private final String group = "group"; + private final String clientHost = "clientHost"; + private final int queueId = 1; + + private RocksDBConsumerOffsetManager rocksdbConsumerOffsetManager; + + private ConsumerOffsetManager consumerOffsetManager; + + private DefaultMessageStore defaultMessageStore; + + @Mock + private BrokerController brokerController; + + @Before + public void init() throws IOException { + if (notToBeExecuted()) { + return; + } + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setConsumerOffsetUpdateVersionStep(10); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setStorePathRootDir(basePath); + messageStoreConfig.setTransferOffsetJsonToRocksdb(true); + messageStoreConfig.setRocksdbCQWriteEnable(true); + Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); + Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); + + defaultMessageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("aaa", true), null, + brokerConfig, new ConcurrentHashMap()); + defaultMessageStore.enableRocksdbCQWrite(); + defaultMessageStore.loadCheckPoint(); + + consumerOffsetManager = new ConsumerOffsetManager(brokerController); + consumerOffsetManager.load(); + + rocksdbConsumerOffsetManager = new RocksDBConsumerOffsetManager(brokerController); + } + + @Test + public void testTransferOffset() { + if (notToBeExecuted()) { + return; + } + + for (int i = 0; i < 200; i++) { + consumerOffsetManager.commitOffset(clientHost, group, topic, queueId, i); + } + + ConcurrentMap> offsetTable = consumerOffsetManager.getOffsetTable(); + ConcurrentMap map = offsetTable.get(topic + "@" + group); + Assert.assertTrue(MapUtils.isNotEmpty(map)); + + Long offset = map.get(queueId); + Assert.assertEquals(199L, (long) offset); + + long offsetDataVersion = consumerOffsetManager.getDataVersion().getCounter().get(); + Assert.assertEquals(20L, offsetDataVersion); + + consumerOffsetManager.persist(); + + boolean loadResult = rocksdbConsumerOffsetManager.load(); + Assert.assertTrue(loadResult); + + ConcurrentMap> rocksdbOffsetTable = rocksdbConsumerOffsetManager.getOffsetTable(); + + ConcurrentMap rocksdbMap = rocksdbOffsetTable.get(topic + "@" + group); + Assert.assertTrue(MapUtils.isNotEmpty(rocksdbMap)); + + Long aLong1 = rocksdbMap.get(queueId); + Assert.assertEquals(199L, (long) aLong1); + + long rocksdbOffset = rocksdbConsumerOffsetManager.getDataVersion().getCounter().get(); + Assert.assertEquals(21L, rocksdbOffset); + } + + @Test + public void testRocksdbCqWrite() throws RocksDBException { + RocksDBMessageStore kvStore = defaultMessageStore.getRocksDBMessageStore(); + ConsumeQueueStoreInterface store = kvStore.getConsumeQueueStore(); + ConsumeQueueInterface rocksdbCq = defaultMessageStore.getRocksDBMessageStore().findConsumeQueue(topic, queueId); + ConsumeQueueInterface fileCq = defaultMessageStore.findConsumeQueue(topic, queueId); + for (int i = 0; i < 200; i++) { + DispatchRequest request = new DispatchRequest(topic, queueId, i, 200, 0, System.currentTimeMillis(), i, "", "", 0, 0, new HashMap<>()); + fileCq.putMessagePositionInfoWrapper(request); + store.putMessagePositionInfoWrapper(request); + } + Pair unit = rocksdbCq.getCqUnitAndStoreTime(100); + Pair unit1 = fileCq.getCqUnitAndStoreTime(100); + Assert.assertTrue(unit.getObject1().getPos() == unit1.getObject1().getPos()); + } + + private boolean notToBeExecuted() { + return MixAll.isMac(); + } + +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 8a3d3dd0dcb..9b111c6fcb3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -119,6 +119,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody; +import org.apache.rocketmq.remoting.protocol.body.DiffConsumeQueueResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody; import org.apache.rocketmq.remoting.protocol.body.GroupList; @@ -3016,6 +3017,17 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, throw new MQClientException(response.getCode(), response.getRemark()); } + public DiffConsumeQueueResponseBody diffConsumeQueue(final String brokerAddr, final long timeoutMillis) throws InterruptedException, + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DIFF_CONSUME_QUEUE, null); + RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis); + assert response != null; + if (ResponseCode.SUCCESS == response.getCode()) { + return DiffConsumeQueueResponseBody.decode(response.getBody(), DiffConsumeQueueResponseBody.class); + } + throw new MQClientException(response.getCode(), response.getRemark()); + } + public void checkClientInBroker(final String brokerAddr, final String consumerGroup, final String clientId, final SubscriptionData subscriptionData, final long timeoutMillis) diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index f88b8e198bf..7c09cd5bc9b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -496,6 +496,7 @@ public void statRocksdb(Logger logger) { logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {}, memtable: {}, blocksPinnedByIterator: {}", blockCacheMemUsage, indexesAndFilterBlockMemUsage, memTableMemUsage, blocksPinnedByIteratorMemUsage); } catch (Exception ignored) { + throw new RuntimeException(ignored); } } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index f45ff6fa484..ff44fd5b602 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -217,6 +217,7 @@ public class RequestCode { public static final int GET_SUBSCRIPTIONGROUP_CONFIG = 352; public static final int UPDATE_AND_GET_GROUP_FORBIDDEN = 353; + public static final int DIFF_CONSUME_QUEUE = 354; public static final int LITE_PULL_MESSAGE = 361; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/DiffConsumeQueueResponseBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/DiffConsumeQueueResponseBody.java new file mode 100644 index 00000000000..a555716a1cd --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/DiffConsumeQueueResponseBody.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.body; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class DiffConsumeQueueResponseBody extends RemotingSerializable { + + String diffResult; + + public String getDiffResult() { + return diffResult; + } + + public void setDiffResult(String diffResult) { + this.diffResult = diffResult; + } + + +} diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index f159c31a7be..4fe69da5e53 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -163,11 +163,13 @@ public class DefaultMessageStore implements MessageStore { private volatile boolean shutdown = true; protected boolean notifyMessageArriveInBatch = false; - private StoreCheckpoint storeCheckpoint; + protected StoreCheckpoint storeCheckpoint; private TimerMessageStore timerMessageStore; private final LinkedList dispatcherList; + private RocksDBMessageStore rocksDBMessageStore; + private RandomAccessFile lockFile; private FileLock lock; @@ -354,12 +356,7 @@ public boolean load() { } if (result) { - this.storeCheckpoint = - new StoreCheckpoint( - StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); - this.masterFlushedOffset = this.storeCheckpoint.getMasterFlushedOffset(); - setConfirmOffset(this.storeCheckpoint.getConfirmPhyOffset()); - + loadCheckPoint(); result = this.indexService.load(lastExitOK); this.recover(lastExitOK); LOGGER.info("message store recover end, and the max phy offset = {}", this.getMaxPhyOffset()); @@ -381,6 +378,14 @@ public boolean load() { return result; } + public void loadCheckPoint() throws IOException { + this.storeCheckpoint = + new StoreCheckpoint( + StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); + this.masterFlushedOffset = this.storeCheckpoint.getMasterFlushedOffset(); + setConfirmOffset(this.storeCheckpoint.getConfirmPhyOffset()); + } + /** * @throws Exception */ @@ -511,6 +516,10 @@ public void shutdown() { this.compactionService.shutdown(); } + if (messageStoreConfig.isRocksdbCQWriteEnable()) { + this.rocksDBMessageStore.consumeQueueStore.shutdown(); + } + this.flushConsumeQueueService.shutdown(); this.allocateMappedFileService.shutdown(); this.storeCheckpoint.flush(); @@ -3251,6 +3260,17 @@ public HARuntimeInfo getHARuntimeInfo() { } } + public void enableRocksdbCQWrite() { + try { + RocksDBMessageStore store = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, this.topicConfigTable); + this.rocksDBMessageStore = store; + store.loadAndStartConsumerServiceOnly(); + addDispatcher(store.getDispatcherBuildRocksdbConsumeQueue()); + } catch (Exception e) { + LOGGER.error("enableRocksdbCqWrite error", e); + } + } + public int getMaxDelayLevel() { return maxDelayLevel; } @@ -3338,4 +3358,12 @@ public boolean isTransientStorePoolEnable() { public long getReputFromOffset() { return this.reputMessageService.getReputFromOffset(); } + + public RocksDBMessageStore getRocksDBMessageStore() { + return this.rocksDBMessageStore; + } + + public ConsumeQueueStoreInterface getConsumeQueueStore() { + return consumeQueueStore; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java index 6141b778bf7..8d4574b8a09 100644 --- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java @@ -39,6 +39,8 @@ public class RocksDBMessageStore extends DefaultMessageStore { + private CommitLogDispatcherBuildRocksdbConsumeQueue dispatcherBuildRocksdbConsumeQueue; + public RocksDBMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig, final ConcurrentMap topicConfigTable) throws IOException { @@ -178,4 +180,30 @@ public void initMetrics(Meter meter, Supplier attributesBuild // Also add some metrics for rocksdb's monitoring. RocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier, this); } + + public CommitLogDispatcherBuildRocksdbConsumeQueue getDispatcherBuildRocksdbConsumeQueue() { + return dispatcherBuildRocksdbConsumeQueue; + } + + class CommitLogDispatcherBuildRocksdbConsumeQueue implements CommitLogDispatcher { + @Override + public void dispatch(DispatchRequest request) throws RocksDBException { + putMessagePositionInfo(request); + } + } + + public void loadAndStartConsumerServiceOnly() { + try { + this.dispatcherBuildRocksdbConsumeQueue = new CommitLogDispatcherBuildRocksdbConsumeQueue(); + boolean loadResult = this.consumeQueueStore.load(); + if (!loadResult) { + throw new RuntimeException("load consume queue failed"); + } + super.loadCheckPoint(); + this.consumeQueueStore.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 0b45d92418e..b79d8122cb5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -424,6 +424,26 @@ public class MessageStoreConfig { private boolean putConsumeQueueDataByFileChannel = true; + private boolean transferOffsetJsonToRocksdb = false; + + private boolean rocksdbCQWriteEnable = false; + + public boolean isRocksdbCQWriteEnable() { + return rocksdbCQWriteEnable; + } + + public void setRocksdbCQWriteEnable(boolean rocksdbWriteEnable) { + this.rocksdbCQWriteEnable = rocksdbWriteEnable; + } + + public boolean isTransferOffsetJsonToRocksdb() { + return transferOffsetJsonToRocksdb; + } + + public void setTransferOffsetJsonToRocksdb(boolean transferOffsetJsonToRocksdb) { + this.transferOffsetJsonToRocksdb = transferOffsetJsonToRocksdb; + } + public boolean isEnabledAppendPropCRC() { return enabledAppendPropCRC; } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java b/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java index b8865fd9195..34f5cb142b6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java @@ -109,6 +109,7 @@ public String toString() { ", size=" + size + ", pos=" + pos + ", batchNum=" + batchNum + + ", tagsCode=" + tagsCode + ", compactedOffset=" + compactedOffset + '}'; } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java index 5a981bb4df1..3d195513c8c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java @@ -311,7 +311,7 @@ public CqUnit getEarliestUnit() { public CqUnit getLatestUnit() { try { long maxOffset = this.messageStore.getQueueStore().getMaxOffsetInQueue(topic, queueId); - return get(maxOffset); + return get(maxOffset > 0 ? maxOffset - 1 : maxOffset); } catch (RocksDBException e) { ERROR_LOG.error("getLatestUnit Failed. topic: {}, queueId: {}, {}", topic, queueId, e.getMessage()); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 5be6d24ff76..0c3003d5d0c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -52,6 +52,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.DiffConsumeQueueResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -730,6 +731,12 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String ); } + @Override + public DiffConsumeQueueResponseBody diffConsumeQueue(String brokerAddr) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + return this.defaultMQAdminExtImpl.diffConsumeQueue(brokerAddr); + } + @Override public boolean resumeCheckHalfMessage(String topic, String msgId) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 9546235d3e8..c6e9f889ed6 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -90,6 +90,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.DiffConsumeQueueResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -1757,6 +1758,11 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String return this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue(brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis); } + @Override + public DiffConsumeQueueResponseBody diffConsumeQueue(String brokerAddr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + return this.mqClientInstance.getMQClientAPIImpl().diffConsumeQueue(brokerAddr, timeoutMillis); + } + @Override public boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 9dff3cbab95..36bb8b663d4 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -48,6 +48,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.DiffConsumeQueueResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -148,6 +149,8 @@ ConsumeStats examineConsumeStats( final String consumerGroup) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + DiffConsumeQueueResponseBody diffConsumeQueue(String brokerAddr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; + ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java index 1ecb1fa2cd9..e02e78b534e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java @@ -14,10 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.rocketmq.tools.command.export; import com.alibaba.fastjson.JSONObject; import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; @@ -25,6 +27,7 @@ import org.apache.rocketmq.common.config.ConfigRocksDBStorage; import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; import org.rocksdb.RocksIterator; @@ -38,6 +41,17 @@ public class ExportMetadataInRocksDBCommand implements SubCommand { private static final String TOPICS_JSON_CONFIG = "topics"; private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups"; + public static void main(String[] args) throws SubCommandException { + ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand(); + + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-p /Users/huyitao/store/config", "-t subscriptionGroups", "-j true"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), + new DefaultParser()); + cmd.execute(commandLine, options, null); + } + @Override public String commandName() { return "exportMetadataInRocksDB"; @@ -77,6 +91,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t } String configType = commandLine.getOptionValue("configType").trim().toLowerCase(); + path += "/" + configType; boolean jsonEnable = false; if (commandLine.hasOption("jsonEnable")) { @@ -86,7 +101,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t ConfigRocksDBStorage kvStore = new ConfigRocksDBStorage(path, true /* readOnly */); if (!kvStore.start()) { - System.out.print("RocksDB load error, path=" + path + "\n"); + System.out.printf("RocksDB load error, path=" + path + "\n"); return; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/DiffConsumeQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/DiffConsumeQueueCommand.java new file mode 100644 index 00000000000..828f159a74c --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/DiffConsumeQueueCommand.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.queue; + +import java.util.Map; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.DiffConsumeQueueResponseBody; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; + +public class DiffConsumeQueueCommand implements SubCommand { + + public static void main(String[] args) { + DiffConsumeQueueCommand cmd = new DiffConsumeQueueCommand(); + + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-c LetLetMe", "-n xxxx:9876"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), + new DefaultParser()); + cmd.execute(commandLine, options, null); + } + + @Override + public String commandName() { + return "diffConsumeQueueCommand"; + } + + @Override + public String commandDesc() { + return "check if rocksdb cq is same as file cq"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "cluster", true, "cluster name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("n", "nameserverAddr", true, "nameserverAddr"); + opt.setRequired(true); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + defaultMQAdminExt.setNamesrvAddr(StringUtils.trim(commandLine.getOptionValue('n'))); + String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : ""; + + try { + defaultMQAdminExt.start(); + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + Map> clusterAddrTable = clusterInfo.getClusterAddrTable(); + Map brokerAddrTable = clusterInfo.getBrokerAddrTable(); + if (clusterAddrTable.get(clusterName) == null) { + System.out.printf("clusterAddrTable is empty"); + return; + } + for (Map.Entry entry : brokerAddrTable.entrySet()) { + String brokerName = entry.getKey(); + BrokerData brokerData = entry.getValue(); + String brokerAddr = brokerData.getBrokerAddrs().get(0L); + DiffConsumeQueueResponseBody body = defaultMQAdminExt.diffConsumeQueue(brokerAddr); + System.out.printf(brokerName + " | " + brokerAddr + " | " + body.getDiffResult()); + } + + } catch (Exception e) { + throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} From accf8048d4d8631ad9985be58f7bbbe5320297f1 Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Thu, 29 Aug 2024 15:58:02 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E4=BF=AE=E6=94=B9brokerController=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E8=BF=87=E7=A8=8B=E4=B8=AD=E7=9A=84=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E6=8D=95=E8=8E=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/apache/rocketmq/broker/BrokerController.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index e9276e15778..6dc73286a41 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -815,11 +815,9 @@ public boolean initializeMessageStore() { this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg)); this.messageStore.setTimerMessageStore(this.timerMessageStore); } - } catch (IOException e) { + } catch (Exception e) { result = false; LOG.error("BrokerController#initialize: unexpected error occurs", e); - } catch (Exception e) { - throw new RuntimeException(e); } return result; } From 870241867ec6468f4355d2e8247d9c569a40e3ce Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Thu, 29 Aug 2024 16:00:40 +0800 Subject: [PATCH 3/6] =?UTF-8?q?github=E6=8F=90=E4=BE=9B=E7=9A=84mac?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=8E=AF=E5=A2=83=E6=9C=89=E9=97=AE=E9=A2=98?= =?UTF-8?q?=EF=BC=8Crocksdb=E7=9B=B8=E5=85=B3=E6=B5=8B=E8=AF=95=E8=B7=B3?= =?UTF-8?q?=E8=BF=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java index c268dafc604..36b0a08b063 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java @@ -130,6 +130,9 @@ public void testTransferOffset() { @Test public void testRocksdbCqWrite() throws RocksDBException { + if (notToBeExecuted()) { + return; + } RocksDBMessageStore kvStore = defaultMessageStore.getRocksDBMessageStore(); ConsumeQueueStoreInterface store = kvStore.getConsumeQueueStore(); ConsumeQueueInterface rocksdbCq = defaultMessageStore.getRocksDBMessageStore().findConsumeQueue(topic, queueId); From 8b2fa04c36d680774271525893c2d035d8dba238 Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Thu, 29 Aug 2024 16:43:30 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=97=A0=E7=94=A8import?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/apache/rocketmq/broker/BrokerController.java | 1 - 1 file changed, 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 6dc73286a41..9142e850e9f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -18,7 +18,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import java.io.IOException; import java.net.InetSocketAddress; import java.util.AbstractMap; import java.util.ArrayList; From 5fb9bec28549913f42269d4b093494214d16c69b Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Tue, 3 Sep 2024 16:54:31 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dpr=E4=B8=AD=E6=8F=90?= =?UTF-8?q?=E5=87=BA=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rocketmq/broker/BrokerController.java | 2 +- .../offset/RocksDBConsumerOffsetManager.java | 11 ++++----- .../processor/AdminBrokerProcessor.java | 12 +++++----- .../RocksDBSubscriptionGroupManager.java | 16 ++++++------- .../SubscriptionGroupManager.java | 4 ++-- .../topic/RocksDBTopicConfigManager.java | 10 ++++---- .../broker/topic/TopicConfigManager.java | 4 ++-- .../RocksdbTransferOffsetAndCqTest.java | 2 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 8 +++---- .../common/config/AbstractRocksDBStorage.java | 24 +++++++++---------- .../remoting/protocol/RequestCode.java | 2 +- ...ckRocksdbCqWriteProgressResponseBody.java} | 2 +- .../rocketmq/store/DefaultMessageStore.java | 2 +- .../rocketmq/store/RocksDBMessageStore.java | 6 ++--- .../store/config/MessageStoreConfig.java | 10 ++++---- .../tools/admin/DefaultMQAdminExt.java | 6 ++--- .../tools/admin/DefaultMQAdminExtImpl.java | 6 ++--- .../rocketmq/tools/admin/MQAdminExt.java | 4 ++-- .../ExportMetadataInRocksDBCommand.java | 15 +----------- ...> CheckRocksdbCqWriteProgressCommand.java} | 21 ++++------------ 20 files changed, 70 insertions(+), 97 deletions(-) rename remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/{DiffConsumeQueueResponseBody.java => CheckRocksdbCqWriteProgressResponseBody.java} (92%) rename tools/src/main/java/org/apache/rocketmq/tools/command/queue/{DiffConsumeQueueCommand.java => CheckRocksdbCqWriteProgressCommand.java} (79%) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 9142e850e9f..7fb876ea378 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -788,7 +788,7 @@ public boolean initializeMessageStore() { defaultMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable()); } else { defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable()); - if (messageStoreConfig.isRocksdbCQWriteEnable()) { + if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) { defaultMessageStore.enableRocksdbCQWrite(); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java index 6cadf86417a..1e7cda71eed 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java @@ -16,11 +16,12 @@ */ package org.apache.rocketmq.broker.offset; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; import java.io.File; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; - import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.RocksDBConfigManager; import org.apache.rocketmq.common.UtilAll; @@ -31,9 +32,6 @@ import org.apache.rocketmq.remoting.protocol.DataVersion; import org.rocksdb.WriteBatch; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.serializer.SerializerFeature; - public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager { protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -67,7 +65,7 @@ private boolean merge() { return true; } if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { - log.info("consumerOffset json file is not exist, so skip merge"); + log.info("consumerOffset json file does not exist, so skip merge"); return true; } if (!super.loadDataVersion()) { @@ -85,8 +83,8 @@ private boolean merge() { this.persist(); this.getDataVersion().assignNewOne(dataVersion); updateDataVersion(); + log.info("update offset from json, dataVersion:{}, offsetTable: {} ", this.getDataVersion(), JSON.toJSONString(this.getOffsetTable())); } - log.info("update offset from json, dataVersion:{}, offsetTable: {} ", this.getDataVersion(), JSON.toJSONString(this.getOffsetTable())); return true; } @@ -162,6 +160,7 @@ public void updateDataVersion() { try { rocksDBConfigManager.updateKvDataVersion(); } catch (Exception e) { + log.error("update consumer offset dataVersion error", e); throw new RuntimeException(e); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index d0e31a5cecb..64e7a9bb8c0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -343,8 +343,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return fetchAllConsumeStatsInBroker(ctx, request); case RequestCode.QUERY_CONSUME_QUEUE: return queryConsumeQueue(ctx, request); - case RequestCode.DIFF_CONSUME_QUEUE: - return this.diffConsumeQueue(ctx, request); + case RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS: + return this.checkRocksdbCqWriteProgress(ctx, request); case RequestCode.UPDATE_AND_GET_GROUP_FORBIDDEN: return this.updateAndGetGroupForbidden(ctx, request); case RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG: @@ -464,7 +464,7 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re return response; } - private RemotingCommand diffConsumeQueue(ChannelHandlerContext ctx, RemotingCommand request) { + private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); response.setCode(ResponseCode.SUCCESS); response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "diff success, very good!"))); @@ -472,8 +472,8 @@ private RemotingCommand diffConsumeQueue(ChannelHandlerContext ctx, RemotingComm DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore(); RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore(); - if (!messageStore.getMessageStoreConfig().isRocksdbCQWriteEnable()) { - response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "RocksdbCQWriteEnable is false, diffConsumeQueue is invalid"))); + if (!messageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) { + response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "RocksdbCQWriteEnable is false, CheckRocksdbCqWriteProgressCommand is invalid"))); return response; } @@ -492,7 +492,7 @@ private RemotingCommand diffConsumeQueue(ChannelHandlerContext ctx, RemotingComm CqUnit kvCqLatestUnit = kvCq.getLatestUnit(); CqUnit jsonCqEarliestUnit = jsonCq.getEarliestUnit(); CqUnit jsonCqLatestUnit = jsonCq.getLatestUnit(); - LOGGER.info("diffConsumeQueue topic:{}, queue:{}, kvCqEarliestUnit:{}, jsonCqEarliestUnit:{}, kvCqLatestUnit:{}, jsonCqLatestUnit:{}", + LOGGER.info("CheckRocksdbCqWriteProgressCommand topic:{}, queue:{}, kvCqEarliestUnit:{}, jsonCqEarliestUnit:{}, kvCqLatestUnit:{}, jsonCqLatestUnit:{}", topic, queueId, kvCqEarliestUnit, jsonCqEarliestUnit, kvCqLatestUnit, jsonCqLatestUnit); long jsonOffset = jsonCq.getMaxOffsetInQueue() - 1; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java index be8f9a2e6b5..5119f78672c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java @@ -19,6 +19,12 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; +import java.io.File; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.RocksDBConfigManager; import org.apache.rocketmq.common.UtilAll; @@ -27,13 +33,6 @@ import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.rocksdb.RocksIterator; -import java.io.File; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.BiConsumer; - public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager { protected RocksDBConfigManager rocksDBConfigManager; @@ -83,7 +82,7 @@ private boolean merge() { return true; } if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { - log.info("subGroup json file is not exist, so skip merge"); + log.info("subGroup json file does not exist, so skip merge"); return true; } if (!super.loadDataVersion()) { @@ -200,6 +199,7 @@ public void updateDataVersion() { try { rocksDBConfigManager.updateKvDataVersion(); } catch (Exception e) { + log.error("update group config dataVersion error", e); throw new RuntimeException(e); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index 2ab2a9c4e5d..e6855ef9a2a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -344,12 +344,12 @@ public boolean loadDataVersion() { if (obj != null) { this.dataVersion.assignNewOne(obj.dataVersion); this.printLoadDataWhenFirstBoot(obj); + log.info("load subGroup dataVersion success,{},{}", fileName, obj.dataVersion); } - log.info("load subGroup dataVersion success " + fileName + " " + obj.dataVersion); } return true; } catch (Exception e) { - log.error("load subGroup dataVersion failed" + fileName , e); + log.error("load subGroup dataVersion failed" + fileName, e); return false; } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java index dfd5dc71e54..466e6416f98 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java @@ -18,6 +18,9 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; +import java.io.File; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.RocksDBConfigManager; import org.apache.rocketmq.common.TopicConfig; @@ -25,10 +28,6 @@ import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.remoting.protocol.DataVersion; -import java.io.File; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - public class RocksDBTopicConfigManager extends TopicConfigManager { protected RocksDBConfigManager rocksDBConfigManager; @@ -64,7 +63,7 @@ private boolean merge() { return true; } if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { - log.info("topic json file is not exist, so skip merge"); + log.info("topic json file does not exist, so skip merge"); return true; } @@ -156,6 +155,7 @@ public void updateDataVersion() { try { rocksDBConfigManager.updateKvDataVersion(); } catch (Exception e) { + log.error("update topic config dataVersion error", e); throw new RuntimeException(e); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 314569a4d6f..25d3218f2ab 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -647,12 +647,12 @@ public boolean loadDataVersion() { TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class); if (topicConfigSerializeWrapper != null) { this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion()); + log.info("load topic metadata dataVersion success {}, {}", fileName, topicConfigSerializeWrapper.getDataVersion()); } - log.info("load topic metadata dataVersion success" + fileName + " " + topicConfigSerializeWrapper.getDataVersion()); } return true; } catch (Exception e) { - log.error("load topic metadata dataVersion failed" + fileName , e); + log.error("load topic metadata dataVersion failed" + fileName, e); return false; } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java index 36b0a08b063..b4800aec24e 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java @@ -76,7 +76,7 @@ public void init() throws IOException { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setStorePathRootDir(basePath); messageStoreConfig.setTransferOffsetJsonToRocksdb(true); - messageStoreConfig.setRocksdbCQWriteEnable(true); + messageStoreConfig.setRocksdbCQDoubleWriteEnable(true); Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 9b111c6fcb3..bfc7da50fed 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -119,7 +119,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody; -import org.apache.rocketmq.remoting.protocol.body.DiffConsumeQueueResponseBody; +import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody; import org.apache.rocketmq.remoting.protocol.body.GroupList; @@ -3017,13 +3017,13 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, throw new MQClientException(response.getCode(), response.getRemark()); } - public DiffConsumeQueueResponseBody diffConsumeQueue(final String brokerAddr, final long timeoutMillis) throws InterruptedException, + public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(final String brokerAddr, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DIFF_CONSUME_QUEUE, null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, null); RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis); assert response != null; if (ResponseCode.SUCCESS == response.getCode()) { - return DiffConsumeQueueResponseBody.decode(response.getBody(), DiffConsumeQueueResponseBody.class); + return CheckRocksdbCqWriteProgressResponseBody.decode(response.getBody(), CheckRocksdbCqWriteProgressResponseBody.class); } throw new MQClientException(response.getCode(), response.getRemark()); } diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index 7c09cd5bc9b..13522889bb3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -17,6 +17,15 @@ package org.apache.rocketmq.common.config; import com.google.common.collect.Maps; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.DataConverter; @@ -40,16 +49,6 @@ import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - import static org.rocksdb.RocksDB.NOT_FOUND; public abstract class AbstractRocksDBStorage { @@ -495,8 +494,9 @@ public void statRocksdb(Logger logger) { String blocksPinnedByIteratorMemUsage = this.db.getProperty("rocksdb.block-cache-pinned-usage"); logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {}, memtable: {}, blocksPinnedByIterator: {}", blockCacheMemUsage, indexesAndFilterBlockMemUsage, memTableMemUsage, blocksPinnedByIteratorMemUsage); - } catch (Exception ignored) { - throw new RuntimeException(ignored); + } catch (Exception e) { + logger.error("statRocksdb Failed. {}", this.dbPath, e); + throw new RuntimeException(e); } } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index ff44fd5b602..cfc5cc22785 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -217,7 +217,7 @@ public class RequestCode { public static final int GET_SUBSCRIPTIONGROUP_CONFIG = 352; public static final int UPDATE_AND_GET_GROUP_FORBIDDEN = 353; - public static final int DIFF_CONSUME_QUEUE = 354; + public static final int CHECK_ROCKSDB_CQ_WRITE_PROGRESS = 354; public static final int LITE_PULL_MESSAGE = 361; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/DiffConsumeQueueResponseBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java similarity index 92% rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/DiffConsumeQueueResponseBody.java rename to remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java index a555716a1cd..76719ac1a24 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/DiffConsumeQueueResponseBody.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java @@ -19,7 +19,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; -public class DiffConsumeQueueResponseBody extends RemotingSerializable { +public class CheckRocksdbCqWriteProgressResponseBody extends RemotingSerializable { String diffResult; diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 4fe69da5e53..76477afe746 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -516,7 +516,7 @@ public void shutdown() { this.compactionService.shutdown(); } - if (messageStoreConfig.isRocksdbCQWriteEnable()) { + if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) { this.rocksDBMessageStore.consumeQueueStore.shutdown(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java index 8d4574b8a09..edb68894853 100644 --- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java @@ -16,13 +16,12 @@ */ package org.apache.rocketmq.store; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.Meter; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Supplier; - -import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.api.metrics.Meter; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; @@ -202,6 +201,7 @@ public void loadAndStartConsumerServiceOnly() { super.loadCheckPoint(); this.consumeQueueStore.start(); } catch (Exception e) { + ERROR_LOG.error("loadAndStartConsumerServiceOnly error", e); throw new RuntimeException(e); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index b79d8122cb5..454dce6d9ce 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -426,14 +426,14 @@ public class MessageStoreConfig { private boolean transferOffsetJsonToRocksdb = false; - private boolean rocksdbCQWriteEnable = false; + private boolean rocksdbCQDoubleWriteEnable = false; - public boolean isRocksdbCQWriteEnable() { - return rocksdbCQWriteEnable; + public boolean isRocksdbCQDoubleWriteEnable() { + return rocksdbCQDoubleWriteEnable; } - public void setRocksdbCQWriteEnable(boolean rocksdbWriteEnable) { - this.rocksdbCQWriteEnable = rocksdbWriteEnable; + public void setRocksdbCQDoubleWriteEnable(boolean rocksdbWriteEnable) { + this.rocksdbCQDoubleWriteEnable = rocksdbWriteEnable; } public boolean isTransferOffsetJsonToRocksdb() { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 0c3003d5d0c..17a96b1dd91 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -52,7 +52,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.remoting.protocol.body.DiffConsumeQueueResponseBody; +import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -732,9 +732,9 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String } @Override - public DiffConsumeQueueResponseBody diffConsumeQueue(String brokerAddr) + public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { - return this.defaultMQAdminExtImpl.diffConsumeQueue(brokerAddr); + return this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index c6e9f889ed6..52172cf1c0c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -90,7 +90,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.remoting.protocol.body.DiffConsumeQueueResponseBody; +import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -1759,8 +1759,8 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String } @Override - public DiffConsumeQueueResponseBody diffConsumeQueue(String brokerAddr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { - return this.mqClientInstance.getMQClientAPIImpl().diffConsumeQueue(brokerAddr, timeoutMillis); + public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + return this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr, timeoutMillis); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 36bb8b663d4..b48b1cc6afd 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -48,7 +48,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.remoting.protocol.body.DiffConsumeQueueResponseBody; +import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -149,7 +149,7 @@ ConsumeStats examineConsumeStats( final String consumerGroup) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; - DiffConsumeQueueResponseBody diffConsumeQueue(String brokerAddr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; + CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) throws RemotingException, MQClientException, diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java index e02e78b534e..c466490b8a8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java @@ -19,7 +19,6 @@ import com.alibaba.fastjson.JSONObject; import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; @@ -27,7 +26,6 @@ import org.apache.rocketmq.common.config.ConfigRocksDBStorage; import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; import org.rocksdb.RocksIterator; @@ -41,17 +39,6 @@ public class ExportMetadataInRocksDBCommand implements SubCommand { private static final String TOPICS_JSON_CONFIG = "topics"; private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups"; - public static void main(String[] args) throws SubCommandException { - ExportMetadataInRocksDBCommand cmd = new ExportMetadataInRocksDBCommand(); - - Options options = ServerUtil.buildCommandlineOptions(new Options()); - String[] subargs = new String[] {"-p /Users/huyitao/store/config", "-t subscriptionGroups", "-j true"}; - final CommandLine commandLine = - ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), - new DefaultParser()); - cmd.execute(commandLine, options, null); - } - @Override public String commandName() { return "exportMetadataInRocksDB"; @@ -101,7 +88,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t ConfigRocksDBStorage kvStore = new ConfigRocksDBStorage(path, true /* readOnly */); if (!kvStore.start()) { - System.out.printf("RocksDB load error, path=" + path + "\n"); + System.out.printf("RocksDB load error, path=%s\n" , path); return; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/DiffConsumeQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java similarity index 79% rename from tools/src/main/java/org/apache/rocketmq/tools/command/queue/DiffConsumeQueueCommand.java rename to tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java index 828f159a74c..31584186c52 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/DiffConsumeQueueCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java @@ -20,34 +20,21 @@ import java.util.Map; import java.util.Set; import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; -import org.apache.rocketmq.remoting.protocol.body.DiffConsumeQueueResponseBody; +import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.route.BrokerData; -import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; -public class DiffConsumeQueueCommand implements SubCommand { - - public static void main(String[] args) { - DiffConsumeQueueCommand cmd = new DiffConsumeQueueCommand(); - - Options options = ServerUtil.buildCommandlineOptions(new Options()); - String[] subargs = new String[] {"-c LetLetMe", "-n xxxx:9876"}; - final CommandLine commandLine = - ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), - new DefaultParser()); - cmd.execute(commandLine, options, null); - } +public class CheckRocksdbCqWriteProgressCommand implements SubCommand { @Override public String commandName() { - return "diffConsumeQueueCommand"; + return "checkRocksdbCqWriteProgressCommandCommand"; } @Override @@ -88,7 +75,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { String brokerName = entry.getKey(); BrokerData brokerData = entry.getValue(); String brokerAddr = brokerData.getBrokerAddrs().get(0L); - DiffConsumeQueueResponseBody body = defaultMQAdminExt.diffConsumeQueue(brokerAddr); + CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr); System.out.printf(brokerName + " | " + brokerAddr + " | " + body.getDiffResult()); } From c1e798b6d166ac536a22b6f4114a9ff72e6ef439 Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Mon, 9 Sep 2024 16:42:04 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dpr=E4=B8=AD=E6=8F=90?= =?UTF-8?q?=E5=87=BA=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../processor/AdminBrokerProcessor.java | 120 ++++++++---------- .../rocketmq/client/impl/MQClientAPIImpl.java | 11 +- ...ckRocksdbCqWriteProgressRequestHeader.java | 47 +++++++ .../store/config/MessageStoreConfig.java | 11 ++ .../store/queue/RocksDBConsumeQueue.java | 1 - .../store/queue/RocksDBConsumeQueueStore.java | 10 +- .../tools/admin/DefaultMQAdminExt.java | 4 +- .../tools/admin/DefaultMQAdminExtImpl.java | 5 +- .../rocketmq/tools/admin/MQAdminExt.java | 2 +- .../CheckRocksdbCqWriteProgressCommand.java | 17 ++- 10 files changed, 145 insertions(+), 83 deletions(-) create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 64e7a9bb8c0..7afaea1aba0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -22,6 +22,7 @@ import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.opentelemetry.api.common.Attributes; import java.io.UnsupportedEncodingException; import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; @@ -33,14 +34,12 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import io.opentelemetry.api.common.Attributes; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.AccessValidator; @@ -140,6 +139,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UserInfo; +import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateAccessConfigRequestHeader; @@ -221,8 +221,9 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint; import org.apache.rocketmq.store.timer.TimerMessageStore; import org.apache.rocketmq.store.util.LibC; -import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; + import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; public class AdminBrokerProcessor implements NettyRequestProcessor { @@ -464,79 +465,71 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re return response; } - private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) { + private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class); + String requestTopic = requestHeader.getTopic(); final RemotingCommand response = RemotingCommand.createResponseCommand(null); response.setCode(ResponseCode.SUCCESS); - response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "diff success, very good!"))); DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore(); RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore(); - if (!messageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) { - response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "RocksdbCQWriteEnable is false, CheckRocksdbCqWriteProgressCommand is invalid"))); + response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid"))); return response; } ConcurrentMap> cqTable = messageStore.getConsumeQueueTable(); - Random random = new Random(); - for (Map.Entry> topicToCqListEntry : cqTable.entrySet()) { - String topic = topicToCqListEntry.getKey(); - ConcurrentMap queueIdToCqMap = topicToCqListEntry.getValue(); - for (Map.Entry queueIdToCqEntry : queueIdToCqMap.entrySet()) { - Integer queueId = queueIdToCqEntry.getKey(); - - ConsumeQueueInterface jsonCq = queueIdToCqEntry.getValue(); - ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId); - - CqUnit kvCqEarliestUnit = kvCq.getEarliestUnit(); - CqUnit kvCqLatestUnit = kvCq.getLatestUnit(); - CqUnit jsonCqEarliestUnit = jsonCq.getEarliestUnit(); - CqUnit jsonCqLatestUnit = jsonCq.getLatestUnit(); - LOGGER.info("CheckRocksdbCqWriteProgressCommand topic:{}, queue:{}, kvCqEarliestUnit:{}, jsonCqEarliestUnit:{}, kvCqLatestUnit:{}, jsonCqLatestUnit:{}", - topic, queueId, kvCqEarliestUnit, jsonCqEarliestUnit, kvCqLatestUnit, jsonCqLatestUnit); - - long jsonOffset = jsonCq.getMaxOffsetInQueue() - 1; - int sampleCount = 10; - - Set sampledOffsets = new HashSet<>(); - - if (jsonOffset > 100) { - // Randomly sample 10 offsets from the last 100 entries - long startOffset = jsonOffset - 100; - while (sampledOffsets.size() < sampleCount) { - long randomOffset = startOffset + random.nextInt(100); - sampledOffsets.add(randomOffset); - } - } else if (jsonOffset > 10) { - // Take the last 10 entries - long startOffset = jsonOffset - 10; - for (long i = startOffset; i < jsonOffset; i++) { - sampledOffsets.add(i); - } - } else { - // Take all available entries if less than 10 - for (long i = 0; i < jsonOffset; i++) { - sampledOffsets.add(i); - } - } + StringBuilder diffResult = new StringBuilder("check success, all is ok!\n"); + try { + if (StringUtils.isNotBlank(requestTopic)) { + processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult,false); + response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffResult.toString()))); + return response; + } + for (Map.Entry> topicEntry : cqTable.entrySet()) { + String topic = topicEntry.getKey(); + processConsumeQueuesForTopic(topicEntry.getValue(), topic, rocksDBMessageStore, diffResult,true); + } + diffResult.append("check all topic successful, size:").append(cqTable.size()); + response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffResult.toString()))); - for (long currentOffset : sampledOffsets) { - Pair kvCqUnitTime = kvCq.getCqUnitAndStoreTime(currentOffset); - Pair jsonCqUnitTime = jsonCq.getCqUnitAndStoreTime(currentOffset); - if (!checkCqUnitEqual(kvCqUnitTime.getObject1(), jsonCqUnitTime.getObject1())) { - String diffInfo = String.format("Difference found at topic:%s, queue:%s, offset:%s - kvCqUnit:%s, jsonCqUnit:%s", - topic, queueId, currentOffset, kvCqUnitTime.getObject1(), jsonCqUnitTime.getObject1()); - LOGGER.error(diffInfo); - response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffInfo))); - return response; - } + } catch (Exception e) { + LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e); + response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", e.getMessage()))); + } + return response; + } + + private void processConsumeQueuesForTopic(ConcurrentMap queueMap, String topic, RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean checkAll) { + for (Map.Entry queueEntry : queueMap.entrySet()) { + Integer queueId = queueEntry.getKey(); + ConsumeQueueInterface jsonCq = queueEntry.getValue(); + ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId); + if (!checkAll) { + String format = String.format("\n[topic: %s, queue: %s] \n kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ", + topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit()); + diffResult.append(format).append("\n"); + } + long maxFileOffsetInQueue = jsonCq.getMaxOffsetInQueue(); + long minOffsetInQueue = kvCq.getMinOffsetInQueue(); + for (long i = minOffsetInQueue; i < maxFileOffsetInQueue; i++) { + Pair fileCqUnit = jsonCq.getCqUnitAndStoreTime(i); + Pair kvCqUnit = kvCq.getCqUnitAndStoreTime(i); + if (fileCqUnit == null || kvCqUnit == null) { + diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file: %s \n", + topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null")); + return; + } + if (!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) { + String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file: %s \n kv: %s", + topic, queueId, i, kvCqUnit.getObject1(), fileCqUnit.getObject1()); + LOGGER.error(diffInfo); + diffResult.append(diffInfo).append("\n"); + return; } } - } - return response; } - @Override public boolean rejectRequest() { return false; @@ -3398,9 +3391,6 @@ private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) { if (cqUnit1.getBatchNum() != cqUnit2.getBatchNum()) { return false; } - if (cqUnit1.getTagsCode() != cqUnit2.getTagsCode()) { - return false; - } - return true; + return cqUnit1.getTagsCode() == cqUnit2.getTagsCode(); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index bfc7da50fed..b2cae4c74a8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -112,6 +112,7 @@ import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody; +import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; @@ -119,7 +120,6 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody; -import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody; import org.apache.rocketmq.remoting.protocol.body.GroupList; @@ -148,6 +148,7 @@ import org.apache.rocketmq.remoting.protocol.header.AddBrokerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader; @@ -185,9 +186,9 @@ import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetTopicsByClusterRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader; import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader; @@ -3017,9 +3018,11 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, throw new MQClientException(response.getCode(), response.getRemark()); } - public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(final String brokerAddr, final long timeoutMillis) throws InterruptedException, + public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(final String brokerAddr, final String topic, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, null); + CheckRocksdbCqWriteProgressRequestHeader header = new CheckRocksdbCqWriteProgressRequestHeader(); + header.setTopic(topic); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, header); RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis); assert response != null; if (ResponseCode.SUCCESS == response.getCode()) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java new file mode 100644 index 00000000000..fee158b4976 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.header; + +import org.apache.rocketmq.common.action.Action; +import org.apache.rocketmq.common.action.RocketMQAction; +import org.apache.rocketmq.common.resource.ResourceType; +import org.apache.rocketmq.common.resource.RocketMQResource; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RequestCode; + +@RocketMQAction(value = RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, action = Action.GET) +public class CheckRocksdbCqWriteProgressRequestHeader implements CommandCustomHeader { + + @CFNotNull + @RocketMQResource(ResourceType.TOPIC) + private String topic; + + @Override + public void checkFields() throws RemotingCommandException { + + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 454dce6d9ce..9c524aa5aea 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -428,6 +428,17 @@ public class MessageStoreConfig { private boolean rocksdbCQDoubleWriteEnable = false; + private boolean enableBatchWriteKvCq = false; + + + public boolean isEnableBatchWriteKvCq() { + return enableBatchWriteKvCq; + } + + public void setEnableBatchWriteKvCq(boolean enableBatchWriteKvCq) { + this.enableBatchWriteKvCq = enableBatchWriteKvCq; + } + public boolean isRocksdbCQDoubleWriteEnable() { return rocksdbCQDoubleWriteEnable; } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java index 3d195513c8c..2363c2896e5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java @@ -18,7 +18,6 @@ import java.nio.ByteBuffer; import java.util.List; - import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.attribute.CQType; diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java index 3c6b91ec018..34c6d2f3956 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java @@ -28,7 +28,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.BoundaryType; @@ -78,6 +77,8 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { private final Map> tempTopicQueueMaxOffsetMap; private volatile boolean isCQError = false; + private boolean enableBatchWriteKvCq; + public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) { super(messageStore); @@ -87,6 +88,7 @@ public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) { this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, messageStore); this.writeBatch = new WriteBatch(); + this.enableBatchWriteKvCq = messageStoreConfig.isEnableBatchWriteKvCq(); this.bufferDRList = new ArrayList(BATCH_SIZE); this.cqBBPairList = new ArrayList(BATCH_SIZE); this.offsetBBPairList = new ArrayList(BATCH_SIZE); @@ -164,12 +166,12 @@ private boolean shutdownInner() { @Override public void putMessagePositionInfoWrapper(DispatchRequest request) throws RocksDBException { - if (request == null || this.bufferDRList.size() >= BATCH_SIZE) { - putMessagePosition(); - } if (request != null) { this.bufferDRList.add(request); } + if (request == null || !enableBatchWriteKvCq || this.bufferDRList.size() >= BATCH_SIZE) { + putMessagePosition(); + } } public void putMessagePosition() throws RocksDBException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 17a96b1dd91..04e9ba6920e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -732,9 +732,9 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String } @Override - public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr) + public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { - return this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr); + return this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 52172cf1c0c..79b2a5f95f5 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -1759,8 +1759,9 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String } @Override - public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { - return this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr, timeoutMillis); + public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + return this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr, topic, timeoutMillis); } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index b48b1cc6afd..b23c7df3a99 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -149,7 +149,7 @@ ConsumeStats examineConsumeStats( final String consumerGroup) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; - CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; + CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) throws RemotingException, MQClientException, diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java index 31584186c52..82dcb741962 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java @@ -24,8 +24,8 @@ import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; @@ -51,6 +51,10 @@ public Options buildCommandlineOptions(Options options) { opt = new Option("n", "nameserverAddr", true, "nameserverAddr"); opt.setRequired(true); options.addOption(opt); + + opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(false); + options.addOption(opt); return options; } @@ -61,6 +65,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); defaultMQAdminExt.setNamesrvAddr(StringUtils.trim(commandLine.getOptionValue('n'))); String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : ""; + String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : ""; try { defaultMQAdminExt.start(); @@ -68,15 +73,19 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { Map> clusterAddrTable = clusterInfo.getClusterAddrTable(); Map brokerAddrTable = clusterInfo.getBrokerAddrTable(); if (clusterAddrTable.get(clusterName) == null) { - System.out.printf("clusterAddrTable is empty"); + System.out.print("clusterAddrTable is empty"); return; } for (Map.Entry entry : brokerAddrTable.entrySet()) { String brokerName = entry.getKey(); BrokerData brokerData = entry.getValue(); String brokerAddr = brokerData.getBrokerAddrs().get(0L); - CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr); - System.out.printf(brokerName + " | " + brokerAddr + " | " + body.getDiffResult()); + CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic); + if (StringUtils.isNotBlank(topic)) { + System.out.printf(body.getDiffResult()); + } else { + System.out.printf(brokerName + " | " + brokerAddr + " | " + body.getDiffResult()); + } } } catch (Exception e) {