diff --git a/core/src/main/java/io/onetable/iceberg/IcebergSourceClient.java b/core/src/main/java/io/onetable/iceberg/IcebergSourceClient.java index 5bb6a2e2..15962e98 100644 --- a/core/src/main/java/io/onetable/iceberg/IcebergSourceClient.java +++ b/core/src/main/java/io/onetable/iceberg/IcebergSourceClient.java @@ -52,7 +52,7 @@ @Log4j2 @Builder -public class IcebergSourceClient implements SourceClient { +public class IcebergSourceClient implements SourceClient { @NonNull private final Configuration hadoopConf; @NonNull private final PerTableConfig sourceTableConfig; @@ -89,8 +89,9 @@ private FileIO initTableOps() { } @Override - public OneTable getTable(Snapshot snapshot) { + public OneTable getTable(Long snapshotId) { Table iceTable = getSourceTable(); + Snapshot snapshot = iceTable.snapshot(snapshotId); Schema iceSchema = iceTable.schemas().get(snapshot.schemaId()); IcebergSchemaExtractor schemaExtractor = IcebergSchemaExtractor.getInstance(); @@ -113,8 +114,9 @@ public OneTable getTable(Snapshot snapshot) { } @Override - public SchemaCatalog getSchemaCatalog(OneTable table, Snapshot snapshot) { + public SchemaCatalog getSchemaCatalog(OneTable table, Long snapshotId) { Table iceTable = getSourceTable(); + Snapshot snapshot = iceTable.snapshot(snapshotId); Integer iceSchemaId = snapshot.schemaId(); Schema iceSchema = iceTable.schemas().get(iceSchemaId); IcebergSchemaExtractor schemaExtractor = IcebergSchemaExtractor.getInstance(); @@ -129,8 +131,8 @@ public OneSnapshot getCurrentSnapshot() { Table iceTable = getSourceTable(); Snapshot currentSnapshot = iceTable.currentSnapshot(); - OneTable irTable = getTable(currentSnapshot); - SchemaCatalog schemaCatalog = getSchemaCatalog(irTable, currentSnapshot); + OneTable irTable = getTable(currentSnapshot.snapshotId()); + SchemaCatalog schemaCatalog = getSchemaCatalog(irTable, currentSnapshot.snapshotId()); TableScan scan = iceTable.newScan().useSnapshot(currentSnapshot.snapshotId()); PartitionSpec partitionSpec = iceTable.spec(); @@ -162,11 +164,12 @@ private OneDataFile fromIceberg(DataFile file, PartitionSpec partitionSpec, OneT } @Override - public TableChange getTableChangeForCommit(Snapshot snapshot) { + public TableChange getTableChangeForCommit(Long snapshotId) { FileIO fileIO = getTableOps(); Table iceTable = getSourceTable(); PartitionSpec partitionSpec = iceTable.spec(); - OneTable irTable = getTable(snapshot); + Snapshot snapshot = iceTable.snapshot(snapshotId); + OneTable irTable = getTable(snapshotId); Set dataFilesAdded = StreamSupport.stream(snapshot.addedDataFiles(fileIO).spliterator(), false) @@ -184,12 +187,12 @@ public TableChange getTableChangeForCommit(Snapshot snapshot) { .filesRemoved(dataFilesRemoved) .build(); - OneTable table = getTable(snapshot); + OneTable table = getTable(snapshot.snapshotId()); return TableChange.builder().tableAsOfChange(table).filesDiff(filesDiff).build(); } @Override - public CommitsBacklog getCommitsBacklog(InstantsForIncrementalSync lastSyncInstant) { + public CommitsBacklog getCommitsBacklog(InstantsForIncrementalSync lastSyncInstant) { long epochMilli = lastSyncInstant.getLastSyncInstant().toEpochMilli(); Table iceTable = getSourceTable(); @@ -204,18 +207,18 @@ public CommitsBacklog getCommitsBacklog(InstantsForIncrementalSync las if (pendingSnapshot.timestampMillis() <= epochMilli) { // Even the latest snapshot was committed before the lastSyncInstant. No new commits were made // and no new snapshots need to be synced. Return empty state. - return CommitsBacklog.builder().build(); + return CommitsBacklog.builder().build(); } - List snapshots = new ArrayList<>(); + List snapshotIds = new ArrayList<>(); while (pendingSnapshot != null && pendingSnapshot.timestampMillis() > epochMilli) { - snapshots.add(pendingSnapshot); + snapshotIds.add(pendingSnapshot.snapshotId()); pendingSnapshot = pendingSnapshot.parentId() != null ? iceTable.snapshot(pendingSnapshot.parentId()) : null; } // reverse the list to process the oldest snapshot first - Collections.reverse(snapshots); - return CommitsBacklog.builder().commitsToProcess(snapshots).build(); + Collections.reverse(snapshotIds); + return CommitsBacklog.builder().commitsToProcess(snapshotIds).build(); } // TODO(https://github.com/onetable-io/onetable/issues/147): Handle this. diff --git a/core/src/main/java/io/onetable/iceberg/IcebergSourceClientProvider.java b/core/src/main/java/io/onetable/iceberg/IcebergSourceClientProvider.java index ac67f218..5ffcc43e 100644 --- a/core/src/main/java/io/onetable/iceberg/IcebergSourceClientProvider.java +++ b/core/src/main/java/io/onetable/iceberg/IcebergSourceClientProvider.java @@ -18,13 +18,11 @@ package io.onetable.iceberg; -import org.apache.iceberg.Snapshot; - import io.onetable.client.PerTableConfig; import io.onetable.client.SourceClientProvider; /** A concrete implementation of {@link SourceClientProvider} for Hudi table format. */ -public class IcebergSourceClientProvider extends SourceClientProvider { +public class IcebergSourceClientProvider extends SourceClientProvider { @Override public IcebergSourceClient getSourceClientInstance(PerTableConfig sourceTableConfig) { return IcebergSourceClient.builder() diff --git a/core/src/test/java/io/onetable/iceberg/TestIcebergSourceClient.java b/core/src/test/java/io/onetable/iceberg/TestIcebergSourceClient.java index 0d4ac72d..3c639b49 100644 --- a/core/src/test/java/io/onetable/iceberg/TestIcebergSourceClient.java +++ b/core/src/test/java/io/onetable/iceberg/TestIcebergSourceClient.java @@ -86,7 +86,7 @@ void getTableTest(@TempDir Path workingDir) throws IOException { IcebergSourceClient client = clientProvider.getSourceClientInstance(sourceTableConfig); Snapshot snapshot = catalogSales.currentSnapshot(); - OneTable oneTable = client.getTable(snapshot); + OneTable oneTable = client.getTable(snapshot.snapshotId()); Assertions.assertNotNull(oneTable); Assertions.assertEquals(TableFormat.ICEBERG, oneTable.getTableFormat()); Assertions.assertTrue(oneTable.getName().endsWith("catalog_sales")); @@ -121,7 +121,7 @@ public void getSchemaCatalogTest(@TempDir Path workingDir) throws IOException { IcebergSourceClient client = clientProvider.getSourceClientInstance(sourceTableConfig); IcebergSourceClient spyClient = spy(client); - SchemaCatalog schemaCatalog = spyClient.getSchemaCatalog(null, iceCurrentSnapshot); + SchemaCatalog schemaCatalog = spyClient.getSchemaCatalog(null, iceCurrentSnapshot.snapshotId()); Assertions.assertNotNull(schemaCatalog); Map schemas = schemaCatalog.getSchemas(); Assertions.assertEquals(1, schemas.size()); @@ -156,8 +156,9 @@ public void testGetCurrentSnapshot(@TempDir Path workingDir) throws IOException Assertions.assertEquals( String.valueOf(iceCurrentSnapshot.snapshotId()), oneSnapshot.getVersion()); Assertions.assertNotNull(oneSnapshot.getTable()); - verify(spyClient, times(1)).getTable(iceCurrentSnapshot); - verify(spyClient, times(1)).getSchemaCatalog(oneSnapshot.getTable(), iceCurrentSnapshot); + verify(spyClient, times(1)).getTable(iceCurrentSnapshot.snapshotId()); + verify(spyClient, times(1)) + .getSchemaCatalog(oneSnapshot.getTable(), iceCurrentSnapshot.snapshotId()); verify(spyPartitionConverter, times(5)).toOneTable(any(), any(), any()); verify(spyDataFileExtractor, times(5)).fromIceberg(any(), any(), any()); @@ -315,9 +316,11 @@ private void validatePendingCommits(Table table, Snapshot lastSync, Snapshot... .lastSyncInstant(Instant.ofEpochMilli(lastSync.timestampMillis())) .build(); IcebergSourceClient sourceClient = getIcebergSourceClient(table); - CommitsBacklog commitsBacklog = sourceClient.getCommitsBacklog(instant); + CommitsBacklog commitsBacklog = sourceClient.getCommitsBacklog(instant); Assertions.assertEquals(0, commitsBacklog.getInFlightInstants().size()); - Assertions.assertArrayEquals(snapshots, commitsBacklog.getCommitsToProcess().toArray()); + Long[] snapshotIds = + Arrays.stream(snapshots).map(snapshot -> snapshot.snapshotId()).toArray(Long[]::new); + Assertions.assertArrayEquals(snapshotIds, commitsBacklog.getCommitsToProcess().toArray()); } private static long getDataFileCount(Table catalogSales) throws IOException { @@ -329,7 +332,7 @@ private static long getDataFileCount(Table catalogSales) throws IOException { private void validateTableChangeDiffSize( Table table, Snapshot snapshot, int addedFiles, int removedFiles) { IcebergSourceClient sourceClient = getIcebergSourceClient(table); - TableChange tableChange = sourceClient.getTableChangeForCommit(snapshot); + TableChange tableChange = sourceClient.getTableChangeForCommit(snapshot.snapshotId()); Assertions.assertEquals(addedFiles, tableChange.getFilesDiff().getFilesAdded().size()); Assertions.assertEquals(removedFiles, tableChange.getFilesDiff().getFilesRemoved().size()); }