
Switch to sink interfaces #38

Merged · 5 commits · Aug 21, 2023
2 changes: 0 additions & 2 deletions README.md
@@ -116,5 +116,3 @@ Please keep in mind that, depending on the network, timings could differ.
Since data in the connector is currently sent as JSON and then internally converted to PROTO, it would make sense to send PROTO directly. This is probably a topic for a separate issue.

Data for tables with a larger variety of columns are similar (two main CPU consumers).


7 changes: 6 additions & 1 deletion pom.xml
@@ -13,7 +13,7 @@
<assertj.core.version>3.24.2</assertj.core.version>
<error_prone_core.version>2.20.0</error_prone_core.version>
<flink.version>1.16.2</flink.version>
<google-cloud-libraries.version>26.19.0</google-cloud-libraries.version>
<google-cloud-libraries.version>26.22.0</google-cloud-libraries.version>
<java.version>11</java.version>
<junit.version>5.10.0</junit.version>
<googleJavaFormat.version>1.14.0</googleJavaFormat.version>
@@ -88,6 +88,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
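The new flink-connector-base dependency provides Flink's shared connector building blocks, including the async sink base classes mentioned in the review discussion further down. The sink classes this PR actually changes are not among the files shown here, so the snippet below is only a minimal sketch of what implementing the unified Sink V2 interface for this connector could look like; the class name BigQuerySink and the writer body are illustrative assumptions, not code from the PR.

package io.aiven.flink.connectors.bigquery.sink;

import java.io.IOException;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.table.data.RowData;

// Illustrative sketch only: shows the shape of the unified Sink V2 API,
// not the connector's real implementation.
public class BigQuerySink implements Sink<RowData> {
  private final BigQueryConnectionOptions options;

  public BigQuerySink(BigQueryConnectionOptions options) {
    this.options = options;
  }

  @Override
  public SinkWriter<RowData> createWriter(InitContext context) throws IOException {
    // A real writer would be built from `options` (table name, credentials,
    // flow control) and would append serialized rows to a BigQuery stream.
    return new SinkWriter<RowData>() {
      @Override
      public void write(RowData element, Context context) {
        // serialize `element` and append it to the current write stream
      }

      @Override
      public void flush(boolean endOfInput) {
        // wait for outstanding appends to complete
      }

      @Override
      public void close() {
        // release the stream writer and any client resources
      }
    };
  }
}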
BigQueryConfigOptions.java
@@ -35,4 +35,28 @@ public class BigQueryConfigOptions {
.enumType(DeliveryGuarantee.class)
.defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
.withDescription("Determines delivery guarantee");

public static final ConfigOption<Long> MAX_OUTSTANDING_ELEMENTS_COUNT =
ConfigOptions.key("max-outstanding-elements-count")
.longType()
.defaultValue(10000L)
.withDescription("Determines maximum number of concurrent requests to BigQuery");

Contributor: Just a suggestion: the wording "maxOutstandingElementsCount" is from the Google API, but other Flink sinks sometimes use maxInflightRequests. Do you think it's worthwhile aligning with other connectors? I have some doubts; the semantics are pretty similar but maybe not entirely the same.

Contributor Author: Yes, it's from the Google API, since currently the value is passed straight to the Google API. We could think about renaming once we switch to the Flink async sink connector interfaces.

(How these options map onto the client library's flow-control settings is sketched after this class.)

public static final ConfigOption<Long> MAX_OUTSTANDING_REQUEST_BYTES =
ConfigOptions.key("max-outstanding-request-bytes")
.longType()
.defaultValue(100 * 1024 * 1024L)
.withDescription("Determines maximum sum of request sizes in bytes to BigQuery");

Contributor Author: 100 MB is the default value from the BigQuery Storage API.

public static final ConfigOption<Integer> RETRY_COUNT =
ConfigOptions.key("retry-count")
.intType()
.defaultValue(10)
.withDescription("Determines amount of retry attempts");

public static final ConfigOption<Integer> RECREATE_COUNT =
ConfigOptions.key("recreate-count")
.intType()
.defaultValue(10)
.withDescription("Determines amount of recreate attempts");
}
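For context on the review discussion above: the two max-outstanding-* options match the flow-control knobs of the Google client library (com.google.api.gax.batching.FlowControlSettings). How the sink actually wires them into its stream writer is not visible in this diff, so the helper below is only a hedged sketch of that mapping; the class name FlowControlSettingsMapper is an assumption.

package io.aiven.flink.connectors.bigquery.sink;

import com.google.api.gax.batching.FlowControlSettings;

// Hypothetical helper, not part of the PR: shows how the new options could be
// translated into the client library's flow-control settings.
final class FlowControlSettingsMapper {
  static FlowControlSettings fromOptions(BigQueryConnectionOptions options) {
    return FlowControlSettings.newBuilder()
        .setMaxOutstandingElementCount(options.getMaxOutstandingElementsCount())
        .setMaxOutstandingRequestBytes(options.getMaxOutstandingRequestBytes())
        .build();
  }
}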
BigQueryConnectionOptions.java
@@ -1,8 +1,23 @@
package io.aiven.flink.connectors.bigquery.sink;

import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.CREATE_TABLE_IF_NOT_PRESENT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.DATASET;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.DELIVERY_GUARANTEE;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.MAX_OUTSTANDING_ELEMENTS_COUNT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.MAX_OUTSTANDING_REQUEST_BYTES;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.PROJECT_ID;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.RECREATE_COUNT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.RETRY_COUNT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.SERVICE_ACCOUNT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.TABLE;

import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.storage.v1.TableName;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.configuration.ReadableConfig;

public class BigQueryConnectionOptions implements Serializable {

@@ -12,6 +27,10 @@ public class BigQueryConnectionOptions implements Serializable {
private final String project;
private final String dataset;
private final String table;
private final long maxOutstandingElementsCount;
private final long maxOutstandingRequestBytes;
private final int retryCount;
private final int recreateCount;

private final boolean createIfNotExists;
private final DeliveryGuarantee deliveryGuarantee;
@@ -22,13 +41,41 @@ public BigQueryConnectionOptions(
String table,
boolean createIfNotExists,
DeliveryGuarantee deliveryGuarantee,
long maxOutstandingElementsCount,
long maxOutstandingRequestBytes,
int retryCount,
int recreateCount,
Credentials credentials) {
this.project = project;
this.dataset = dataset;
this.table = table;
this.createIfNotExists = createIfNotExists;
this.deliveryGuarantee = deliveryGuarantee;
this.credentials = credentials;
this.maxOutstandingElementsCount = maxOutstandingElementsCount;
this.maxOutstandingRequestBytes = maxOutstandingRequestBytes;
this.retryCount = retryCount;
this.recreateCount = recreateCount;
}

public static BigQueryConnectionOptions fromReadableConfig(ReadableConfig config) {
final Credentials credentials;
try (FileInputStream fis = new FileInputStream(config.get(SERVICE_ACCOUNT))) {
credentials = ServiceAccountCredentials.fromStream(fis);
} catch (IOException e) {
throw new RuntimeException(e);
}
return new BigQueryConnectionOptions(
config.get(PROJECT_ID),
config.get(DATASET),
config.get(TABLE),
config.get(CREATE_TABLE_IF_NOT_PRESENT),
config.get(DELIVERY_GUARANTEE),
config.get(MAX_OUTSTANDING_ELEMENTS_COUNT),
config.get(MAX_OUTSTANDING_REQUEST_BYTES),
config.get(RETRY_COUNT),
config.get(RECREATE_COUNT),
credentials);
}

public TableName getTableName() {
@@ -46,4 +93,20 @@ public boolean isCreateIfNotExists() {
public DeliveryGuarantee getDeliveryGuarantee() {
return deliveryGuarantee;
}

public long getMaxOutstandingElementsCount() {
return maxOutstandingElementsCount;
}

public long getMaxOutstandingRequestBytes() {
return maxOutstandingRequestBytes;
}

public int getRetryCount() {
return retryCount;
}

public int getRecreateCount() {
return recreateCount;
}
}
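Below is a hedged usage sketch of the new fromReadableConfig factory: it fills in the required options on a Flink Configuration and lets the new flow-control and retry options fall back to their defaults. The concrete values, the service-account path, and the ConnectionOptionsExample class are placeholders, not part of the PR.

import io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions;
import io.aiven.flink.connectors.bigquery.sink.BigQueryConnectionOptions;
import org.apache.flink.configuration.Configuration;

public class ConnectionOptionsExample {
  public static void main(String[] args) {
    Configuration config = new Configuration();
    config.set(BigQueryConfigOptions.PROJECT_ID, "my-gcp-project");
    config.set(BigQueryConfigOptions.DATASET, "my_dataset");
    config.set(BigQueryConfigOptions.TABLE, "my_table");
    // Path to a service-account JSON key; fromReadableConfig reads it eagerly,
    // so the file must exist when this runs.
    config.set(BigQueryConfigOptions.SERVICE_ACCOUNT, "/path/to/service-account.json");
    // The options added in this PR default to 10000 elements, 100 MiB,
    // 10 retries and 10 recreates; override only what you need:
    config.set(BigQueryConfigOptions.MAX_OUTSTANDING_ELEMENTS_COUNT, 20_000L);

    BigQueryConnectionOptions options = BigQueryConnectionOptions.fromReadableConfig(config);
    System.out.println(options.getTableName());
  }
}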