Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Support JSON CQ Files and offset In-place Upgrade to RocksDB #8687

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
Expand Down Expand Up @@ -789,6 +788,9 @@ public boolean initializeMessageStore() {
defaultMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
} else {
defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
defaultMessageStore.enableRocksdbCQWrite();
}
}

if (messageStoreConfig.isEnableDLegerCommitLog()) {
Expand All @@ -812,7 +814,7 @@ public boolean initializeMessageStore() {
this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
this.messageStore.setTimerMessageStore(this.timerMessageStore);
}
} catch (IOException e) {
} catch (Exception e) {
result = false;
LOG.error("BrokerController#initialize: unexpected error occurs", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
Expand Down Expand Up @@ -373,6 +374,25 @@ public void setDataVersion(DataVersion dataVersion) {
this.dataVersion = dataVersion;
}

public boolean loadDataVersion() {
String fileName = null;
try {
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName);
if (jsonString != null) {
ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
if (obj != null) {
this.dataVersion = obj.dataVersion;
}
LOG.info("load consumer offset dataVersion success, " + fileName + " " + jsonString);
}
return true;
} catch (Exception e) {
LOG.error("load consumer offset dataVersion failed " + fileName, e);
return false;
}
}

public void removeOffset(final String group) {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,79 @@
*/
package org.apache.rocketmq.broker.offset;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import java.io.File;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.WriteBatch;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;

public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

protected RocksDBConfigManager rocksDBConfigManager;

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}

@Override
public boolean load() {
if (!rocksDBConfigManager.init()) {
return false;
}
return this.rocksDBConfigManager.loadData(this::decodeOffset);
if (!loadDataVersion() || !loadConsumerOffset()) {
return false;
}

return true;
}

public boolean loadConsumerOffset() {
return this.rocksDBConfigManager.loadData(this::decodeOffset) && merge();
}

private boolean merge() {
if (!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) {
log.info("the switch transferOffsetJsonToRocksdb is off, no merge offset operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("consumerOffset json file does not exist, so skip merge");
return true;
}
if (!super.loadDataVersion()) {
log.error("load json consumerOffset dataVersion error, startup will exit");
return false;
}

final DataVersion dataVersion = super.getDataVersion();
final DataVersion kvDataVersion = this.getDataVersion();
if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) {
if (!super.load()) {
log.error("load json consumerOffset info failed, startup will exit");
return false;
}
this.persist();
this.getDataVersion().assignNewOne(dataVersion);
updateDataVersion();
log.info("update offset from json, dataVersion:{}, offsetTable: {} ", this.getDataVersion(), JSON.toJSONString(this.getOffsetTable()));
}
return true;
}


@Override
public boolean stop() {
return this.rocksDBConfigManager.stop();
Expand All @@ -69,8 +112,7 @@ protected void decodeOffset(final byte[] key, final byte[] body) {
LOG.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable());
}

@Override
public String configFilePath() {
public String rocksdbConfigFilePath() {
return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "consumerOffsets" + File.separator;
}

Expand Down Expand Up @@ -103,4 +145,23 @@ private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupN
byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible);
writeBatch.put(keyBytes, valueBytes);
}

@Override
public boolean loadDataVersion() {
return this.rocksDBConfigManager.loadDataVersion();
}

@Override
public DataVersion getDataVersion() {
return rocksDBConfigManager.getKvDataVersion();
}

public void updateDataVersion() {
try {
rocksDBConfigManager.updateKvDataVersion();
} catch (Exception e) {
log.error("update consumer offset dataVersion error", e);
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.common.Attributes;
import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
Expand All @@ -38,7 +40,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import io.opentelemetry.api.common.Attributes;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.AccessValidator;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
Expand Down Expand Up @@ -137,6 +139,7 @@
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateAccessConfigRequestHeader;
Expand Down Expand Up @@ -209,6 +212,7 @@
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
Expand All @@ -217,8 +221,9 @@
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.LibC;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;

public class AdminBrokerProcessor implements NettyRequestProcessor {
Expand Down Expand Up @@ -339,6 +344,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return fetchAllConsumeStatsInBroker(ctx, request);
case RequestCode.QUERY_CONSUME_QUEUE:
return queryConsumeQueue(ctx, request);
case RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS:
return this.checkRocksdbCqWriteProgress(ctx, request);
case RequestCode.UPDATE_AND_GET_GROUP_FORBIDDEN:
return this.updateAndGetGroupForbidden(ctx, request);
case RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG:
Expand Down Expand Up @@ -458,6 +465,71 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re
return response;
}

private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
String requestTopic = requestHeader.getTopic();
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);

DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore();
RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore();
if (!messageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid")));
return response;
}

ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = messageStore.getConsumeQueueTable();
StringBuilder diffResult = new StringBuilder("check success, all is ok!\n");
try {
if (StringUtils.isNotBlank(requestTopic)) {
processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult,false);
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffResult.toString())));
return response;
}
for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
String topic = topicEntry.getKey();
processConsumeQueuesForTopic(topicEntry.getValue(), topic, rocksDBMessageStore, diffResult,true);
}
diffResult.append("check all topic successful, size:").append(cqTable.size());
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffResult.toString())));

} catch (Exception e) {
LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", e.getMessage())));
}
return response;
}

private void processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean checkAll) {
for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) {
Integer queueId = queueEntry.getKey();
ConsumeQueueInterface jsonCq = queueEntry.getValue();
ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId);
if (!checkAll) {
String format = String.format("\n[topic: %s, queue: %s] \n kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ",
topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
diffResult.append(format).append("\n");
}
long maxFileOffsetInQueue = jsonCq.getMaxOffsetInQueue();
long minOffsetInQueue = kvCq.getMinOffsetInQueue();
for (long i = minOffsetInQueue; i < maxFileOffsetInQueue; i++) {
Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i);
Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
if (fileCqUnit == null || kvCqUnit == null) {
diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file: %s \n",
topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null"));
return;
}
if (!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file: %s \n kv: %s",
topic, queueId, i, kvCqUnit.getObject1(), fileCqUnit.getObject1());
LOGGER.error(diffInfo);
diffResult.append(diffInfo).append("\n");
return;
}
}
}
}
@Override
public boolean rejectRequest() {
return false;
Expand Down Expand Up @@ -3305,4 +3377,20 @@ private boolean validateBlackListConfigExist(Properties properties) {
}
return false;
}

private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) {
if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) {
return false;
}
if (cqUnit1.getSize() != cqUnit2.getSize()) {
return false;
}
if (cqUnit1.getPos() != cqUnit2.getPos()) {
return false;
}
if (cqUnit1.getBatchNum() != cqUnit2.getBatchNum()) {
return false;
}
return cqUnit1.getTagsCode() == cqUnit2.getTagsCode();
}
}
Loading
Loading