fix: Ensure ParquetTools.readTable attaches the inferred definition (#6184)

Fixes an inconsistency between `ParquetTools#readSingleFileTable` and
`ParquetTools#readTable` with regard to whether they set the
`TableDefinition` in the `ParquetInstructions` after inference.

This PR is the non-controversial part extracted from #6149
devinrsmith authored Oct 9, 2024
1 parent 7a01ac4 commit cd237b1
Showing 2 changed files with 23 additions and 13 deletions.
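
For context (not part of the commit), a minimal sketch of the two read paths the message contrasts, assuming the standard ParquetInstructions.Builder API; the file path and column names are hypothetical.

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

public class ReadTableSketch {
    public static void main(String[] args) {
        // Path 1: no definition supplied; ParquetTools infers it from the file's schema.
        // After this commit, the inferred definition is also attached to the instructions
        // that readTable passes along internally, matching readSingleFileTable.
        final Table inferred = ParquetTools.readTable("/tmp/example.parquet");

        // Path 2: an explicit definition supplied via ParquetInstructions skips inference.
        final TableDefinition definition = TableDefinition.of(
                ColumnDefinition.ofString("Sym"),
                ColumnDefinition.ofDouble("Price"));
        final ParquetInstructions instructions = ParquetInstructions.builder()
                .setTableDefinition(definition)
                .build();
        final Table explicit = ParquetTools.readTable("/tmp/example.parquet", instructions);

        System.out.println(inferred.getDefinition());
        System.out.println(explicit.getDefinition());
    }
}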
ParquetInstructions.java

@@ -35,6 +35,17 @@ public abstract class ParquetInstructions implements ColumnToCodecMappings {
 
     private static volatile String defaultCompressionCodecName = CompressionCodecName.SNAPPY.toString();
 
+    /**
+     * Throws an exception if {@link ParquetInstructions#getTableDefinition()} is empty.
+     *
+     * @param parquetInstructions the parquet instructions
+     * @throws IllegalArgumentException if there is not a table definition
+     */
+    public static TableDefinition ensureDefinition(ParquetInstructions parquetInstructions) {
+        return parquetInstructions.getTableDefinition()
+                .orElseThrow(() -> new IllegalArgumentException("Table definition must be provided"));
+    }
+
     /**
      * Set the default for {@link #getCompressionCodecName()}.
      *
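
A small hypothetical sketch of how the new helper behaves, assuming ParquetInstructions.EMPTY carries no table definition: it throws for instructions without a definition and otherwise unwraps the Optional.

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.parquet.table.ParquetInstructions;

public class EnsureDefinitionSketch {
    public static void main(String[] args) {
        // No definition present: ensureDefinition is expected to throw.
        try {
            ParquetInstructions.ensureDefinition(ParquetInstructions.EMPTY);
        } catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage()); // "Table definition must be provided"
        }

        // Definition present: the helper simply returns it.
        final ParquetInstructions withDefinition = ParquetInstructions.builder()
                .setTableDefinition(TableDefinition.of(ColumnDefinition.ofLong("Id")))
                .build();
        final TableDefinition definition = ParquetInstructions.ensureDefinition(withDefinition);
        System.out.println(definition.getColumnNames());
    }
}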
ParquetTools.java

@@ -822,8 +822,7 @@ private static Table readTable(
         if (readInstructions.isRefreshing()) {
             throw new IllegalArgumentException("Unable to have a refreshing single parquet file");
         }
-        final TableDefinition tableDefinition = readInstructions.getTableDefinition().orElseThrow(
-                () -> new IllegalArgumentException("Table definition must be provided"));
+        final TableDefinition tableDefinition = ParquetInstructions.ensureDefinition(readInstructions);
         verifyFileLayout(readInstructions, ParquetFileLayout.SINGLE_FILE);
         final TableLocationProvider locationProvider = new PollingTableLocationProvider<>(
                 StandaloneTableKey.getInstance(),
@@ -857,11 +856,10 @@ public static Table readTable(
         if (readInstructions.getTableDefinition().isEmpty()) {
             // Infer the definition
             final KnownLocationKeyFinder<ParquetTableLocationKey> inferenceKeys = toKnownKeys(locationKeyFinder);
-            final Pair<TableDefinition, ParquetInstructions> inference = infer(inferenceKeys, readInstructions);
+            useInstructions = infer(inferenceKeys, readInstructions);
+            definition = useInstructions.getTableDefinition().orElseThrow();
             // In the case of a static output table, we can re-use the already fetched inference keys
-            useLocationKeyFinder = readInstructions.isRefreshing() ? locationKeyFinder : inferenceKeys;
-            definition = inference.getFirst();
-            useInstructions = inference.getSecond();
+            useLocationKeyFinder = useInstructions.isRefreshing() ? locationKeyFinder : inferenceKeys;
         } else {
             definition = readInstructions.getTableDefinition().get();
             useInstructions = readInstructions;
@@ -894,7 +892,12 @@ public static Table readTable(
                 updateSourceRegistrar);
     }
 
-    private static Pair<TableDefinition, ParquetInstructions> infer(
+    /**
+     * Infers additional information regarding the parquet file(s) based on the inferenceKeys and returns a potentially
+     * updated parquet instructions. If the incoming {@code readInstructions} does not have a {@link TableDefinition},
+     * the returned instructions will have an inferred {@link TableDefinition}.
+     */
+    private static ParquetInstructions infer(
             final KnownLocationKeyFinder<ParquetTableLocationKey> inferenceKeys,
             final ParquetInstructions readInstructions) {
         // TODO(deephaven-core#877): Support schema merge when discovering multiple parquet files
@@ -929,7 +932,7 @@ private static Pair<TableDefinition, ParquetInstructions> infer(
         columnDefinitionsFromParquetFile.stream()
                 .filter(columnDefinition -> !partitionKeys.contains(columnDefinition.getName()))
                 .forEach(allColumns::add);
-        return new Pair<>(TableDefinition.of(allColumns), schemaInfo.getSecond());
+        return ensureTableDefinition(schemaInfo.getSecond(), TableDefinition.of(allColumns), true);
     }
 
     private static KnownLocationKeyFinder<ParquetTableLocationKey> toKnownKeys(
@@ -1044,11 +1047,7 @@ private static Table readSingleFileTable(
         }
         // Infer the table definition
         final KnownLocationKeyFinder<ParquetTableLocationKey> inferenceKeys = new KnownLocationKeyFinder<>(locationKey);
-        final Pair<TableDefinition, ParquetInstructions> inference = infer(inferenceKeys, readInstructions);
-        final TableDefinition inferredTableDefinition = inference.getFirst();
-        final ParquetInstructions inferredInstructions = inference.getSecond();
-        return readTable(inferenceKeys.getFirstKey().orElseThrow(),
-                ensureTableDefinition(inferredInstructions, inferredTableDefinition, true));
+        return readTable(inferenceKeys.getFirstKey().orElseThrow(), infer(inferenceKeys, readInstructions));
     }
 
     @VisibleForTesting
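
From the caller's side, the now-consistent behavior can be sketched as below; the file path is a placeholder, and Builder.setFileLayout plus the nested ParquetFileLayout enum (referenced in the diff above) are assumed to be available.

import io.deephaven.engine.table.Table;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetInstructions.ParquetFileLayout;
import io.deephaven.parquet.table.ParquetTools;

public class LayoutConsistencySketch {
    public static void main(String[] args) {
        // Force the single-file code path (readSingleFileTable under the hood).
        final Table singleFile = ParquetTools.readTable("/tmp/example.parquet",
                ParquetInstructions.builder()
                        .setFileLayout(ParquetFileLayout.SINGLE_FILE)
                        .build());

        // Let readTable pick the layout and infer the definition itself.
        final Table inferred = ParquetTools.readTable("/tmp/example.parquet");

        // Both paths infer the definition from the same file, so the definitions should match.
        System.out.println(singleFile.getDefinition().equals(inferred.getDefinition()));
    }
}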
