diff --git a/src/main/java/io/aiven/flink/connectors/bigquery/sink/BigQueryConfigOptions.java b/src/main/java/io/aiven/flink/connectors/bigquery/sink/BigQueryConfigOptions.java index f64ab4c..ed7fefe 100644 --- a/src/main/java/io/aiven/flink/connectors/bigquery/sink/BigQueryConfigOptions.java +++ b/src/main/java/io/aiven/flink/connectors/bigquery/sink/BigQueryConfigOptions.java @@ -47,4 +47,16 @@ public class BigQueryConfigOptions { .longType() .defaultValue(100 * 1024 * 1024L) .withDescription("Determines maximum sum of request sizes in bytes to BigQuery"); + + public static final ConfigOption RETRY_COUNT = + ConfigOptions.key("retry-count") + .intType() + .defaultValue(10) + .withDescription("Determines amount of retry attempts"); + + public static final ConfigOption RECREATE_COUNT = + ConfigOptions.key("recreate-count") + .intType() + .defaultValue(10) + .withDescription("Determines amount of recreate attempts"); } diff --git a/src/main/java/io/aiven/flink/connectors/bigquery/sink/BigQueryConnectionOptions.java b/src/main/java/io/aiven/flink/connectors/bigquery/sink/BigQueryConnectionOptions.java index 614d48d..cf8b40e 100644 --- a/src/main/java/io/aiven/flink/connectors/bigquery/sink/BigQueryConnectionOptions.java +++ b/src/main/java/io/aiven/flink/connectors/bigquery/sink/BigQueryConnectionOptions.java @@ -6,6 +6,8 @@ import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.MAX_OUTSTANDING_ELEMENTS_COUNT; import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.MAX_OUTSTANDING_REQUEST_BYTES; import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.PROJECT_ID; +import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.RECREATE_COUNT; +import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.RETRY_COUNT; import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.SERVICE_ACCOUNT; import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.TABLE; @@ -27,6 +29,8 @@ public class BigQueryConnectionOptions implements Serializable { private final String table; private final long maxOutstandingElementsCount; private final long maxOutstandingRequestBytes; + private final int retryCount; + private final int recreateCount; private final boolean createIfNotExists; private final DeliveryGuarantee deliveryGuarantee; @@ -39,6 +43,8 @@ public BigQueryConnectionOptions( DeliveryGuarantee deliveryGuarantee, long maxOutstandingElementsCount, long maxOutstandingRequestBytes, + int retryCount, + int recreateCount, Credentials credentials) { this.project = project; this.dataset = dataset; @@ -48,6 +54,8 @@ public BigQueryConnectionOptions( this.credentials = credentials; this.maxOutstandingElementsCount = maxOutstandingElementsCount; this.maxOutstandingRequestBytes = maxOutstandingRequestBytes; + this.retryCount = retryCount; + this.recreateCount = recreateCount; } public static BigQueryConnectionOptions fromReadableConfig(ReadableConfig config) { @@ -65,6 +73,8 @@ public static BigQueryConnectionOptions fromReadableConfig(ReadableConfig config config.get(DELIVERY_GUARANTEE), config.get(MAX_OUTSTANDING_ELEMENTS_COUNT), config.get(MAX_OUTSTANDING_REQUEST_BYTES), + config.get(RETRY_COUNT), + config.get(RECREATE_COUNT), credentials); } @@ -91,4 +101,12 @@ public long getMaxOutstandingElementsCount() { public long getMaxOutstandingRequestBytes() { return maxOutstandingRequestBytes; } + + public int getRetryCount() { + return retryCount; + } + + public int getRecreateCount() { + return recreateCount; + } } diff --git a/src/main/java/io/aiven/flink/connectors/bigquery/sink/BigQueryWriter.java b/src/main/java/io/aiven/flink/connectors/bigquery/sink/BigQueryWriter.java index e5b3a64..668cf03 100644 --- a/src/main/java/io/aiven/flink/connectors/bigquery/sink/BigQueryWriter.java +++ b/src/main/java/io/aiven/flink/connectors/bigquery/sink/BigQueryWriter.java @@ -61,7 +61,6 @@ public abstract class BigQueryWriter implements SinkWriter { Status.Code.DEADLINE_EXCEEDED, Status.Code.UNAVAILABLE); protected static final int MAX_RECREATE_COUNT = 3; - private static final int MAX_RETRY_COUNT = 3; protected final String[] fieldNames; protected final LogicalType[] fieldTypes; @@ -381,7 +380,7 @@ protected void append(AppendContext appendContext) synchronized (this.lock) { if (!streamWriter.isUserClosed() && streamWriter.isClosed() - && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) { + && recreateCount.getAndIncrement() < options.getRecreateCount()) { streamWriter = JsonStreamWriter.newBuilder(streamWriter.getStreamName(), BigQueryWriteClient.create()) .setFlowControlSettings( @@ -438,7 +437,7 @@ public void onFailure(Throwable throwable) { // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, // see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html Status status = Status.fromThrowable(throwable); - if (appendContext.retryCount < MAX_RETRY_COUNT + if (appendContext.retryCount < this.parent.options.getRetryCount() && RETRIABLE_ERROR_CODES.contains(status.getCode())) { appendContext.retryCount++; try { diff --git a/src/test/java/io/aiven/flink/connectors/bigquery/sink/BigQuerySinkTest.java b/src/test/java/io/aiven/flink/connectors/bigquery/sink/BigQuerySinkTest.java index 9bfd480..5f63db0 100644 --- a/src/test/java/io/aiven/flink/connectors/bigquery/sink/BigQuerySinkTest.java +++ b/src/test/java/io/aiven/flink/connectors/bigquery/sink/BigQuerySinkTest.java @@ -54,6 +54,8 @@ void tableCreationTest(String tableName, String[] fieldNames, DataType[] fieldTy dg, 1000L, 100 * 1024 * 1024, + 10, + 10, CREDENTIALS); var table = BigQueryDynamicTableSink.ensureTableExists(fieldNames, fieldTypes, options); table.delete(); @@ -148,6 +150,8 @@ void testValidTableDefinitions( DeliveryGuarantee.EXACTLY_ONCE, 1000L, 100 * 1024 * 1024, + 10, + 10, CREDENTIALS); var table = BigQueryDynamicTableSink.ensureTableExists(bqColumnNames, bqColumnTypes, options); List columns = new ArrayList<>(); @@ -303,6 +307,8 @@ void testInvalidTableDefinitions( DeliveryGuarantee.EXACTLY_ONCE, 1000L, 100 * 1024 * 1024, + 10, + 10, CREDENTIALS); var table = BigQueryDynamicTableSink.ensureTableExists(bqColumnNames, bqColumnTypes, options); List columns = new ArrayList<>();