feat: BigQuery common and provisioner classes #138
Added:
- extensions/common/gcp/gcp-core/.../bigquery: common classes, including the BigQuery DataAddress, the service client, and the BigQuery DataAddress Validator Extension, which registers the DataAddress validators
- extensions/control-plane/provision/provision-bigquery: BigQuery provisioner that generates the access token for the data plane sink, including the BigQuery Provision Extension, which registers the provisioner

Limitations:
- the provisioner requires the destination table to exist; it does not create a new table if it is missing
- data is committed to the destination table only if no error occurred during the data transmission
- no transmission retry in case of error
- results are fetched in pages (fixed size of 4 rows); the maximum size of serialized page data is 16 KiB
@man8pr I added the |
cool stuff overall, comments inline
...ions/common/gcp/gcp-core/src/main/java/org/eclipse/edc/gcp/bigquery/BigQueryDataAddress.java
extensions/common/gcp/gcp-core/src/test/java/org/eclipse/edc/gcp/bigquery/Asserts.java
testRunSinkQuery(false);
}

private void testRunSourceQuery(String sinkType) throws InterruptedException, IOException {
nit: order the methods so that test methods come first, then protected methods, then private methods.
Updated ordering
}

@Test
void testSinkValidatorShouldNotValidateIncompleteAddresses() {
nit: I'd prefer one test method per scenario, i.e. every violation should have its own method. That greatly improves maintainability and readability.
Split the method into single violation methods
import static org.assertj.core.api.Assertions.assertThat;

public class PartAssert extends AbstractAssert<PartAssert, Part> {
Please add documentation. Also, does this have to be public? Package-private seems like it would be fine too.
Fixed
@Override
public @Nullable
ResourceDefinition generate(DataRequest dataRequest, Policy policy) {
heads up: the signature of this interface changed from DataRequest to TransferProcess. You should rebase onto the latest main.
Thank you, updated interface
I focused mainly on the service implementation. The big issue here is thread safety and how this class is used.
...ions/common/gcp/gcp-core/src/main/java/org/eclipse/edc/gcp/bigquery/BigQueryServiceImpl.java
.../common/gcp/gcp-core/src/test/java/org/eclipse/edc/gcp/bigquery/BigQueryServiceImplTest.java
...sion-bigquery/src/main/java/org/eclipse/edc/connector/provision/gcp/BigQueryProvisioner.java
...p/gcp-core/src/main/java/org/eclipse/edc/gcp/bigquery/service/BigQuerySourceServiceImpl.java
…n error fix, ExecutorService for running the query
- Split BigQueryService and BigQueryServiceImpl into 3 parts (provision, source and sink) to address the specific needs
- BigQueryDataAddress not serializable
- BigQuerySourceServiceImpl using ExecutorService to track threading and thread pools
- BigQuerySourceServiceImplTest.java: fewer mocks to test the runSourceQuery function
- BigQueryConsumerResourceDefinitionGenerator and test updated to reflect the new API
56175a1 to d9f4c9d
the thread-safety issues are still there.
As a general rule, I suggest removing from this PR all the code that is only used by tests (because it is dead code) and moving it into a separate PR that shows its usage within an integration/e2e test. That will make the review easier.
...cp-core/src/main/java/org/eclipse/edc/gcp/bigquery/service/BigQueryProvisionServiceImpl.java
...sion-bigquery/src/main/java/org/eclipse/edc/connector/provision/gcp/BigQueryProvisioner.java
…arameter in BQ ProvisionService tableExists method
…h BigQueryFactory
- replaced BigQueryProviderService* classes with BigQueryFactory / BigQuery for simpler implementation
- removed TypeManager member class from Provisioner (not used)
- removed unused classes in the test implementation (will be bundled with the data plane classes)
.../common/gcp/gcp-core/src/main/java/org/eclipse/edc/gcp/bigquery/service/BigQueryFactory.java
...sion-bigquery/src/main/java/org/eclipse/edc/connector/provision/gcp/BigQueryProvisioner.java
var credentialProject = project;
if (credentialProject == null) {
    credentialProject = gcpConfiguration.projectId();
}
this credentialProject could be a field of the service, initialized in the build() method, so there would be no reason to initialize it twice (line 95 and line 131)
The class has been removed
this.monitor = monitor;
}

private void initCredentials() throws IOException {
this method could return a GoogleCredentials instance instead of setting fields (as is done at lines 111 and 116). That way these side effects disappear.
The class has been removed
reset(gcpConfiguration);
reset(monitor);
reset(bigQuery);
reset(table);
reset(typeManager);
reset is not needed here. Please take a look at other tests around the EDC repositories to see how we mock.
The class has been removed
.build();

// New seed for each test.
random = new Random();
this is not used
The class has been removed
.../common/gcp/gcp-core/src/main/java/org/eclipse/edc/gcp/bigquery/service/BigQueryFactory.java
}

if (bqFactory == null) {
    bqFactory = new BigQueryFactoryImpl();
why is the factory instantiated here? I'd say that it should be instantiated in the extension and passed to the provisioner as a collaborator
Done
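For illustration, a minimal sketch of the "instantiate in the extension, pass as collaborator" suggestion. The types `ClientFactory`, `Provisioner` and `ProvisionExtension` are hypothetical stand-ins, not the PR's actual classes, and the `String` return value stands in for a real client object:

```java
import java.util.Objects;

// Hypothetical stand-in for a factory such as BigQueryFactory; not the PR's API.
interface ClientFactory {
    String create(String serviceAccountName);
}

// Hypothetical provisioner: it never instantiates its own factory.
final class Provisioner {
    private final ClientFactory factory;

    // The collaborator is supplied by the caller and must not be null.
    Provisioner(ClientFactory factory) {
        this.factory = Objects.requireNonNull(factory, "factory");
    }

    String provision(String serviceAccountName) {
        return factory.create(serviceAccountName);
    }
}

// Hypothetical extension: the single place where collaborators are wired together.
final class ProvisionExtension {
    static Provisioner createProvisioner() {
        ClientFactory factory =
                account -> "client-for-" + (account == null ? "default" : account);
        return new Provisioner(factory);
    }
}
```

With this shape a test can hand the provisioner a stub or mock factory through the constructor, so no null check with a fallback `new` is needed in production code.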
}

@Override
public BigQuery createBigQuery(GcpConfiguration gcpConfiguration, Monitor monitor) throws IOException {
both GcpConfiguration and Monitor should be passed at construction; there's no point in passing them here.
Plus, if the method does not depend on "flow data" like "projectId" (as it did in the previous implementation), there's no need for this factory at all: the BigQuery instance built by the factory will always be the same, so it could be instantiated by the extension and injected directly
Done. The BigQuery instance is different if the transfer request includes the name of a service account to be impersonated for the access
// TODO update target with the generated table name.

try {
    String serviceAccountEmail = null;
this variable is not used, please remove it
Done
when(table.exists()).thenReturn(true);
when(bigQuery.getTable(TEST_TARGET.getTableId())).thenReturn(table);

var response = bigQueryProvisioner.provision(resourceDefinition, policy).join();
please avoid using join and use AssertJ's async capabilities instead (assertThat(future).succeedsWithin(...))
Done (thanks for the hint, the succeedsWithin is really good)
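The idea behind replacing `join()` is a bounded wait: a future that never completes should fail the test after a timeout instead of hanging the build. The sketch below shows that idea with JDK classes only. It is not AssertJ's implementation, and `BoundedWaitSketch` and `resultWithin` are hypothetical names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK-only illustration of a bounded wait on a future (hypothetical helper).
final class BoundedWaitSketch {

    // Returns the result if the future completes normally in time,
    // otherwise throws an AssertionError describing what went wrong.
    static <T> T resultWithin(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            throw new AssertionError("future did not complete within " + timeout + " " + unit, e);
        } catch (ExecutionException e) {
            throw new AssertionError("future completed exceptionally", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting", e);
        }
    }
}
```

Unlike `join()`, which blocks indefinitely, the call above returns or fails within the given timeout.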
assertThat(response.succeeded()).isTrue();
assertThat(response.getContent().getResource()).isInstanceOfSatisfying(BigQueryProvisionedResource.class, resource -> {
    assertThat(resource.getId()).isEqualTo(RESOURCE_ID);
    assertThat(resource.getTransferProcessId()).isEqualTo(TRANSFER_ID);
    assertThat(resource.getProject()).isEqualTo(TEST_PROJECT);
    assertThat(resource.getDataset()).isEqualTo(TEST_DATASET);
    assertThat(resource.getTable()).isEqualTo(TEST_TABLE);
    assertThat(resource.hasToken()).isTrue();
    assertThat(resource.getCustomerName()).isEqualTo(CUSTOMER_NAME);
});
none of the token creation logic is tested
Added tests for the creation of the token
.build().getService();
}

private BigQuery createBigQuery() throws IOException {
unused, please delete
Done
.isEqualTo(true);
}

private class BigQueryFactoryTest implements BigQueryFactory {
please use a Mockito mock instead
Done
return new BigQueryTarget(project, dataset, table);
}

public static class Builder {
this builder is not really needed, as it relies on a single newInstance method plus build. Just use the constructor instead.
Done
GcpAccessToken token = null;

if (serviceAccount != null) {
    token = iamService.createAccessToken(serviceAccount);
} else {
    serviceAccount = ADC_SERVICE_ACCOUNT;
    token = iamService.createDefaultAccessToken();
}
this piece of code is duplicated in the GcsProvisioner. Could iamService.createAccessToken detect whether the service account passed is the default one and, in that case, return the default access token?
This would simplify things here and move the knowledge about accounts into the IAM service (which is the one that should have it)
Definitely, delegating the responsibility to IAM is good. Since I would then change both the BQ and GCS provisioners, along with IAM, does it make sense to do this in a dedicated PR?
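A minimal sketch of the delegation discussed in this thread, under stated assumptions: `IamServiceSketch` and `SketchToken` are hypothetical types, the token values are placeholders, and treating a null account as "use the default" mirrors the provisioner snippet above rather than any confirmed IAM service behaviour:

```java
// Hypothetical token holder; not the PR's GcpAccessToken.
final class SketchToken {
    final String account;
    final String value;

    SketchToken(String account, String value) {
        this.account = account;
        this.value = value;
    }
}

// Hypothetical IAM service that owns the knowledge about default accounts.
final class IamServiceSketch {
    // Assumed sentinel name for Application Default Credentials.
    static final String ADC_SERVICE_ACCOUNT = "adc-service-account";

    // Callers pass whatever account they have; the default case is resolved here.
    SketchToken createAccessToken(String serviceAccount) {
        if (serviceAccount == null || ADC_SERVICE_ACCOUNT.equals(serviceAccount)) {
            return createDefaultAccessToken();
        }
        // Placeholder value; a real service would request a token for the account.
        return new SketchToken(serviceAccount, "token-for-" + serviceAccount);
    }

    private SketchToken createDefaultAccessToken() {
        return new SketchToken(ADC_SERVICE_ACCOUNT, "default-token");
    }
}
```

Both provisioners could then reduce their branch to a single `createAccessToken(serviceAccount)` call.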
}
}

private void provisionSucceeds(String serviceAccountName) {
this is not a safe way to test: you have pretty much replicated the production code logic, which makes the test more error-prone. Please structure the tests so that they are understandable from the test method itself.
Really good point. As discussed, I made a first attempt that clearly shows the setup using default credentials and the setup for testing the use of a specific service account
@Override
protected void verify() {
    super.verify();
    // TODO verify required fields.
please solve this TODO as it seems not too hard to do
Done
/**
 * Schema for BigQuery service.
 */
public interface BigQueryService {
public interface BigQueryService {
public interface BigQueryServiceSchema {
Done
… BigQueryProvisioner with public ctor
… method - better grouping of test set up when using default credentials and when using specific service account
looking forward to the "service account refactoring". Please resolve DEPENDENCIES and I will merge this
Sure, I'm on the refactoring now; it shouldn't be a large change
What this PR changes/adds
Added:
Why it does that
The PR adds a first basic version of the BigQuery extension to exchange data to and from BigQuery tables.
Further notes
To be added in following PR:
Limitations for this version:
Linked Issue(s)
Closes #50