Skip to content

Commit

Permalink
Extract retry as config option
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Aug 17, 2023
1 parent 716b59b commit 81f2d08
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,16 @@ public class BigQueryConfigOptions {
.longType()
.defaultValue(100 * 1024 * 1024L)
.withDescription("Determines maximum sum of request sizes in bytes to BigQuery");

public static final ConfigOption<Integer> RETRY_COUNT =
ConfigOptions.key("retry-count")
.intType()
.defaultValue(10)
.withDescription("Determines amount of retry attempts");

public static final ConfigOption<Integer> RECREATE_COUNT =
ConfigOptions.key("recreate-count")
.intType()
.defaultValue(10)
.withDescription("Determines amount of recreate attempts");
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.MAX_OUTSTANDING_ELEMENTS_COUNT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.MAX_OUTSTANDING_REQUEST_BYTES;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.PROJECT_ID;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.RECREATE_COUNT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.RETRY_COUNT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.SERVICE_ACCOUNT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.TABLE;

Expand All @@ -27,6 +29,8 @@ public class BigQueryConnectionOptions implements Serializable {
private final String table;
private final long maxOutstandingElementsCount;
private final long maxOutstandingRequestBytes;
private final int retryCount;
private final int recreateCount;

private final boolean createIfNotExists;
private final DeliveryGuarantee deliveryGuarantee;
Expand All @@ -39,6 +43,8 @@ public BigQueryConnectionOptions(
DeliveryGuarantee deliveryGuarantee,
long maxOutstandingElementsCount,
long maxOutstandingRequestBytes,
int retryCount,
int recreateCount,
Credentials credentials) {
this.project = project;
this.dataset = dataset;
Expand All @@ -48,6 +54,8 @@ public BigQueryConnectionOptions(
this.credentials = credentials;
this.maxOutstandingElementsCount = maxOutstandingElementsCount;
this.maxOutstandingRequestBytes = maxOutstandingRequestBytes;
this.retryCount = retryCount;
this.recreateCount = recreateCount;
}

public static BigQueryConnectionOptions fromReadableConfig(ReadableConfig config) {
Expand All @@ -65,6 +73,8 @@ public static BigQueryConnectionOptions fromReadableConfig(ReadableConfig config
config.get(DELIVERY_GUARANTEE),
config.get(MAX_OUTSTANDING_ELEMENTS_COUNT),
config.get(MAX_OUTSTANDING_REQUEST_BYTES),
config.get(RETRY_COUNT),
config.get(RECREATE_COUNT),
credentials);
}

Expand All @@ -91,4 +101,12 @@ public long getMaxOutstandingElementsCount() {
public long getMaxOutstandingRequestBytes() {
return maxOutstandingRequestBytes;
}

public int getRetryCount() {
return retryCount;
}

public int getRecreateCount() {
return recreateCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public abstract class BigQueryWriter implements SinkWriter<RowData> {
Status.Code.DEADLINE_EXCEEDED,
Status.Code.UNAVAILABLE);
protected static final int MAX_RECREATE_COUNT = 3;
private static final int MAX_RETRY_COUNT = 3;
protected final String[] fieldNames;

protected final LogicalType[] fieldTypes;
Expand Down Expand Up @@ -381,7 +380,7 @@ protected void append(AppendContext appendContext)
synchronized (this.lock) {
if (!streamWriter.isUserClosed()
&& streamWriter.isClosed()
&& recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
&& recreateCount.getAndIncrement() < options.getRecreateCount()) {
streamWriter =
JsonStreamWriter.newBuilder(streamWriter.getStreamName(), BigQueryWriteClient.create())
.setFlowControlSettings(
Expand Down Expand Up @@ -438,7 +437,7 @@ public void onFailure(Throwable throwable) {
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information,
// see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
Status status = Status.fromThrowable(throwable);
if (appendContext.retryCount < MAX_RETRY_COUNT
if (appendContext.retryCount < this.parent.options.getRetryCount()
&& RETRIABLE_ERROR_CODES.contains(status.getCode())) {
appendContext.retryCount++;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ void tableCreationTest(String tableName, String[] fieldNames, DataType[] fieldTy
dg,
1000L,
100 * 1024 * 1024,
10,
10,
CREDENTIALS);
var table = BigQueryDynamicTableSink.ensureTableExists(fieldNames, fieldTypes, options);
table.delete();
Expand Down Expand Up @@ -148,6 +150,8 @@ void testValidTableDefinitions(
DeliveryGuarantee.EXACTLY_ONCE,
1000L,
100 * 1024 * 1024,
10,
10,
CREDENTIALS);
var table = BigQueryDynamicTableSink.ensureTableExists(bqColumnNames, bqColumnTypes, options);
List<Column> columns = new ArrayList<>();
Expand Down Expand Up @@ -303,6 +307,8 @@ void testInvalidTableDefinitions(
DeliveryGuarantee.EXACTLY_ONCE,
1000L,
100 * 1024 * 1024,
10,
10,
CREDENTIALS);
var table = BigQueryDynamicTableSink.ensureTableExists(bqColumnNames, bqColumnTypes, options);
List<Column> columns = new ArrayList<>();
Expand Down

0 comments on commit 81f2d08

Please sign in to comment.