Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8755] batch send support compression #8746

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -1096,22 +1096,41 @@ public MQClientInstance getmQClientFactory() {

private boolean tryToCompressMessage(final Message msg) {
if (msg instanceof MessageBatch) {
//batch does not support compressing right now
return false;
}
byte[] body = msg.getBody();
if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
MessageBatch bmsg = (MessageBatch) msg;
long totalLen = 0;
int totalCount = 0;
List<Message> msgs = new ArrayList<>();
for (Message m : bmsg) {
totalLen += m.getBody().length;
totalCount++;
msgs.add(m);
}
if (totalLen <= 0) return false;

int avgLen = (int) (totalLen / totalCount);
if (avgLen >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try {
byte[] data = this.defaultMQProducer.getCompressor().compress(body, this.defaultMQProducer.getCompressLevel());
if (data != null) {
msg.setBody(data);
return true;
}
msg.setBody(MessageDecoder.encodeMessages(msgs,this.defaultMQProducer.getCompressor(),this.defaultMQProducer.getCompressLevel()));
return true;
} catch (IOException e) {
log.error("tryToCompressMessage exception", e);
if (log.isDebugEnabled()) {
log.debug(msg.toString());
}
}
} else {
byte[] body = msg.getBody();
if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try {
byte[] data = this.defaultMQProducer.getCompressor().compress(body, this.defaultMQProducer.getCompressLevel());
if (data != null) {
msg.setBody(data);
return true;
}
} catch (IOException e) {
log.error("tryToCompressMessage exception", e);
if (log.isDebugEnabled()) {
log.debug(msg.toString());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,20 @@ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
assertThat(cc.get()).isEqualTo(10);
}

@Test
public void testBatchSendBigMessage() throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
List<Message> msgs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Message message = new Message();
message.setTopic("test");
message.setBody((new String(bigMessage.getBody()) + i).getBytes());
msgs.add(message);
}
SendResult sendResult = producer.send(msgs);
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
}

@Test
public void testBatchSendMessageAsync()
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,19 @@ public static Map<String, String> string2messageProperties(final String properti
}

public static byte[] encodeMessage(Message message) {
try {
return encodeMessage(message,null,null);
} catch (IOException e) {
//should not happen because not doing compress
throw new RuntimeException(e);
}
}
public static byte[] encodeMessage(Message message, Compressor compressor,Integer compressLevel) throws IOException {
//only need flag, body, properties
byte[] body = message.getBody();
if (compressor != null) {
body = compressor.compress(body,compressLevel);
}
int bodyLen = body.length;
String properties = messageProperties2String(message.getProperties());
byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
Expand Down Expand Up @@ -729,11 +740,20 @@ public static Message decodeMessage(ByteBuffer byteBuffer) throws Exception {
}

public static byte[] encodeMessages(List<Message> messages) {
try {
return encodeMessages(messages, null,null);
} catch (IOException e) {
//should not happen because not doing compress
throw new RuntimeException(e);
}
}

public static byte[] encodeMessages(List<Message> messages, Compressor compressor, Integer compressLevel) throws IOException {
//TO DO refactor, accumulate in one buffer, avoid copies
List<byte[]> encodedMessages = new ArrayList<>(messages.size());
int allSize = 0;
for (Message message : messages) {
byte[] tmp = encodeMessage(message);
byte[] tmp = encodeMessage(message,compressor,compressLevel);
encodedMessages.add(tmp);
allSize += tmp.length;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,17 @@ public static RMQPopClient getRMQPopClient() {
return client;
}

public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup) throws Exception {
public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup, boolean start) throws Exception {
DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
defaultMQPullConsumer.setInstanceName(UUID.randomUUID().toString());
defaultMQPullConsumer.setNamesrvAddr(nsAddr);
defaultMQPullConsumer.start();
if (start) {
defaultMQPullConsumer.start();
}
return defaultMQPullConsumer;
}

public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup) throws Exception {
return getRMQPullConsumer(nsAddr, consumerGroup, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
Expand Down Expand Up @@ -214,6 +215,69 @@ public void testBatchSend_SysOuterBatch() throws Exception {
}
}

@Test
public void testBatchSend_CompressionBody() throws Exception {
Assert.assertTrue(brokerController1.getMessageStore() instanceof DefaultMessageStore);
Assert.assertTrue(brokerController2.getMessageStore() instanceof DefaultMessageStore);
Assert.assertTrue(brokerController3.getMessageStore() instanceof DefaultMessageStore);

String batchTopic = UUID.randomUUID().toString();
IntegrationTestBase.initTopic(batchTopic, NAMESRV_ADDR, CLUSTER_NAME, CQType.SimpleCQ);
Assert.assertEquals(8, brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
Assert.assertEquals(8, brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
Assert.assertEquals(8, brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
Assert.assertEquals(0, brokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController1.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController2.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController3.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));

DefaultMQProducer producer = ProducerFactory.getRMQProducer(NAMESRV_ADDR);
MessageQueue messageQueue = producer.fetchPublishMessageQueues(batchTopic).iterator().next();
int bodyCompressionThreshold = producer.getCompressMsgBodyOverHowmuch();

int bodyLen = bodyCompressionThreshold + 1;
int batchCount = 10;
int batchNum = 10;
for (int i = 0; i < batchCount; i++) {
List<Message> messageList = new ArrayList<>();
for (int j = 0; j < batchNum; j++) {
messageList.add(new Message(batchTopic, RandomUtils.getStringWithNumber(bodyLen).getBytes()));
}
SendResult sendResult = producer.send(messageList, messageQueue);
Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus());
Assert.assertEquals(messageQueue.getQueueId(), sendResult.getMessageQueue().getQueueId());
Assert.assertEquals(i * batchNum, sendResult.getQueueOffset());
Assert.assertEquals(10, sendResult.getMsgId().split(",").length);
}
Thread.sleep(300);
{
// not start to set decodeDecompressBody independent of
// system property ClientConfig.DECODE_DECOMPRESS_BODY(com.rocketmq.decompress.body) config
DefaultMQPullConsumer defaultMQPullConsumer = ConsumerFactory.getRMQPullConsumer(NAMESRV_ADDR, "group",false);
defaultMQPullConsumer.setDecodeDecompressBody(true);
defaultMQPullConsumer.start();
long startOffset = 5;
PullResult pullResult = defaultMQPullConsumer.pullBlockIfNotFound(messageQueue, "*", startOffset, batchCount * batchNum);
Assert.assertEquals(PullStatus.FOUND, pullResult.getPullStatus());
Assert.assertEquals(0, pullResult.getMinOffset());
Assert.assertEquals(batchCount * batchNum, pullResult.getMaxOffset());
Assert.assertEquals(batchCount * batchNum - startOffset, pullResult.getMsgFoundList().size());
for (int i = 0; i < pullResult.getMsgFoundList().size(); i++) {
MessageExt messageExt = pullResult.getMsgFoundList().get(i);
Assert.assertEquals(i + startOffset, messageExt.getQueueOffset());
Assert.assertEquals(batchTopic, messageExt.getTopic());
Assert.assertEquals(messageQueue.getQueueId(), messageExt.getQueueId());
Assert.assertEquals(bodyLen, messageExt.getBody().length);
Assert.assertTrue((messageExt.getSysFlag() & MessageSysFlag.COMPRESSED_FLAG) != 0);
Assert.assertTrue(messageExt.getStoreSize() < bodyLen);
}
}
}



@Test
public void testBatchSend_CheckProperties() throws Exception {
List<Message> messageList = new ArrayList<>();
Expand Down
Loading