Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use DH S3Instructions to build Iceberg AWS clients #6113

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions extensions/iceberg/s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@ dependencies {
implementation libs.awssdk.s3
implementation libs.awssdk.crt.client
runtimeOnly libs.awssdk.sts
runtimeOnly libs.awssdk.glue
implementation libs.awssdk.glue

// We don't want to explicitly pull in dependencies for dynamodb (org.apache.iceberg.aws.dynamodb.DynamoDbCatalog),
// but we need to be able to compile against it to implement AwsClientFactory
compileOnly libs.awssdk.dynamodb

// We don't want to explicitly pull in dependencies for KMS (there doesn't seem to be anything in Iceberg that
// actually calls it?), but we need to be able to compile against it to implement AwsClientFactory
compileOnly libs.awssdk.kms

compileOnly libs.autoservice
annotationProcessor libs.autoservice.compiler

testImplementation libs.junit4
testImplementation project(':engine-test-utils')

testImplementation libs.testcontainers
Expand All @@ -43,12 +50,27 @@ dependencies {
testRuntimeOnly project(':test-configs')
testRuntimeOnly project(':log-to-slf4j')
testRuntimeOnly libs.slf4j.simple

testImplementation platform(libs.junit.bom)
testImplementation libs.junit.jupiter
testRuntimeOnly libs.junit.jupiter.engine
testRuntimeOnly libs.junit.platform.launcher
}

TestTools.addEngineOutOfBandTest(project)
test {
useJUnitPlatform {
excludeTags("testcontainers")
}
}

tasks.register('testOutOfBand', Test) {
useJUnitPlatform {
includeTags("testcontainers")
}

testOutOfBand.dependsOn Docker.registryTask(project, 'localstack')
testOutOfBand.systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack')
dependsOn Docker.registryTask(project, 'localstack')
systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack')

testOutOfBand.dependsOn Docker.registryTask(project, 'minio')
testOutOfBand.systemProperty 'testcontainers.minio.image', Docker.localImageName('minio')
dependsOn Docker.registryTask(project, 'minio')
systemProperty 'testcontainers.minio.image', Docker.localImageName('minio')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.s3;

import org.apache.iceberg.aws.AwsClientFactory;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.s3.S3FileIOAwsClientFactory;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
* An {@link AwsClientFactory} and {@link S3FileIOAwsClientFactory} implementation that assumes ownership of AWS client
* creation as configured via {@link S3Instructions}.
*/
public final class DeephavenAwsClientFactory implements AwsClientFactory, S3FileIOAwsClientFactory {

private static final String UUID_KEY = DeephavenAwsClientFactory.class.getName() + ".__uuid";

/**
* Adds {@link DeephavenAwsClientFactory} to {@code propertiesOut} with the keys
* {@value AwsProperties#CLIENT_FACTORY} and {@value S3FileIOProperties#CLIENT_FACTORY}; it is an error if either of
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
* these properties is already set. After the corresponding {@link org.apache.iceberg.catalog.Catalog} is no longer
* in use, the caller should invoke the returned {@link Runnable} to clean up.
*
* @param instructions the instructions
* @param propertiesOut the properties
* @return the runnable to be invoked for cleanup once the corresponding Catalog is no longer in use
*/
public static Runnable addToProperties(S3Instructions instructions, Map<String, String> propertiesOut) {
    Objects.requireNonNull(instructions);
    // Both Iceberg client-factory properties point at this class.
    final String factoryClassName = DeephavenAwsClientFactory.class.getName();
    put(propertiesOut, AwsProperties.CLIENT_FACTORY, factoryClassName);
    put(propertiesOut, S3FileIOProperties.CLIENT_FACTORY, factoryClassName);
    // The instructions are registered under a fresh UUID, and that UUID is written into the properties so that
    // get(Map) can find them again from the properties alone.
    final String registrationId = UUID.randomUUID().toString();
    put(propertiesOut, UUID_KEY, registrationId);
    S3_INSTRUCTIONS_MAP.put(registrationId, instructions);
    // The returned Runnable removes the registration.
    return () -> S3_INSTRUCTIONS_MAP.remove(registrationId);
}

/**
* Get the {@link S3Instructions} as set in the corresponding {@link #addToProperties(S3Instructions, Map)} if the
* properties were built with that. If the properties were built with {@link #addToProperties(S3Instructions, Map)},
* but the {@link Runnable} was already invoked for cleanup, an {@link IllegalStateException} will be thrown.
*
* @param properties the properties
* @return the instructions
*/
public static Optional<S3Instructions> get(Map<String, String> properties) {
final String uuid = properties.get(UUID_KEY);
if (uuid == null) {
return Optional.empty();
}
final S3Instructions instructions = S3_INSTRUCTIONS_MAP.get(uuid);
if (instructions == null) {
throw new IllegalStateException(
"This S3Iinstructions were already cleaned up; please ensure that the returned Runnable from addToProperties is not invoked until the Catalog is no longer in use.");
Comment on lines +64 to +65
Copy link
Contributor

@malhotrashivam malhotrashivam Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is so weird that spotless let this one pass.
Also, typo in S3Instructions.

}
return Optional.of(instructions);
}

/**
 * Inserts {@code value} under {@code key}, refusing to replace an existing mapping.
 *
 * @param map the map to add to
 * @param key the key that must not already be mapped
 * @param value the value to associate with {@code key}
 * @throws IllegalArgumentException if {@link Map#putIfAbsent(Object, Object)} reports an existing value for
 *         {@code key}
 */
private static <K, V> void put(Map<K, V> map, K key, V value) {
    final V existing = map.putIfAbsent(key, value);
    if (existing == null) {
        return;
    }
    throw new IllegalArgumentException(String.format("Key '%s' already exist in map", key));
}

private static final Map<String, S3Instructions> S3_INSTRUCTIONS_MAP = new ConcurrentHashMap<>();

private S3Instructions instructions;

/**
 * Resolves the {@link S3Instructions} referenced by {@code properties} and stores them on this factory.
 *
 * @param properties the properties, expected to have been prepared via {@link #addToProperties(S3Instructions, Map)}
 * @throws IllegalArgumentException if {@code properties} does not reference any instructions
 */
@Override
public void initialize(Map<String, String> properties) {
    final Optional<S3Instructions> registered = get(properties);
    if (!registered.isPresent()) {
        throw new IllegalArgumentException(
                "DeephavenAwsClientFactory was setup improperly; it must be configured with DeephavenAwsClientFactory.addToProperties");
    }
    this.instructions = registered.get();
}

/**
 * Verifies that {@link #initialize(Map)} has set the instructions.
 *
 * @throws IllegalStateException if the instructions have not been set
 */
private void checkInit() {
    final boolean initialized = instructions != null;
    if (!initialized) {
        throw new IllegalStateException("Must initialize before use");
    }
}

/**
 * Returns the synchronous S3 client that {@code S3ClientFactory.getSyncClient} provides for the configured
 * instructions.
 *
 * @return the S3 client
 * @throws IllegalStateException if this factory has not been initialized
 */
@Override
public S3Client s3() {
    checkInit();
    final S3Client client = S3ClientFactory.getSyncClient(instructions);
    return client;
}

@Override
public GlueClient glue() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the glue, kms and dynamodb clients creation covered under any tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No; you could argue we should prefer to throw unsupported for DynamoDB / KMS, but it was easy enough to construct those clients if some Iceberg catalog happens to use them (I'm not sure under what circumstances, but it is part of that API...).

I don't know if we've done any Glue testing. https://docs.localstack.cloud/user-guide/aws/glue/ might be possible with the "pro" version, not sure what that entails. https://docs.localstack.cloud/user-guide/aws/glue/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, the implementation does seem straightforward and the internal methods are being touched in the S3 code, so I think that part should be okay.

The only thing I am slightly worried about is preloading in case of kms and dynamo clients.
You have a comment stating glue client is preloaded, is it possible to verify the same for kms and dynamo too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've done some manual testing of the Glue Catalog with the DeephavenAwsClientFactory, and it works. It was a good exercise though, because I realized that there is a new S3Client built for every Table load (at least for the Glue catalog), so we can't immediately invoke the cleanup Runnable. After a quick change, it worked.

The only caller I see of kms() is LakeFormationAwsClientFactory - it's possible that some private Catalogs use it, but I'm not sure. It seems like this factory has something to do with Role ARN, and it may not actually be in use anymore since I think Iceberg has expanded how configurable AWS is since the creation of it.

The only caller of dynamodb() is DynamoDbCatalog; it may be possible to test it out with Localstack since it looks to be freely available, but it's just a matter of how much time do we want to spend trying? https://docs.localstack.cloud/user-guide/aws/dynamodb/

Copy link
Contributor

@malhotrashivam malhotrashivam Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for checking into this.
At this point, maybe just add a TODO comment for kms and dynamodb, for testing it in future?

checkInit();
return GlueClient.builder()
.applyMutation(b -> S3ClientFactory.applyAllSharedSync(b, instructions))
.build();
}

/**
 * Builds a {@link KmsClient} whose builder is configured by {@code S3ClientFactory.applyAllSharedSync} using the
 * configured instructions.
 *
 * @return the KMS client
 * @throws IllegalStateException if this factory has not been initialized
 */
@Override
public KmsClient kms() {
    // TODO: add test coverage for KMS client construction.
    checkInit();
    final KmsClient client = KmsClient.builder()
            .applyMutation(builder -> {
                S3ClientFactory.applyAllSharedSync(builder, instructions);
            })
            .build();
    return client;
}

/**
 * Builds a {@link DynamoDbClient} whose builder is configured by {@code S3ClientFactory.applyAllSharedSync} using
 * the configured instructions.
 *
 * @return the DynamoDB client
 * @throws IllegalStateException if this factory has not been initialized
 */
@Override
public DynamoDbClient dynamo() {
    // TODO: add test coverage for DynamoDB client construction.
    checkInit();
    final DynamoDbClient client = DynamoDbClient.builder()
            .applyMutation(builder -> {
                S3ClientFactory.applyAllSharedSync(builder, instructions);
            })
            .build();
    return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,32 @@
package io.deephaven.iceberg.util;

import com.google.common.base.Strings;
import io.deephaven.extensions.s3.DeephavenAwsClientFactory;
import io.deephaven.extensions.s3.S3Instructions;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.HttpClientProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.rest.RESTCatalog;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.lang.ref.Cleaner;
import java.util.HashMap;
import java.util.Map;

/**
* Tools for accessing tables in the Iceberg table format.
*/
@SuppressWarnings("unused")
public class IcebergToolsS3 extends IcebergTools {
public final class IcebergToolsS3 {
private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO";

// Note: this could move to a shared location
private static final Cleaner CLEANER = Cleaner.create();

/**
* Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a
* value, the system defaults will be used.
Expand Down Expand Up @@ -102,4 +110,40 @@ public static IcebergCatalogAdapter createGlue(

return new IcebergCatalogAdapter(catalog, properties);
}

/**
* Create an Iceberg catalog adapter. Instead of the AWS clients being configured via the various Iceberg-specific
* properties (found amongst {@link S3FileIOProperties}, {@link HttpClientProperties}, and {@link AwsProperties}),
* the clients are created in the same way that Deephaven's AWS clients are configured with respect to
* {@code instructions}. This ensures, amongst other things, that Iceberg's AWS configuration and credentials are
* in-sync with Deephaven's AWS configuration and credentials for S3 access. The {@code instructions} will
* automatically be used as special instructions if {@link IcebergInstructions#dataInstructions()} is not explicitly
* set.
*
* <p>
* The caller is still responsible for providing the properties necessary as specified in
* {@link IcebergTools#createAdapter(String, Map, Map)}.
*
* @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
* @param properties a map containing the Iceberg catalog properties to use
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
* @param hadoopConfig a map containing Hadoop configuration properties to use
* @param instructions the s3 instructions
* @return the Iceberg catalog adapter
*
* @see DeephavenAwsClientFactory#addToProperties(S3Instructions, Map)
*/
public static IcebergCatalogAdapter createAdapter(
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
@Nullable final String name,
@NotNull final Map<String, String> properties,
@NotNull final Map<String, String> hadoopConfig,
@NotNull final S3Instructions instructions) {
final Map<String, String> newProperties = new HashMap<>(properties);
final Runnable cleanup = DeephavenAwsClientFactory.addToProperties(instructions, newProperties);
final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(name, newProperties, hadoopConfig);
// When the Catalog becomes unreachable, we can invoke the DeephavenAwsClientFactory cleanup.
// Note: it would be incorrect to register the cleanup against the adapter since the Catalog can outlive the
// adapter (and the DeephavenAwsClientFactory properties are needed by the Catalog).
CLEANER.register(adapter.catalog(), cleanup);
return adapter;
Comment on lines +146 to +147
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I had to do something similar for the S3Request objects, Ryan suggested using CleanupReferenceProcessor. You can check that too, if that has any advantages.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.google.auto.service.AutoService;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.DeephavenAwsClientFactory;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderPlugin;
import org.apache.iceberg.aws.AwsClientProperties;
Expand All @@ -15,13 +16,18 @@
import java.util.Map;

/**
* {@link io.deephaven.iceberg.internal.DataInstructionsProviderPlugin} implementation used for reading files from S3.
* {@link DataInstructionsProviderPlugin} implementation used for reading files from S3.
*/
@AutoService(io.deephaven.iceberg.internal.DataInstructionsProviderPlugin.class)
@AutoService(DataInstructionsProviderPlugin.class)
@SuppressWarnings("unused")
public final class S3InstructionsProviderPlugin implements DataInstructionsProviderPlugin {
@Override
public Object createInstructions(@NotNull final URI uri, @NotNull final Map<String, String> properties) {
final S3Instructions s3Instructions = DeephavenAwsClientFactory.get(properties).orElse(null);
if (s3Instructions != null) {
return s3Instructions;
}

// If the URI scheme is "s3","s3a","s3n" or if the properties contain one of these specific keys, we can
// create a useful S3Instructions object.
if (uri.getScheme().equals("s3")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack;
import org.junit.BeforeClass;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.Map;
Expand All @@ -15,9 +16,11 @@
import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;

@Tag("testcontainers")
@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we had to deprecate these?
They might still give us coverage on a lot of smaller cases that Larry tested.
Is it deprecated in a sense that new tests should not be added?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's mostly that I think we should not add new tests here, and work toward migrating them as mentioned in IcebergToolsTest. @lbooker42 has so far owned this layer, but it should be relatively easy to migrate to db_resource, and then we get the benefit of:

  1. No separate container (s3) needed (+ no need to upload files)
  2. No need to have a custom catalog IcebergTestCatalog
  3. On disk JDBC catalog + on disk warehouse

I see db_resource as mainly a way to test out how well we can interoperate with Iceberg that has been written via different processes (pyiceberg, spark, etc).

For more thorough testing (once we have our own writing support) we should be able to extend SqliteCatalogBase (or create further specialized tests that look similar to it), which can work with any warehouse - currently the same logic is tested out via local disk, minio, and localstack.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, this new catalog testing code is the more comprehensive approach for all future tests. Once we can migrate these tests so we don't lose any coverage, we should remove these as well as the IcebergTestCatalog class.
cc: @lbooker42

public class IcebergLocalStackTest extends IcebergToolsTest {

@BeforeClass
@BeforeAll
public static void initContainer() {
// ensure container is started so container startup time isn't associated with a specific test
LocalStack.init();
Expand All @@ -34,7 +37,7 @@ public S3AsyncClient s3AsyncClient() {
}

@Override
public Map<String, String> s3Properties() {
public Map<String, String> properties() {
return Map.of(
ENDPOINT, LocalStack.s3Endpoint(),
CLIENT_REGION, LocalStack.region(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO;
import io.deephaven.stats.util.OSUtil;
import org.junit.jupiter.api.Assumptions;
import org.junit.BeforeClass;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.Map;
Expand All @@ -17,9 +18,11 @@
import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;

@Tag("testcontainers")
@Deprecated
public class IcebergMinIOTest extends IcebergToolsTest {

@BeforeClass
@BeforeAll
public static void initContainer() {
// TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X
Assumptions.assumeFalse(OSUtil.runningMacOS(), "OSUtil.runningMacOS()");
Expand All @@ -38,12 +41,11 @@ public S3AsyncClient s3AsyncClient() {
}

@Override
public Map<String, String> s3Properties() {
public Map<String, String> properties() {
return Map.of(
ENDPOINT, MinIO.s3Endpoint(),
CLIENT_REGION, MinIO.region(),
ACCESS_KEY_ID, MinIO.accessKey(),
SECRET_ACCESS_KEY, MinIO.secretAccessKey());
}

}
Loading