From 4c2fdd94e27312282e11b17e12f4ec3e5adc113e Mon Sep 17 00:00:00 2001
From: Emre Kartoglu <969071+iemre@users.noreply.github.com>
Date: Tue, 7 Nov 2023 22:53:17 +0000
Subject: [PATCH] [FLINK-33260][Connectors/Kinesis] Allow user to provide a
list of recoverable exceptions (#110)
---
.../docs/connectors/table/kinesis.md | 8 ++
docs/content/docs/connectors/table/kinesis.md | 8 ++
.../config/ConsumerConfigConstants.java | 7 ++
.../kinesis/config/ExceptionConfig.java | 35 ++++++
.../config/RecoverableErrorsConfig.java | 99 +++++++++++++++++
.../fanout/FanOutRecordPublisher.java | 3 +-
.../FanOutRecordPublisherConfiguration.java | 14 +++
.../fanout/FanOutShardSubscriber.java | 75 +++++++++----
.../kinesis/util/KinesisConfigUtil.java | 7 ++
.../config/RecoverableErrorsConfigTest.java | 83 ++++++++++++++
.../fanout/FanOutShardSubscriberTest.java | 104 ++++++++++++++++--
.../kinesis/util/KinesisConfigUtilTest.java | 24 ++++
12 files changed, 439 insertions(+), 28 deletions(-)
create mode 100644 flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java
create mode 100644 flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfig.java
create mode 100644 flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfigTest.java
diff --git a/docs/content.zh/docs/connectors/table/kinesis.md b/docs/content.zh/docs/connectors/table/kinesis.md
index b9010ff2..43fa0072 100644
--- a/docs/content.zh/docs/connectors/table/kinesis.md
+++ b/docs/content.zh/docs/connectors/table/kinesis.md
@@ -680,6 +680,14 @@ Connector Options
Long |
The interval (in milliseconds) after which to consider a shard idle for purposes of watermark generation. A positive value will allow the watermark to progress even when some shards don't receive new records. |
+
+ shard.consumer.error.recoverable[0].exception |
+ optional |
+ no |
+ (none) |
+ String |
+ User-specified exception class to retry indefinitely. Example value: `java.net.UnknownHostException`. This option is a zero-based array, so the configured exceptions must start at index 0 and be numbered consecutively. Each value must name a valid Throwable on the classpath; otherwise the connector fails fast during initialization. |
+
scan.watermark.sync.interval |
optional |
diff --git a/docs/content/docs/connectors/table/kinesis.md b/docs/content/docs/connectors/table/kinesis.md
index 3ad681ad..46cf1c74 100644
--- a/docs/content/docs/connectors/table/kinesis.md
+++ b/docs/content/docs/connectors/table/kinesis.md
@@ -681,6 +681,14 @@ Connector Options
Long |
The interval (in milliseconds) after which to consider a shard idle for purposes of watermark generation. A positive value will allow the watermark to progress even when some shards don't receive new records. |
+
+ shard.consumer.error.recoverable[0].exception |
+ optional |
+ no |
+ (none) |
+ String |
+ User-specified exception class to retry indefinitely. Example value: `java.net.UnknownHostException`. This option is a zero-based array, so the configured exceptions must start at index 0 and be numbered consecutively. Each value must name a valid Throwable on the classpath; otherwise the connector fails fast during initialization. |
+
scan.watermark.sync.interval |
optional |
diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index fff44d64..71068226 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -191,6 +191,13 @@ public enum EFORegistrationType {
public static final String REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT =
"flink.stream.registerstreamconsumer.backoff.expconst";
+ /**
+ * The user-provided list of exceptions to recover from. These exceptions are retried
+ * indefinitely.
+ */
+ public static final String RECOVERABLE_EXCEPTIONS_PREFIX =
+ "flink.shard.consumer.error.recoverable";
+
/** The maximum number of deregisterStream attempts if we get a recoverable exception. */
public static final String DEREGISTER_STREAM_RETRIES =
"flink.stream.deregisterstreamconsumer.maxretries";
diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java
new file mode 100644
index 00000000..d3b4080b
--- /dev/null
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ExceptionConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+/** Holds the class of one exception type; used for configuring recoverable exceptions. */
+public class ExceptionConfig {
+    /** Exception class supplied at construction; this class performs no validation on it. */
+    private final Class<?> exceptionClass;
+
+    /** Creates a config entry for the given exception class. */
+    public ExceptionConfig(Class<?> exClass) {
+        this.exceptionClass = exClass;
+    }
+
+    /** Returns the exception class this entry was created with. */
+    public Class<?> getExceptionClass() {
+        return exceptionClass;
+    }
+}
diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfig.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfig.java
new file mode 100644
index 00000000..8c5c4dfe
--- /dev/null
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfig.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Hosts the recoverable exception configuration. Recoverable exceptions are retried indefinitely.
+ */
+public class RecoverableErrorsConfig {
+    public static final String INVALID_CONFIG_MESSAGE =
+            "Invalid config for recoverable consumer exceptions. "
+                    + "Valid config example: "
+                    + "`flink.shard.consumer.error.recoverable[0].exception=java.net.UnknownHostException`. "
+                    + "Your config array must use zero-based indexing as shown in the example.";
+
+    /**
+     * Parses the zero-based {@code [i].exception} entries, stopping at the first missing index.
+     * @param config connector configuration
+     * @return the parsed config, or empty when no key starts with the recoverable prefix
+     * @throws IllegalArgumentException if a class is unknown, not a Throwable, or keys are malformed
+     */
+    public static Optional<RecoverableErrorsConfig> createConfigFromPropertiesOrThrow(
+            final Properties config) {
+        List<ExceptionConfig> exConfs = new ArrayList<>();
+        String exceptionConfigKey = exceptionKey(0);
+        while (config.containsKey(exceptionConfigKey)) {
+            String exPath = config.getProperty(exceptionConfigKey);
+            final Class<?> aClass;
+            try {
+                aClass = Class.forName(exPath);
+            } catch (ClassNotFoundException e) {
+                // Chain the cause so the class-loading failure stays in the stack trace.
+                throw new IllegalArgumentException(
+                        "Provided recoverable exception class could not be found: " + exPath, e);
+            }
+            if (!Throwable.class.isAssignableFrom(aClass)) {
+                throw new IllegalArgumentException(
+                        "Provided recoverable exception class is not a Throwable: " + exPath);
+            }
+            exConfs.add(new ExceptionConfig(aClass));
+            exceptionConfigKey = exceptionKey(exConfs.size());
+        }
+        if (!exConfs.isEmpty()) {
+            return Optional.of(new RecoverableErrorsConfig(exConfs));
+        }
+
+        // Check if user provided wrong config suffix, so they fail faster
+        for (Object key : config.keySet()) {
+            if (((String) key).startsWith(ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX)) {
+                throw new IllegalArgumentException(RecoverableErrorsConfig.INVALID_CONFIG_MESSAGE);
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    /** Builds the property key of the entry at the given zero-based index. */
+    private static String exceptionKey(int idx) {
+        return String.format(
+                "%s[%d].exception", ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX, idx);
+    }
+
+    private final List<ExceptionConfig> exceptionConfigs;
+
+    public RecoverableErrorsConfig(List<ExceptionConfig> exceptionConfigs) {
+        this.exceptionConfigs = exceptionConfigs;
+    }
+
+    /** Returns true when the list of configured exceptions is null or empty. */
+    public boolean hasNoConfig() {
+        return CollectionUtils.isEmpty(exceptionConfigs);
+    }
+
+    /** Returns the list of exception entries passed at construction. */
+    public List<ExceptionConfig> getExceptionConfigs() {
+        return exceptionConfigs;
+    }
+}
diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
index e4115125..ee0623ed 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
@@ -168,7 +168,8 @@ private RecordPublisherRunResult runWithBackoff(
subscribedShard.getShard().getShardId(),
kinesisProxy,
configuration.getSubscribeToShardTimeout(),
- runningSupplier);
+ runningSupplier,
+ configuration.getRecoverableErrorsConfig());
RecordPublisherRunResult result;
try {
diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
index cd46876d..cbc51039 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
@@ -20,6 +20,7 @@
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.config.RecoverableErrorsConfig;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.util.Preconditions;
@@ -118,6 +119,9 @@ public class FanOutRecordPublisherConfiguration {
/** Exponential backoff power constant for the describe stream consumer operation. */
private final double describeStreamConsumerExpConstant;
+ /** Recoverable error configuration. These are retried indefinitely. */
+ private final RecoverableErrorsConfig recoverableErrorsConfig;
+
/**
* Creates a FanOutRecordPublisherConfiguration.
*
@@ -318,6 +322,8 @@ public FanOutRecordPublisherConfiguration(
.orElse(
ConsumerConfigConstants
.DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT);
+
+ this.recoverableErrorsConfig = this.parseRecoverableErrorConfig(configProps);
}
// ------------------------------------------------------------------------
@@ -472,4 +478,12 @@ public Optional getConsumerName() {
public Optional getStreamConsumerArn(String stream) {
return Optional.ofNullable(streamConsumerArns.get(stream));
}
+
+ public RecoverableErrorsConfig parseRecoverableErrorConfig(final Properties config) {
+ return RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config).orElse(null);
+ }
+
+ public RecoverableErrorsConfig getRecoverableErrorsConfig() {
+ return recoverableErrorsConfig;
+ }
}
diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
index a9c7c098..21fe050e 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -19,9 +19,12 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.connectors.kinesis.config.ExceptionConfig;
+import org.apache.flink.streaming.connectors.kinesis.config.RecoverableErrorsConfig;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2Interface;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import io.netty.handler.timeout.ReadTimeoutException;
@@ -38,6 +41,7 @@
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import java.time.Duration;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
@@ -121,6 +125,8 @@ public class FanOutShardSubscriber {
private final Supplier runningSupplier;
+ private final RecoverableErrorsConfig recoverableErrorsConfig;
+
/**
* Create a new Fan Out Shard subscriber.
*
@@ -130,20 +136,24 @@ public class FanOutShardSubscriber {
* @param subscribeToShardTimeout A timeout when waiting for a shard subscription to be
* established
* @param runningSupplier a callback to query if the consumer is still running
+ * @param recoverableErrorsConfig recoverable error configuration (errors that are retried
+ * indefinitely)
*/
FanOutShardSubscriber(
final String consumerArn,
final String shardId,
final KinesisProxyAsyncV2Interface kinesis,
final Duration subscribeToShardTimeout,
- final Supplier runningSupplier) {
+ final Supplier runningSupplier,
+ final RecoverableErrorsConfig recoverableErrorsConfig) {
this(
consumerArn,
shardId,
kinesis,
subscribeToShardTimeout,
DEFAULT_QUEUE_TIMEOUT,
- runningSupplier);
+ runningSupplier,
+ recoverableErrorsConfig);
}
/**
@@ -164,13 +174,15 @@ public class FanOutShardSubscriber {
final KinesisProxyAsyncV2Interface kinesis,
final Duration subscribeToShardTimeout,
final Duration queueWaitTimeout,
- final Supplier runningSupplier) {
+ final Supplier runningSupplier,
+ final RecoverableErrorsConfig recoverableErrorsConfig) {
this.kinesis = Preconditions.checkNotNull(kinesis);
this.consumerArn = Preconditions.checkNotNull(consumerArn);
this.shardId = Preconditions.checkNotNull(shardId);
this.subscribeToShardTimeout = subscribeToShardTimeout;
this.queueWaitTimeout = queueWaitTimeout;
this.runningSupplier = runningSupplier;
+ this.recoverableErrorsConfig = recoverableErrorsConfig;
}
/**
@@ -249,22 +261,21 @@ private FanOutShardSubscription openSubscriptionToShard(final StartingPosition s
kinesis.subscribeToShard(request, responseHandler);
- boolean subscriptionEstablished =
- waitForSubscriptionLatch.await(
+ boolean subscriptionTimedOut =
+ !waitForSubscriptionLatch.await(
subscribeToShardTimeout.toMillis(), TimeUnit.MILLISECONDS);
- if (!subscriptionEstablished) {
+ if (subscriptionTimedOut) {
final String errorMessage =
"Timed out acquiring subscription - " + shardId + " (" + consumerArn + ")";
LOG.error(errorMessage);
subscription.cancelSubscription();
- handleError(
- new RecoverableFanOutSubscriberException(new TimeoutException(errorMessage)));
+ handleErrorAndRethrow(new TimeoutException(errorMessage));
}
Throwable throwable = exception.get();
if (throwable != null) {
- handleError(throwable);
+ handleErrorAndRethrow(throwable);
}
LOG.debug("Acquired subscription - {} ({})", shardId, consumerArn);
@@ -282,7 +293,7 @@ private FanOutShardSubscription openSubscriptionToShard(final StartingPosition s
*
* @param throwable the exception that has occurred
*/
- private void handleError(final Throwable throwable) throws FanOutSubscriberException {
+ private void handleErrorAndRethrow(final Throwable throwable) throws FanOutSubscriberException {
Throwable cause;
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
cause = throwable.getCause();
@@ -302,16 +313,10 @@ private void handleError(final Throwable throwable) throws FanOutSubscriberExcep
throw new FanOutSubscriberInterruptedException(throwable);
} else if (cause instanceof FanOutSubscriberException) {
throw (FanOutSubscriberException) cause;
- } else if (cause instanceof ReadTimeoutException) {
- // ReadTimeoutException occurs naturally under backpressure scenarios when full batches
- // take longer to
- // process than standard read timeout (default 30s). Recoverable exceptions are intended
- // to be retried
- // indefinitely to avoid system degradation under backpressure. The EFO connection
- // (subscription) to Kinesis
- // is closed, and reacquired once the queue of records has been processed.
+ } else if (isDefinedAsRecoverable(cause)) {
throw new RecoverableFanOutSubscriberException(cause);
} else {
+ // All other errors are treated as retryable
throw new RetryableFanOutSubscriberException(cause);
}
}
@@ -329,6 +334,38 @@ private boolean isInterrupted(final Throwable throwable) {
return false;
}
+ private boolean isDefinedAsRecoverable(Throwable cause) {
+ // Built-in, non-customisable exceptions that are always recoverable (retried indefinitely).
+ if (cause instanceof ReadTimeoutException || cause instanceof TimeoutException) {
+ // ReadTimeoutException occurs naturally under backpressure, when full batches take
+ // longer to process than the standard read timeout (default 30s). Recoverable
+ // exceptions are retried indefinitely to avoid system degradation under backpressure:
+ // the EFO connection (subscription) to Kinesis is closed, and reacquired once the
+ // queue of records has been processed.
+ // TimeoutException is what openSubscriptionToShard reports when the subscription is
+ // not acquired within subscribeToShardTimeout.
+ return true;
+ }
+ return isConfiguredAsRecoverable(cause);
+ }
+
+    /** Returns true if {@code cause} or any throwable in its chain is a user-configured type. */
+    private boolean isConfiguredAsRecoverable(Throwable cause) {
+        if (this.recoverableErrorsConfig == null || this.recoverableErrorsConfig.hasNoConfig()) {
+            return false;
+        }
+        for (ExceptionConfig config : this.recoverableErrorsConfig.getExceptionConfigs()) {
+            Optional<Throwable> throwable =
+                    ExceptionUtils.findThrowable(
+                            cause, (Class<Throwable>) config.getExceptionClass());
+            // Only a hit ends the search; a miss must fall through to the next configured class.
+            if (throwable.isPresent()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
/**
* Once the subscription is open, records will be delivered to the {@link BlockingQueue}. Queue
* capacity is hardcoded to 1 record, the queue is used solely to separate consumption and
@@ -386,7 +423,7 @@ private RecordPublisherRunResult consumeAllRecordsFromKinesisShard(
// The subscription is complete, but the shard might not be, so we return incomplete
return INCOMPLETE;
} else {
- handleError(subscriptionEvent.getThrowable());
+ handleErrorAndRethrow(subscriptionEvent.getThrowable());
result = INCOMPLETE;
break;
}
diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 5cf85799..1dcd4e0a 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -29,6 +29,7 @@
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.RecoverableErrorsConfig;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
@@ -322,6 +323,12 @@ public static void validateConsumerConfiguration(Properties config, List
config,
ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY,
"Invalid value given for EFO HTTP client max concurrency. Must be positive.");
+
+ validateRecoverableErrorConfig(config);
+ }
+
+ private static void validateRecoverableErrorConfig(Properties config) {
+ RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config);
}
/**
diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfigTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfigTest.java
new file mode 100644
index 00000000..c1cb1818
--- /dev/null
+++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/config/RecoverableErrorsConfigTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link RecoverableErrorsConfig}. */
+public class RecoverableErrorsConfigTest {
+
+    @Test
+    public void testParseConfigFromProperties() {
+        Properties config = new Properties();
+        config.setProperty(
+                "flink.shard.consumer.error.recoverable[0].exception",
+                "java.net.UnknownHostException");
+        config.setProperty(
+                "flink.shard.consumer.error.recoverable[1].exception",
+                "java.lang.IllegalArgumentException");
+        Optional<RecoverableErrorsConfig> recoverableErrorsConfigOptional =
+                RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config);
+        assertTrue(recoverableErrorsConfigOptional.isPresent());
+        RecoverableErrorsConfig recoverableErrorsConfig = recoverableErrorsConfigOptional.get();
+        assertFalse(recoverableErrorsConfig.hasNoConfig());
+        assertThat(recoverableErrorsConfig.getExceptionConfigs()).hasSize(2);
+        assertThat(recoverableErrorsConfig.getExceptionConfigs().get(0).getExceptionClass())
+                .isEqualTo(java.net.UnknownHostException.class);
+        assertThat(recoverableErrorsConfig.getExceptionConfigs().get(1).getExceptionClass())
+                .isEqualTo(java.lang.IllegalArgumentException.class);
+    }
+
+    @Test
+    public void testReturnEmptyWhenConfigNotFound() {
+        Optional<RecoverableErrorsConfig> recoverableErrorsConfigOptional =
+                RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(new Properties());
+        assertFalse(recoverableErrorsConfigOptional.isPresent());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testThrowsExceptionWhenProvidedClassIsNotThrowable() {
+        Properties config = new Properties();
+        config.setProperty(
+                "flink.shard.consumer.error.recoverable[0].exception", "java.util.Properties");
+        RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testThrowsExceptionWhenProvidedClassCanNotBeFound() {
+        Properties config = new Properties();
+        config.setProperty(
+                "flink.shard.consumer.error.recoverable[0].exception", "made.up.TestClass");
+        RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testThrowsExceptionWhenProvidedConfigSuffixIsNotValid() {
+        Properties config = new Properties();
+        config.setProperty(
+                "flink.shard.consumer.error.recoverable[0].exceptionnm", "java.lang.Exception");
+        RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config);
+    }
+}
diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
index 1201819f..923d0ae6 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
@@ -17,6 +17,8 @@
package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+import org.apache.flink.streaming.connectors.kinesis.config.ExceptionConfig;
+import org.apache.flink.streaming.connectors.kinesis.config.RecoverableErrorsConfig;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2Interface;
import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory;
@@ -30,6 +32,7 @@
import software.amazon.awssdk.services.kinesis.model.StartingPosition;
import java.time.Duration;
+import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +60,86 @@ public void testRecoverableErrorThrownToConsumer() throws Exception {
"shardId",
errorKinesisV2,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
- () -> true);
+ () -> true,
+ null);
+
+ software.amazon.awssdk.services.kinesis.model.StartingPosition startingPosition =
+ software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
+ subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
+ }
+
+ @Test
+ public void testRecoverableErrorThrownToConsumerWhenUserConfiguresExceptionToBeRecoverable()
+ throws Exception {
+ thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);
+ thrown.expectMessage("java.lang.ArithmeticException");
+
+ SubscriptionErrorKinesisAsyncV2 errorKinesisV2 =
+ FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+ new ArithmeticException());
+
+ FanOutShardSubscriber subscriber =
+ new FanOutShardSubscriber(
+ "consumerArn",
+ "shardId",
+ errorKinesisV2,
+ DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
+ () -> true,
+ new RecoverableErrorsConfig(
+ Collections.singletonList(
+ new ExceptionConfig(ArithmeticException.class))));
+
+ software.amazon.awssdk.services.kinesis.model.StartingPosition startingPosition =
+ software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
+ subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
+ }
+
+ @Test
+ public void testRecoverableErrorThrownToConsumerWhenUserConfiguredExceptionIsWrapped()
+ throws Exception {
+ thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);
+ thrown.expectMessage("java.lang.ArithmeticException");
+
+ SubscriptionErrorKinesisAsyncV2 errorKinesisV2 =
+ FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+ new RuntimeException(new ArithmeticException()));
+
+ FanOutShardSubscriber subscriber =
+ new FanOutShardSubscriber(
+ "consumerArn",
+ "shardId",
+ errorKinesisV2,
+ DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
+ () -> true,
+ new RecoverableErrorsConfig(
+ Collections.singletonList(
+ new ExceptionConfig(ArithmeticException.class))));
+
+ software.amazon.awssdk.services.kinesis.model.StartingPosition startingPosition =
+ software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
+ subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
+ }
+
+ @Test
+ public void testRetryableErrorThrownToConsumerWhenUserConfiguredExceptionIsNotThrown()
+ throws Exception {
+ thrown.expect(FanOutShardSubscriber.RetryableFanOutSubscriberException.class);
+ thrown.expectMessage("java.lang.ArithmeticException");
+
+ SubscriptionErrorKinesisAsyncV2 errorKinesisV2 =
+ FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+ new RuntimeException(new ArithmeticException()));
+
+ FanOutShardSubscriber subscriber =
+ new FanOutShardSubscriber(
+ "consumerArn",
+ "shardId",
+ errorKinesisV2,
+ DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
+ () -> true,
+ new RecoverableErrorsConfig(
+ Collections.singletonList(
+ new ExceptionConfig(java.net.UnknownHostException.class))));
software.amazon.awssdk.services.kinesis.model.StartingPosition startingPosition =
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -79,7 +161,8 @@ public void testRetryableErrorThrownToConsumer() throws Exception {
"shardId",
errorKinesisV2,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
- () -> true);
+ () -> true,
+ null);
software.amazon.awssdk.services.kinesis.model.StartingPosition startingPosition =
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -100,7 +183,8 @@ public void testInterruptedErrorThrownToConsumer() throws Exception {
"shardId",
errorKinesisV2,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
- () -> true);
+ () -> true,
+ null);
software.amazon.awssdk.services.kinesis.model.StartingPosition startingPosition =
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
@@ -123,7 +207,8 @@ public void testMultipleErrorsThrownPassesFirstErrorToConsumer() throws Exceptio
"shardId",
errorKinesisV2,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
- () -> true);
+ () -> true,
+ null);
StartingPosition startingPosition = StartingPosition.builder().build();
subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
@@ -140,7 +225,8 @@ public void testSubscriptionCompletion() throws Exception {
"shardId",
errorKinesisV2,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
- () -> true);
+ () -> true,
+ null);
StartingPosition startingPosition = StartingPosition.builder().build();
RecordPublisherRunResult result =
@@ -159,7 +245,7 @@ public void testTimeoutSubscribingToShard() throws Exception {
FanOutShardSubscriber subscriber =
new FanOutShardSubscriber(
- "consumerArn", "shardId", kinesis, Duration.ofMillis(1), () -> true);
+ "consumerArn", "shardId", kinesis, Duration.ofMillis(1), () -> true, null);
StartingPosition startingPosition = StartingPosition.builder().build();
subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
@@ -180,7 +266,8 @@ public void testTimeoutEnqueuingEvent() throws Exception {
kinesis,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
Duration.ofMillis(100),
- () -> true);
+ () -> true,
+ null);
StartingPosition startingPosition = StartingPosition.builder().build();
subscriber.subscribeToShardAndConsumeRecords(
@@ -207,7 +294,8 @@ public void testCancelExitsGracefully() throws Exception {
"shardId",
unboundedStream,
DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT,
- run::get);
+ run::get,
+ null);
final AtomicInteger batches = new AtomicInteger(0);
diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index 66e36b0a..dfe9cd9f 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -1021,4 +1021,28 @@ public void testGetV2ConsumerClientProperties() {
.containsKey("aws.kinesis.client.user-agent-prefix")
.hasSize(2);
}
+
+ @Test
+ public void testInvalidCustomRecoverableErrorConfiguration() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage(
+ "Provided recoverable exception class could not be found: com.NonExistentExceptionClass");
+
+ Properties testConfig = TestUtils.getStandardProperties();
+ testConfig.setProperty(
+ ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX + "[0].exception",
+ "com.NonExistentExceptionClass");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testValidCustomRecoverableErrorConfiguration() {
+ Properties testConfig = TestUtils.getStandardProperties();
+ testConfig.setProperty(
+ ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX + "[0].exception",
+ "java.net.UnknownHostException");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
}