Skip to content

Commit

Permalink
add a workaround until delta version is upgraded to include record co…
Browse files Browse the repository at this point in the history
…unt (#209)

* add a workaround until delta version is upgraded to include record count

* cleanup

* fix hudi test table filter

* use better query for filtering on delta source ITs
  • Loading branch information
the-other-tim-brown authored Nov 10, 2023
1 parent 59c91f4 commit e48374c
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 8 deletions.
17 changes: 13 additions & 4 deletions core/src/main/java/io/onetable/delta/DeltaActionsConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
Expand All @@ -33,6 +34,7 @@
import io.onetable.exception.NotSupportedException;
import io.onetable.model.schema.OneField;
import io.onetable.model.schema.OnePartitionField;
import io.onetable.model.stat.ColumnStat;
import io.onetable.model.storage.FileFormat;
import io.onetable.model.storage.OneDataFile;

Expand All @@ -55,6 +57,15 @@ public OneDataFile convertAddActionToOneDataFile(
DeltaPartitionExtractor partitionExtractor,
DeltaStatsExtractor fileStatsExtractor) {
String tableBasePath = deltaSnapshot.deltaLog().dataPath().toUri().toString();
Map<OneField, ColumnStat> columnStatMap =
includeColumnStats
? fileStatsExtractor.getColumnStatsForFile(addFile, fields)
: Collections.emptyMap();
long recordCount =
columnStatMap.values().stream()
.map(ColumnStat::getNumValues)
.max(Long::compareTo)
.orElse(0L);
// TODO(https://github.com/onetable-io/onetable/issues/102): removed record count.
return OneDataFile.builder()
.physicalPath(getFullPathToFile(tableBasePath, addFile.path()))
Expand All @@ -63,10 +74,8 @@ public OneDataFile convertAddActionToOneDataFile(
.lastModified(addFile.modificationTime())
.partitionValues(
partitionExtractor.partitionValueExtraction(addFile.partitionValues(), partitionFields))
.columnStats(
includeColumnStats
? fileStatsExtractor.getColumnStatsForFile(addFile, fields)
: Collections.emptyMap())
.columnStats(columnStatMap)
.recordCount(recordCount)
.build();
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/test/java/io/onetable/GenericTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public interface GenericTable<T, Q> extends AutoCloseable {

List<String> getColumnsToSelect();

String getFilterQuery();

static GenericTable getInstance(
String tableName,
Path tempDir,
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/java/io/onetable/ITOneTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ public void testVariousOperations(
table.deleteRows(insertRecords.subList(30, 50));
oneTableClient.sync(perTableConfig, sourceClientProvider);
checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 180);
checkDatasetEquivalenceWithFilter(
sourceTableFormat, table, targetTableFormats, table.getFilterQuery());
}

try (GenericTable tableWithUpdatedSchema =
Expand Down
5 changes: 5 additions & 0 deletions core/src/test/java/io/onetable/TestAbstractHudiTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,11 @@ public void reload() {
// no-op.
}

@Override
public String getFilterQuery() {
return String.format("%s > 'aaa'", RECORD_KEY_FIELD_NAME);
}

@Override
public String getOrderByColumn() {
return "_hoodie_record_key";
Expand Down
5 changes: 5 additions & 0 deletions core/src/test/java/io/onetable/TestSparkDeltaTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ public void deleteSpecialPartition() {
}
}

@Override
public String getFilterQuery() {
return "id % 2 = 0";
}

public void runCompaction() {
deltaTable.optimize().executeCompaction();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import io.onetable.model.storage.OneDataFile;
import io.onetable.model.storage.OneFileGroup;
import io.onetable.model.storage.TableFormat;
import io.onetable.testutil.Issues;

public class ITDeltaSourceClient {

Expand Down Expand Up @@ -682,9 +681,7 @@ private void validatePropertiesDataFile(OneDataFile expected, OneDataFile actual
Assertions.assertEquals(expected.getPartitionValues(), actual.getPartitionValues());
Assertions.assertEquals(expected.getPartitionPath(), actual.getPartitionPath());
Assertions.assertEquals(expected.getFileSizeBytes(), actual.getFileSizeBytes());
if (Issues.ISSUE_102_FIXED) {
Assertions.assertEquals(expected.getRecordCount(), actual.getRecordCount());
}
Assertions.assertEquals(expected.getRecordCount(), actual.getRecordCount());
Instant now = Instant.now();
long minRange = now.minus(1, ChronoUnit.HOURS).toEpochMilli();
long maxRange = now.toEpochMilli();
Expand Down

0 comments on commit e48374c

Please sign in to comment.