diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index a40a29ebbd64a..2719a2b22354d 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -346,6 +346,13 @@ protected void startPolling() { ConsumerRecords allRecords = consumer.poll(pollDuration); if (consumerListener != null) { if (!consumerListener.afterConsume(consumer)) { + // because the consumer will just poll the same messages again + // there we need to avoid consuming asap, by simulating 1 idle poll duration + try { + Thread.sleep(pollTimeoutMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } continue; } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java index 58211ba86a0cb..beaa02fc49952 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java @@ -30,7 +30,6 @@ public class KafkaConsumerListener implements ConsumerListener consumer; private SeekPolicy seekPolicy; - private Predicate afterConsumeEval; public Consumer getConsumer() { @@ -56,34 +55,35 @@ public void setResumableCheck(Predicate afterConsumeEval) { @Override public boolean afterConsume(@SuppressWarnings("unused") Object ignored) { - if (afterConsumeEval.test(null)) { - LOG.warn("State changed, therefore resuming the consumer"); + boolean resume = afterConsumeEval.test(null); + if (resume) { + LOG.debug("Resuming consumer"); consumer.resume(consumer.assignment()); - - return true; + } else { + LOG.debug("Pausing consumer"); + seekConsumer(); } - - LOG.warn("The consumer is not yet resumable"); - return false; + return resume; } @Override public boolean afterProcess(ProcessingResult result) { if (result.isFailed()) { - LOG.warn("Pausing consumer due to error on the last processing"); + LOG.debug("Pausing consumer due to last processing error"); consumer.pause(consumer.assignment()); - - if (seekPolicy == SeekPolicy.BEGINNING) { - LOG.debug("Seeking from the beginning of topic"); - consumer.seekToBeginning(consumer.assignment()); - } else if (seekPolicy == SeekPolicy.END) { - LOG.debug("Seeking from the end off the topic"); - consumer.seekToEnd(consumer.assignment()); - } - + seekConsumer(); return false; } - return true; } + + protected void seekConsumer() { + if (seekPolicy == SeekPolicy.BEGINNING) { + LOG.debug("Seeking to beginning of topic"); + consumer.seekToBeginning(consumer.assignment()); + } else if (seekPolicy == SeekPolicy.END) { + LOG.debug("Seeking to end of topic"); + consumer.seekToEnd(consumer.assignment()); + } + } } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerPauseableEeiIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerPauseableEeiIT.java new file mode 100644 index 0000000000000..60ac902025f46 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerPauseableEeiIT.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration.commit; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.RepeatedTest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class KafkaConsumerPauseableEeiIT extends BaseManualCommitTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerPauseableEeiIT.class); + + public static final String TOPIC = "testPauseableEipTest"; + + private volatile int count = 0; + + @AfterEach + public void after() { + cleanupKafka(TOPIC); + } + + @Override + protected RouteBuilder createRouteBuilder() { + String from = "kafka:" + TOPIC + + "?groupId=KafkaConsumerPauseableEeiIT&pollTimeoutMs=1000" + + "&autoCommitEnable=false&allowManualCommit=true&maxPollRecords=1"; + + return new RouteBuilder() { + @Override + public void configure() { + from(from).routeId("foo") + .pausable(new KafkaConsumerListener(), o -> canContinue()) + .log("IN ${body}") + .process(e -> { + log.info("Manual commit: " + e.getMessage().getBody(String.class)); + KafkaManualCommit manual + = e.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + assertNotNull(manual); + manual.commit(); + }) + .log("RESULT ${body}") + .to("mock:result"); + } + }; + } + + @RepeatedTest(1) + public void kafkaPauseableEip() throws Exception { + MockEndpoint result = contextExtension.getMockEndpoint("mock:result"); + // we receive all 15 records + result.expectedMessageCount(15); + + sendRecords(0, 15, TOPIC); + + result.assertIsSatisfied(); + + // but some were temporary paused so the counter should be higher + assertTrue(count > 20); + } + + public boolean canContinue() { + count++; + boolean answer; + if (count < 4 || count > 10) { + answer = true; + } else { + answer = false; + } + LOG.info("canContinue count: {} -> {}", count, answer); + return answer; + } + +} diff --git a/components/camel-kafka/src/test/resources/log4j2.properties b/components/camel-kafka/src/test/resources/log4j2.properties index d1ad676c533ce..ab51785efdca8 100644 --- a/components/camel-kafka/src/test/resources/log4j2.properties +++ b/components/camel-kafka/src/test/resources/log4j2.properties @@ -25,7 +25,6 @@ appender.stdout.name = stdout appender.stdout.layout.type = PatternLayout appender.stdout.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n -rootLogger.level = WARN rootLogger.appenderRef.out.ref = out logger.camel.name=org.apache.camel @@ -40,6 +39,7 @@ logger.idem.level=INFO logger.resume.name=org.apache.camel.processor.resume.kafka logger.resume.level=INFO +# reduce kafka noise logger.kafka.name=org.apache.kafka logger.kafka.level=WARN