diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 74a2516174a..ed92e1f3481 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1096,22 +1096,41 @@ public MQClientInstance getmQClientFactory() { private boolean tryToCompressMessage(final Message msg) { if (msg instanceof MessageBatch) { - //batch does not support compressing right now - return false; - } - byte[] body = msg.getBody(); - if (body != null) { - if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) { + MessageBatch bmsg = (MessageBatch) msg; + long totalLen = 0; + int totalCount = 0; + List msgs = new ArrayList<>(); + for (Message m : bmsg) { + totalLen += m.getBody().length; + totalCount++; + msgs.add(m); + } + if (totalLen <= 0) return false; + + int avgLen = (int) (totalLen / totalCount); + if (avgLen >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) { try { - byte[] data = this.defaultMQProducer.getCompressor().compress(body, this.defaultMQProducer.getCompressLevel()); - if (data != null) { - msg.setBody(data); - return true; - } + msg.setBody(MessageDecoder.encodeMessages(msgs,this.defaultMQProducer.getCompressor(),this.defaultMQProducer.getCompressLevel())); + return true; } catch (IOException e) { log.error("tryToCompressMessage exception", e); - if (log.isDebugEnabled()) { - log.debug(msg.toString()); + } + } + } else { + byte[] body = msg.getBody(); + if (body != null) { + if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) { + try { + byte[] data = this.defaultMQProducer.getCompressor().compress(body, this.defaultMQProducer.getCompressLevel()); + if (data != null) { + msg.setBody(data); + return true; + } + } catch (IOException e) { + log.error("tryToCompressMessage exception", e); + if (log.isDebugEnabled()) { + log.debug(msg.toString()); + } } } } diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 4cf899f9708..54e6fe78b5e 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -259,6 +259,20 @@ public MessageQueue select(List mqs, Message msg, Object arg) { assertThat(cc.get()).isEqualTo(10); } + @Test + public void testBatchSendBigMessage() throws RemotingException, InterruptedException, MQClientException, MQBrokerException { + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + List msgs = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + Message message = new Message(); + message.setTopic("test"); + message.setBody((new String(bigMessage.getBody()) + i).getBytes()); + msgs.add(message); + } + SendResult sendResult = producer.send(msgs); + assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); + } + @Test public void testBatchSendMessageAsync() throws RemotingException, MQClientException, InterruptedException, MQBrokerException { diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index f5491e192af..840ac6f5b44 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -659,8 +659,19 @@ public static Map string2messageProperties(final String properti } public static byte[] encodeMessage(Message message) { + try { + return encodeMessage(message,null,null); + } catch (IOException e) { + //should not happen because not doing compress + throw new RuntimeException(e); + } + } + public static byte[] encodeMessage(Message message, Compressor compressor,Integer compressLevel) throws IOException { //only need flag, body, properties byte[] body = message.getBody(); + if (compressor != null) { + body = compressor.compress(body,compressLevel); + } int bodyLen = body.length; String properties = messageProperties2String(message.getProperties()); byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8); @@ -729,11 +740,20 @@ public static Message decodeMessage(ByteBuffer byteBuffer) throws Exception { } public static byte[] encodeMessages(List messages) { + try { + return encodeMessages(messages, null,null); + } catch (IOException e) { + //should not happen because not doing compress + throw new RuntimeException(e); + } + } + + public static byte[] encodeMessages(List messages, Compressor compressor, Integer compressLevel) throws IOException { //TO DO refactor, accumulate in one buffer, avoid copies List encodedMessages = new ArrayList<>(messages.size()); int allSize = 0; for (Message message : messages) { - byte[] tmp = encodeMessage(message); + byte[] tmp = encodeMessage(message,compressor,compressLevel); encodedMessages.add(tmp); allSize += tmp.length; } diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java index cdda908f626..814bcf425eb 100644 --- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java +++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java @@ -81,11 +81,17 @@ public static RMQPopClient getRMQPopClient() { return client; } - public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup) throws Exception { + public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup, boolean start) throws Exception { DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup); defaultMQPullConsumer.setInstanceName(UUID.randomUUID().toString()); defaultMQPullConsumer.setNamesrvAddr(nsAddr); - defaultMQPullConsumer.start(); + if (start) { + defaultMQPullConsumer.start(); + } return defaultMQPullConsumer; } + + public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup) throws Exception { + return getRMQPullConsumer(nsAddr, consumerGroup, true); + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java index 38176c83ede..e0ad3ff2059 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.DefaultMessageStore; @@ -214,6 +215,69 @@ public void testBatchSend_SysOuterBatch() throws Exception { } } + @Test + public void testBatchSend_CompressionBody() throws Exception { + Assert.assertTrue(brokerController1.getMessageStore() instanceof DefaultMessageStore); + Assert.assertTrue(brokerController2.getMessageStore() instanceof DefaultMessageStore); + Assert.assertTrue(brokerController3.getMessageStore() instanceof DefaultMessageStore); + + String batchTopic = UUID.randomUUID().toString(); + IntegrationTestBase.initTopic(batchTopic, NAMESRV_ADDR, CLUSTER_NAME, CQType.SimpleCQ); + Assert.assertEquals(8, brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums()); + Assert.assertEquals(8, brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums()); + Assert.assertEquals(8, brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums()); + Assert.assertEquals(0, brokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0)); + Assert.assertEquals(0, brokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0)); + Assert.assertEquals(0, brokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0)); + Assert.assertEquals(0, brokerController1.getMessageStore().getMaxOffsetInQueue(batchTopic, 0)); + Assert.assertEquals(0, brokerController2.getMessageStore().getMaxOffsetInQueue(batchTopic, 0)); + Assert.assertEquals(0, brokerController3.getMessageStore().getMaxOffsetInQueue(batchTopic, 0)); + + DefaultMQProducer producer = ProducerFactory.getRMQProducer(NAMESRV_ADDR); + MessageQueue messageQueue = producer.fetchPublishMessageQueues(batchTopic).iterator().next(); + int bodyCompressionThreshold = producer.getCompressMsgBodyOverHowmuch(); + + int bodyLen = bodyCompressionThreshold + 1; + int batchCount = 10; + int batchNum = 10; + for (int i = 0; i < batchCount; i++) { + List messageList = new ArrayList<>(); + for (int j = 0; j < batchNum; j++) { + messageList.add(new Message(batchTopic, RandomUtils.getStringWithNumber(bodyLen).getBytes())); + } + SendResult sendResult = producer.send(messageList, messageQueue); + Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus()); + Assert.assertEquals(messageQueue.getQueueId(), sendResult.getMessageQueue().getQueueId()); + Assert.assertEquals(i * batchNum, sendResult.getQueueOffset()); + Assert.assertEquals(10, sendResult.getMsgId().split(",").length); + } + Thread.sleep(300); + { + // not start to set decodeDecompressBody independent of + // system property ClientConfig.DECODE_DECOMPRESS_BODY(com.rocketmq.decompress.body) config + DefaultMQPullConsumer defaultMQPullConsumer = ConsumerFactory.getRMQPullConsumer(NAMESRV_ADDR, "group",false); + defaultMQPullConsumer.setDecodeDecompressBody(true); + defaultMQPullConsumer.start(); + long startOffset = 5; + PullResult pullResult = defaultMQPullConsumer.pullBlockIfNotFound(messageQueue, "*", startOffset, batchCount * batchNum); + Assert.assertEquals(PullStatus.FOUND, pullResult.getPullStatus()); + Assert.assertEquals(0, pullResult.getMinOffset()); + Assert.assertEquals(batchCount * batchNum, pullResult.getMaxOffset()); + Assert.assertEquals(batchCount * batchNum - startOffset, pullResult.getMsgFoundList().size()); + for (int i = 0; i < pullResult.getMsgFoundList().size(); i++) { + MessageExt messageExt = pullResult.getMsgFoundList().get(i); + Assert.assertEquals(i + startOffset, messageExt.getQueueOffset()); + Assert.assertEquals(batchTopic, messageExt.getTopic()); + Assert.assertEquals(messageQueue.getQueueId(), messageExt.getQueueId()); + Assert.assertEquals(bodyLen, messageExt.getBody().length); + Assert.assertTrue((messageExt.getSysFlag() & MessageSysFlag.COMPRESSED_FLAG) != 0); + Assert.assertTrue(messageExt.getStoreSize() < bodyLen); + } + } + } + + + @Test public void testBatchSend_CheckProperties() throws Exception { List messageList = new ArrayList<>();