Skip to content

Commit

Permalink
[FLINK-33260][Connectors/Kinesis] Allow user to provide a list of rec…
Browse files Browse the repository at this point in the history
…overable exceptions (apache#110)
  • Loading branch information
iemre authored Nov 7, 2023
1 parent 0243540 commit 4c2fdd9
Show file tree
Hide file tree
Showing 12 changed files with 439 additions and 28 deletions.
8 changes: 8 additions & 0 deletions docs/content.zh/docs/connectors/table/kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,14 @@ Connector Options
<td>Long</td>
<td>The interval (in milliseconds) after which to consider a shard idle for purposes of watermark generation. A positive value will allow the watermark to progress even when some shards don't receive new records.</td>
</tr>
<tr>
<td><h5>shard.consumer.error.recoverable[0].exception</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>User-specified Exception to retry indefinitely. Example value: `java.net.UnknownHostException`. This configuration is a zero-based array. As such, the specified exceptions must start with index 0. Specified exceptions must be valid Throwables in classpath, or connector will fail to initialize and fail fast.</td>
</tr>
<tr>
<td><h5>scan.watermark.sync.interval</h5></td>
<td>optional</td>
Expand Down
8 changes: 8 additions & 0 deletions docs/content/docs/connectors/table/kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,14 @@ Connector Options
<td>Long</td>
<td>The interval (in milliseconds) after which to consider a shard idle for purposes of watermark generation. A positive value will allow the watermark to progress even when some shards don't receive new records.</td>
</tr>
<tr>
<td><h5>shard.consumer.error.recoverable[0].exception</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>User-specified Exception to retry indefinitely. Example value: `java.net.UnknownHostException`. This configuration is a zero-based array. As such, the specified exceptions must start with index 0. Specified exceptions must be valid Throwables in classpath, or connector will fail to initialize and fail fast.</td>
</tr>
<tr>
<td><h5>scan.watermark.sync.interval</h5></td>
<td>optional</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ public enum EFORegistrationType {
public static final String REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT =
"flink.stream.registerstreamconsumer.backoff.expconst";

/**
* The user-provided list of exceptions to recover from. These exceptions are retried
* indefinitely.
*/
public static final String RECOVERABLE_EXCEPTIONS_PREFIX =
"flink.shard.consumer.error.recoverable";

/** The maximum number of deregisterStream attempts if we get a recoverable exception. */
public static final String DEREGISTER_STREAM_RETRIES =
"flink.stream.deregisterstreamconsumer.maxretries";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.streaming.connectors.kinesis.config;

/**
* Helper class to hold information/behaviour about Exceptions. Used for configuring recoverable
* exceptions.
*/
public class ExceptionConfig {
private final Class<?> exceptionClass;

public ExceptionConfig(Class<?> exClass) {
this.exceptionClass = exClass;
}

public Class<?> getExceptionClass() {
return exceptionClass;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kinesis.config;

import org.apache.commons.collections.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

/**
* Hosts the recoverable exception configuration. Recoverable exceptions are retried indefinitely.
*/
public class RecoverableErrorsConfig {
public static final String INVALID_CONFIG_MESSAGE =
"Invalid config for recoverable consumer exceptions. "
+ "Valid config example: "
+ "`flink.shard.consumer.error.recoverable[0].exception=net.java.UnknownHostException`. "
+ "Your config array must use zero-based indexing as shown in the example.";

/**
* Parses the array of recoverable error configs.
*
* @param config connector configuration
* @return an Optional of RecoverableErrorsConfig
*/
public static Optional<RecoverableErrorsConfig> createConfigFromPropertiesOrThrow(
final Properties config) {
List<ExceptionConfig> exConfs = new ArrayList<>();
int idx = 0;
String exceptionConfigKey =
String.format(
"%s[%d].exception",
ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX, idx);
while (config.containsKey(exceptionConfigKey)) {
String exPath = config.getProperty(exceptionConfigKey);
try {
Class<?> aClass = Class.forName(exPath);
if (!Throwable.class.isAssignableFrom(aClass)) {
throw new ClassCastException();
}
exConfs.add(new ExceptionConfig(aClass));
} catch (ClassCastException e) {
throw new IllegalArgumentException(
"Provided recoverable exception class is not a Throwable: " + exPath);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
"Provided recoverable exception class could not be found: " + exPath);
}
exceptionConfigKey =
String.format(
"%s[%d].exception",
ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX, ++idx);
}
if (idx > 0) {
// We processed configs successfully
return Optional.of(new RecoverableErrorsConfig(exConfs));
}

// Check if user provided wrong config suffix, so they fail faster
for (Object key : config.keySet()) {
if (((String) key).startsWith(ConsumerConfigConstants.RECOVERABLE_EXCEPTIONS_PREFIX)) {
throw new IllegalArgumentException(RecoverableErrorsConfig.INVALID_CONFIG_MESSAGE);
}
}

return Optional.empty();
}

private final List<ExceptionConfig> exceptionConfigs;

public RecoverableErrorsConfig(List<ExceptionConfig> exceptionConfigs) {
this.exceptionConfigs = exceptionConfigs;
}

public boolean hasNoConfig() {
return CollectionUtils.isEmpty(exceptionConfigs);
}

public List<ExceptionConfig> getExceptionConfigs() {
return exceptionConfigs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ private RecordPublisherRunResult runWithBackoff(
subscribedShard.getShard().getShardId(),
kinesisProxy,
configuration.getSubscribeToShardTimeout(),
runningSupplier);
runningSupplier,
configuration.getRecoverableErrorsConfig());
RecordPublisherRunResult result;

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
import org.apache.flink.streaming.connectors.kinesis.config.RecoverableErrorsConfig;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -118,6 +119,9 @@ public class FanOutRecordPublisherConfiguration {
/** Exponential backoff power constant for the describe stream consumer operation. */
private final double describeStreamConsumerExpConstant;

/** Recoverable error configuration. These are retried indefinitely. */
private final RecoverableErrorsConfig recoverableErrorsConfig;

/**
* Creates a FanOutRecordPublisherConfiguration.
*
Expand Down Expand Up @@ -318,6 +322,8 @@ public FanOutRecordPublisherConfiguration(
.orElse(
ConsumerConfigConstants
.DEFAULT_DESCRIBE_STREAM_CONSUMER_BACKOFF_EXPONENTIAL_CONSTANT);

this.recoverableErrorsConfig = this.parseRecoverableErrorConfig(configProps);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -472,4 +478,12 @@ public Optional<String> getConsumerName() {
public Optional<String> getStreamConsumerArn(String stream) {
return Optional.ofNullable(streamConsumerArns.get(stream));
}

public RecoverableErrorsConfig parseRecoverableErrorConfig(final Properties config) {
return RecoverableErrorsConfig.createConfigFromPropertiesOrThrow(config).orElse(null);
}

public RecoverableErrorsConfig getRecoverableErrorsConfig() {
return recoverableErrorsConfig;
}
}
Loading

0 comments on commit 4c2fdd9

Please sign in to comment.