Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add generic Iceberg catalog adapter creation to Java / Python #5754

Merged
merged 18 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.util.channel;
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved

import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.*;

/**
* A service loader class for loading {@link DataInstructionsProviderPlugin} implementations at runtime which provide
* {@link DataInstructionsProviderLoader} implementations for different URI paths.
*/
public final class DataInstructionsProviderLoader {
/**
* A weakly held cache of {@link DataInstructionsProviderLoader} instances keyed by the property collection.
*/
private static final WeakHashMap<Map<String, String>, DataInstructionsProviderLoader> instances =
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
new WeakHashMap<>();

/**
* Get a {@link DataInstructionsProviderLoader} instance for the given property collection.
*
* @param properties The property collection.
* @return A {@link DataInstructionsProviderLoader} instance.
*/
public static DataInstructionsProviderLoader getInstance(final Map<String, String> properties) {
return instances.computeIfAbsent(properties, DataInstructionsProviderLoader::new);
}

/**
* The properties collection for this instance.
*/
private final Map<String, String> properties;

/**
* The list of plugins loaded by the {@link ServiceLoader}.
*/
private final List<DataInstructionsProviderPlugin> providers;

/**
* A weakly held cache of {@link DataInstructionsProviderPlugin} instances keyed by the URI.
*/
private final WeakHashMap<URI, Object> cache;
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Create a new {@link DataInstructionsProviderLoader} instance for the given property collection.
*
* @param properties The property collection.
*/
private DataInstructionsProviderLoader(final Map<String, String> properties) {
this.properties = properties;
providers = new ArrayList<>();
// Load the plugins
for (final DataInstructionsProviderPlugin plugin : ServiceLoader.load(DataInstructionsProviderPlugin.class)) {
providers.add(plugin);
}
cache = new WeakHashMap<>();
}

/**
* Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI, using the
* plugins loaded by the {@link ServiceLoader}. For example, for a "S3" URI, we will create a
* {@link SeekableChannelsProvider} which can read files from S3.
*
* @param uri The URI
* @return A {@link SeekableChannelsProvider} for the given URI.
*/
public Object fromServiceLoader(@NotNull final URI uri) {
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
return cache.computeIfAbsent(uri, u -> {
for (final DataInstructionsProviderPlugin plugin : providers) {
final Object pluginInstructions = plugin.createInstructions(uri, properties);
if (pluginInstructions != null) {
return pluginInstructions;
}
}
// No plugin found for this URI and property collection.
return null;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.util.channel;

import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.Map;

/**
* A plugin interface for providing {@link DataInstructionsProviderPlugin} implementations for different property
* collections and URI values. Check out {@link DataInstructionsProviderLoader} for more details.
*/
public interface DataInstructionsProviderPlugin {
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
/**
* Create a data instructions object for the given URI.
*/
Object createInstructions(@NotNull URI uri, @NotNull final Map<String, String> properties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static IcebergCatalogAdapter createS3Rest(
final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
catalog.initialize(catalogName, properties);

return new IcebergCatalogAdapter(catalog, fileIO);
return new IcebergCatalogAdapter(catalog, fileIO, properties);
}

/**
Expand Down Expand Up @@ -106,6 +106,6 @@ public static IcebergCatalogAdapter createGlue(
final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
catalog.initialize(catalogName, properties);

return new IcebergCatalogAdapter(catalog, fileIO);
return new IcebergCatalogAdapter(catalog, fileIO, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.deephaven.iceberg.location.IcebergTableParquetLocationKey;
import io.deephaven.iceberg.util.IcebergInstructions;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.util.channel.DataInstructionsProviderLoader;
import org.apache.iceberg.*;
import org.apache.iceberg.io.FileIO;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -53,6 +54,11 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
*/
final Map<URI, IcebergTableLocationKey> cache;

/**
* A cache of {@link IcebergTableLocationKey IcebergTableLocationKeys} keyed by the URI of the file they represent.
*/
final Map<String, String> properties;

/**
* The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table. Only
* accessed while synchronized on {@code this}.
Expand All @@ -79,8 +85,17 @@ protected IcebergTableLocationKey locationKey(
}
}

// Add the data instructions.
instructions.dataInstructions().ifPresent(builder::setSpecialInstructions);
// Add the data instructions if provided as part of the IcebergInstructions.
if (instructions.dataInstructions().isPresent()) {
builder.setSpecialInstructions(instructions.dataInstructions().get());
} else {
// Attempt to create data instructions from the properties collection and URI.
final Object dataInstructions =
DataInstructionsProviderLoader.getInstance(properties).fromServiceLoader(fileUri);
if (dataInstructions != null) {
builder.setSpecialInstructions(dataInstructions);
}
}

parquetInstructions = builder.build();
}
Expand All @@ -102,12 +117,14 @@ public IcebergBaseLayout(
@NotNull final Table table,
@NotNull final Snapshot tableSnapshot,
@NotNull final FileIO fileIO,
@NotNull final IcebergInstructions instructions) {
@NotNull final IcebergInstructions instructions,
@NotNull final Map<String, String> properties) {
this.tableDef = tableDef;
this.table = table;
this.snapshot = tableSnapshot;
this.fileIO = fileIO;
this.instructions = instructions;
this.properties = properties;

this.cache = new HashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.Map;

/**
* Iceberg {@link TableLocationKeyFinder location finder} for tables without partitions that will discover data files
Expand All @@ -30,8 +31,9 @@ public IcebergFlatLayout(
@NotNull final Table table,
@NotNull final Snapshot tableSnapshot,
@NotNull final FileIO fileIO,
@NotNull final IcebergInstructions instructions) {
super(tableDef, table, tableSnapshot, fileIO, instructions);
@NotNull final IcebergInstructions instructions,
@NotNull final Map<String, String> properties) {
super(tableDef, table, tableSnapshot, fileIO, instructions, properties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* a {@link Snapshot}
*/
public final class IcebergKeyValuePartitionedLayout extends IcebergBaseLayout {
private class ColumnData {
private static class ColumnData {
final String name;
final Class<?> type;
final int index;
Expand Down Expand Up @@ -52,8 +52,9 @@ public IcebergKeyValuePartitionedLayout(
@NotNull final org.apache.iceberg.Snapshot tableSnapshot,
@NotNull final FileIO fileIO,
@NotNull final PartitionSpec partitionSpec,
@NotNull final IcebergInstructions instructions) {
super(tableDef, table, tableSnapshot, fileIO, instructions);
@NotNull final IcebergInstructions instructions,
@NotNull final Map<String, String> properties) {
super(tableDef, table, tableSnapshot, fileIO, instructions, properties);

// We can assume due to upstream validation that there are no duplicate names (after renaming) that are included
// in the output definition, so we can ignore duplicates.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class IcebergCatalogAdapter {
private final Catalog catalog;
private final FileIO fileIO;

private final Map<String, String> properties;

/**
* Construct an IcebergCatalogAdapter from a catalog and file IO.
*/
Expand All @@ -52,6 +54,23 @@ public class IcebergCatalogAdapter {
@NotNull final FileIO fileIO) {
this.catalog = catalog;
this.fileIO = fileIO;

// Create an empty properties map.
this.properties = Map.of();
}

/**
* Construct an IcebergCatalogAdapter from a catalog, file IO, and property collection.
*/
IcebergCatalogAdapter(
@NotNull final Catalog catalog,
@NotNull final FileIO fileIO,
@NotNull final Map<String, String> properties) {
this.catalog = catalog;
this.fileIO = fileIO;

lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
// Copy the properties map to ensure immutability.
this.properties = Map.copyOf(properties);
}

/**
Expand Down Expand Up @@ -350,6 +369,28 @@ public Table listSnapshotsAsTable(@NotNull final String tableIdentifier) {
return listSnapshotsAsTable(TableIdentifier.parse(tableIdentifier));
}

/**
* Read the latest static snapshot of an Iceberg table from the Iceberg catalog.
*
* @param tableIdentifier The table identifier to load
* @return The loaded table
*/
@SuppressWarnings("unused")
public Table readTable(@NotNull final TableIdentifier tableIdentifier) {
return readTableInternal(tableIdentifier, null, null);
}

/**
* Read the latest static snapshot of an Iceberg table from the Iceberg catalog.
*
* @param tableIdentifier The table identifier to load
* @return The loaded table
*/
@SuppressWarnings("unused")
public Table readTable(@NotNull final String tableIdentifier) {
return readTableInternal(TableIdentifier.parse(tableIdentifier), null, null);
}

/**
* Read the latest static snapshot of an Iceberg table from the Iceberg catalog.
*
Expand All @@ -360,7 +401,7 @@ public Table listSnapshotsAsTable(@NotNull final String tableIdentifier) {
@SuppressWarnings("unused")
public Table readTable(
@NotNull final TableIdentifier tableIdentifier,
@Nullable final IcebergInstructions instructions) {
@NotNull final IcebergInstructions instructions) {
return readTableInternal(tableIdentifier, null, instructions);
}

Expand All @@ -374,10 +415,47 @@ public Table readTable(
@SuppressWarnings("unused")
public Table readTable(
@NotNull final String tableIdentifier,
@Nullable final IcebergInstructions instructions) {
@NotNull final IcebergInstructions instructions) {
return readTable(TableIdentifier.parse(tableIdentifier), instructions);
}

/**
* Read a static snapshot of an Iceberg table from the Iceberg catalog.
*
* @param tableIdentifier The table identifier to load
* @param tableSnapshotId The snapshot id to load
* @return The loaded table
*/
@SuppressWarnings("unused")
public Table readTable(@NotNull final TableIdentifier tableIdentifier, final long tableSnapshotId) {
// Find the snapshot with the given snapshot id
final Snapshot tableSnapshot = listSnapshots(tableIdentifier).stream()
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
.filter(snapshot -> snapshot.snapshotId() == tableSnapshotId)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Snapshot with id " + tableSnapshotId + " not found"));

return readTableInternal(tableIdentifier, tableSnapshot, null);
}

/**
* Read a static snapshot of an Iceberg table from the Iceberg catalog.
*
* @param tableIdentifier The table identifier to load
* @param tableSnapshotId The snapshot id to load
* @return The loaded table
*/
@SuppressWarnings("unused")
public Table readTable(@NotNull final String tableIdentifier, final long tableSnapshotId) {
final TableIdentifier tableId = TableIdentifier.parse(tableIdentifier);
// Find the snapshot with the given snapshot id
final Snapshot tableSnapshot = listSnapshots(tableId).stream()
.filter(snapshot -> snapshot.snapshotId() == tableSnapshotId)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Snapshot with id " + tableSnapshotId + " not found"));

return readTableInternal(tableId, tableSnapshot, null);
}

/**
* Read a static snapshot of an Iceberg table from the Iceberg catalog.
*
Expand All @@ -390,7 +468,7 @@ public Table readTable(
public Table readTable(
@NotNull final TableIdentifier tableIdentifier,
final long tableSnapshotId,
@Nullable final IcebergInstructions instructions) {
@NotNull final IcebergInstructions instructions) {

// Find the snapshot with the given snapshot id
final Snapshot tableSnapshot = listSnapshots(tableIdentifier).stream()
Expand All @@ -413,7 +491,7 @@ public Table readTable(
public Table readTable(
@NotNull final String tableIdentifier,
final long tableSnapshotId,
@Nullable final IcebergInstructions instructions) {
@NotNull final IcebergInstructions instructions) {
return readTable(TableIdentifier.parse(tableIdentifier), tableSnapshotId, instructions);
}

Expand All @@ -429,7 +507,7 @@ public Table readTable(
public Table readTable(
@NotNull final TableIdentifier tableIdentifier,
@NotNull final Snapshot tableSnapshot,
@Nullable final IcebergInstructions instructions) {
@NotNull final IcebergInstructions instructions) {
return readTableInternal(tableIdentifier, tableSnapshot, instructions);
}

Expand Down Expand Up @@ -522,11 +600,11 @@ private Table readTableInternal(

if (partitionSpec.isUnpartitioned()) {
// Create the flat layout location key finder
keyFinder = new IcebergFlatLayout(tableDef, table, snapshot, fileIO, userInstructions);
keyFinder = new IcebergFlatLayout(tableDef, table, snapshot, fileIO, userInstructions, properties);
} else {
// Create the partitioning column location key finder
keyFinder = new IcebergKeyValuePartitionedLayout(tableDef, table, snapshot, fileIO, partitionSpec,
userInstructions);
userInstructions, properties);
}

refreshService = null;
Expand Down
Loading
Loading