Skip to content

Commit

Permalink
Add Flow controll config options
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Aug 17, 2023
1 parent d6407a0 commit 716b59b
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.connector.base.DeliveryGuarantee;

public class BigQueryConfigOptions {
public static final ConfigOption<String> SERVICE_ACCOUNT =
Expand Down Expand Up @@ -36,4 +35,16 @@ public class BigQueryConfigOptions {
.enumType(DeliveryGuarantee.class)
.defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
.withDescription("Determines delivery guarantee");

public static final ConfigOption<Long> MAX_OUTSTANDING_ELEMENTS_COUNT =
ConfigOptions.key("max-outstanding-elements-count")
.longType()
.defaultValue(10000L)
.withDescription("Determines maximum number of concurrent requests to BigQuery");

public static final ConfigOption<Long> MAX_OUTSTANDING_REQUEST_BYTES =
ConfigOptions.key("max-outstanding-request-bytes")
.longType()
.defaultValue(100 * 1024 * 1024L)
.withDescription("Determines maximum sum of request sizes in bytes to BigQuery");
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.CREATE_TABLE_IF_NOT_PRESENT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.DATASET;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.DELIVERY_GUARANTEE;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.MAX_OUTSTANDING_ELEMENTS_COUNT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.MAX_OUTSTANDING_REQUEST_BYTES;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.PROJECT_ID;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.SERVICE_ACCOUNT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.TABLE;
Expand All @@ -14,7 +16,6 @@
import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;

public class BigQueryConnectionOptions implements Serializable {

Expand All @@ -24,6 +25,8 @@ public class BigQueryConnectionOptions implements Serializable {
private final String project;
private final String dataset;
private final String table;
private final long maxOutstandingElementsCount;
private final long maxOutstandingRequestBytes;

private final boolean createIfNotExists;
private final DeliveryGuarantee deliveryGuarantee;
Expand All @@ -34,13 +37,17 @@ public BigQueryConnectionOptions(
String table,
boolean createIfNotExists,
DeliveryGuarantee deliveryGuarantee,
long maxOutstandingElementsCount,
long maxOutstandingRequestBytes,
Credentials credentials) {
this.project = project;
this.dataset = dataset;
this.table = table;
this.createIfNotExists = createIfNotExists;
this.deliveryGuarantee = deliveryGuarantee;
this.credentials = credentials;
this.maxOutstandingElementsCount = maxOutstandingElementsCount;
this.maxOutstandingRequestBytes = maxOutstandingRequestBytes;
}

public static BigQueryConnectionOptions fromReadableConfig(ReadableConfig config) {
Expand All @@ -56,6 +63,8 @@ public static BigQueryConnectionOptions fromReadableConfig(ReadableConfig config
config.get(TABLE),
config.get(CREATE_TABLE_IF_NOT_PRESENT),
config.get(DELIVERY_GUARANTEE),
config.get(MAX_OUTSTANDING_ELEMENTS_COUNT),
config.get(MAX_OUTSTANDING_REQUEST_BYTES),
credentials);
}

Expand All @@ -74,4 +83,12 @@ public boolean isCreateIfNotExists() {
public DeliveryGuarantee getDeliveryGuarantee() {
return deliveryGuarantee;
}

public long getMaxOutstandingElementsCount() {
return maxOutstandingElementsCount;
}

public long getMaxOutstandingRequestBytes() {
return maxOutstandingRequestBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.aiven.flink.connectors.bigquery.sink;

import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
Expand Down Expand Up @@ -36,7 +37,12 @@ protected JsonStreamWriter getStreamWriter(
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

JsonStreamWriter.Builder builder =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client);
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(options.getMaxOutstandingElementsCount())
.setMaxOutstandingRequestBytes(options.getMaxOutstandingRequestBytes())
.build());
return builder
.setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
.setChannelProvider(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ protected JsonStreamWriter getStreamWriter(
JsonStreamWriter.Builder builder =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
.setFlowControlSettings(
FlowControlSettings.newBuilder().setMaxOutstandingElementCount(10000L).build());
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(options.getMaxOutstandingElementsCount())
.setMaxOutstandingRequestBytes(options.getMaxOutstandingRequestBytes())
.build());
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.CREATE_TABLE_IF_NOT_PRESENT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.DATASET;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.DELIVERY_GUARANTEE;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.MAX_OUTSTANDING_ELEMENTS_COUNT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.MAX_OUTSTANDING_REQUEST_BYTES;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.PROJECT_ID;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.SERVICE_ACCOUNT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.TABLE;
Expand All @@ -22,7 +24,9 @@ public class BigQueryTableSinkFactory implements DynamicTableSinkFactory {
DATASET,
TABLE,
CREATE_TABLE_IF_NOT_PRESENT,
DELIVERY_GUARANTEE);
DELIVERY_GUARANTEE,
MAX_OUTSTANDING_ELEMENTS_COUNT,
MAX_OUTSTANDING_REQUEST_BYTES);

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
Expand All @@ -34,7 +35,6 @@
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
Expand Down Expand Up @@ -384,6 +384,11 @@ protected void append(AppendContext appendContext)
&& recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
streamWriter =
JsonStreamWriter.newBuilder(streamWriter.getStreamName(), BigQueryWriteClient.create())
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(options.getMaxOutstandingElementsCount())
.setMaxOutstandingRequestBytes(options.getMaxOutstandingRequestBytes())
.build())
.build();
this.error = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.aiven.flink.connectors.bigquery.sink;

public enum DeliveryGuarantee {
AT_LEAST_ONCE,
EXACTLY_ONCE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
Expand Down Expand Up @@ -53,6 +52,8 @@ void tableCreationTest(String tableName, String[] fieldNames, DataType[] fieldTy
tableName + "-" + dg.name(),
true,
dg,
1000L,
100 * 1024 * 1024,
CREDENTIALS);
var table = BigQueryDynamicTableSink.ensureTableExists(fieldNames, fieldTypes, options);
table.delete();
Expand Down Expand Up @@ -145,6 +146,8 @@ void testValidTableDefinitions(
bigQueryTableName,
true,
DeliveryGuarantee.EXACTLY_ONCE,
1000L,
100 * 1024 * 1024,
CREDENTIALS);
var table = BigQueryDynamicTableSink.ensureTableExists(bqColumnNames, bqColumnTypes, options);
List<Column> columns = new ArrayList<>();
Expand Down Expand Up @@ -298,6 +301,8 @@ void testInvalidTableDefinitions(
bigQueryTableName,
true,
DeliveryGuarantee.EXACTLY_ONCE,
1000L,
100 * 1024 * 1024,
CREDENTIALS);
var table = BigQueryDynamicTableSink.ensureTableExists(bqColumnNames, bqColumnTypes, options);
List<Column> columns = new ArrayList<>();
Expand Down

0 comments on commit 716b59b

Please sign in to comment.