diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index 47126bc595a..70c9ab4fff4 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -29,6 +29,7 @@ import java.nio.channels.SeekableByteChannel; import java.util.List; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.function.Function; import static io.deephaven.parquet.base.ParquetUtils.resolve; @@ -44,7 +45,7 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader { private final CompressorAdapter decompressor; private final ColumnDescriptor path; private final OffsetIndexReader offsetIndexReader; - private final List fieldTypes; + private final List nonRequiredFields; private final Function dictionarySupplier; private final URI columnChunkURI; /** @@ -62,12 +63,12 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader { final SeekableChannelsProvider channelsProvider, final URI rootURI, final MessageType type, - final List fieldTypes, + final List nonRequiredFields, final long numRows, final String version) { - this.columnName = columnName; - this.channelsProvider = channelsProvider; - this.columnChunk = columnChunk; + this.columnName = Objects.requireNonNull(columnName); + this.channelsProvider = Objects.requireNonNull(channelsProvider); + this.columnChunk = Objects.requireNonNull(columnChunk); this.path = type .getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0])); if (columnChunk.getMeta_data().isSetCodec()) { @@ -76,7 +77,7 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader { } else { decompressor = CompressorAdapter.PASSTHRU; } - this.fieldTypes = fieldTypes; + this.nonRequiredFields = Objects.requireNonNull(nonRequiredFields); this.dictionarySupplier = new SoftCachingFunction<>(this::getDictionary); this.numRows = numRows; this.version = version; @@ -280,7 +281,8 @@ public ColumnPageReader next(@NotNull final SeekableChannelContext channelContex final Function pageDictionarySupplier = getPageDictionarySupplier(pageHeader); return new ColumnPageReaderImpl(columnName, channelsProvider, decompressor, pageDictionarySupplier, - pageMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader, numValuesInPage); + pageMaterializerFactory, path, getURI(), nonRequiredFields, dataOffset, pageHeader, + numValuesInPage); } catch (IOException e) { throw new UncheckedDeephavenException("Error reading page header at offset " + headerOffset + " for " + "column: " + columnName + ", uri: " + getURI(), e); @@ -358,7 +360,7 @@ public ColumnPageReader getPageReader(final int pageNum, final SeekableChannelCo final Function pageDictionarySupplier = getPageDictionarySupplier(pageHeader); return new ColumnPageReaderImpl(columnName, channelsProvider, decompressor, pageDictionarySupplier, - pageMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader, + pageMaterializerFactory, path, getURI(), nonRequiredFields, dataOffset, pageHeader, getNumValues(pageHeader)); } catch (final IOException e) { throw new UncheckedDeephavenException("Error reading page header for page number " + pageNum + diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java 
b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java index d708397f5e6..fd2c357a31a 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java @@ -58,7 +58,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader { private final PageMaterializerFactory pageMaterializerFactory; private final ColumnDescriptor path; private final URI uri; - private final List fieldTypes; + private final List nonRequiredFields; /** * The offset for data following the page header in the file. @@ -80,7 +80,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader { * @param materializerFactory The factory for creating {@link PageMaterializer}. * @param path The path of the column. * @param uri The uri of the parquet file. - * @param fieldTypes The types of the fields in the column. + * @param nonRequiredFields The types of the non-required fields in the column. * @param dataOffset The offset for data following the page header in the file. * @param pageHeader The page header, should not be {@code null}. * @param numValues The number of values in the page. @@ -93,7 +93,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader { final PageMaterializerFactory materializerFactory, final ColumnDescriptor path, final URI uri, - final List fieldTypes, + final List nonRequiredFields, final long dataOffset, final PageHeader pageHeader, final int numValues) { @@ -104,7 +104,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader { this.pageMaterializerFactory = materializerFactory; this.path = path; this.uri = uri; - this.fieldTypes = fieldTypes; + this.nonRequiredFields = nonRequiredFields; this.dataOffset = dataOffset; this.pageHeader = Require.neqNull(pageHeader, "pageHeader"); this.numValues = Require.geqZero(numValues, "numValues"); @@ -716,7 +716,7 @@ private Pair[], Integer> getOffsetsAndNulls( int currentRl = rlDecoder == null ? 
0 : rlDecoder.currentValue(); final LevelsController levelsController = new LevelsController( - fieldTypes.stream().map(Type::getRepetition).toArray(Type.Repetition[]::new)); + nonRequiredFields.stream().map(Type::getRepetition).toArray(Type.Repetition[]::new)); for (int valuesProcessed = 0; valuesProcessed < numValues;) { if (dlRangeSize == 0) { dlDecoder.readNextRange(); diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java index da085646a41..bfc10217b39 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java @@ -12,13 +12,19 @@ import org.apache.parquet.schema.*; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import shaded.parquet.org.apache.thrift.protocol.TSimpleJSONProtocol; +import shaded.parquet.org.apache.thrift.transport.TIOStreamTransport; +import shaded.parquet.org.apache.thrift.transport.TTransport; +import java.io.BufferedOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; import java.net.URI; import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.*; import static io.deephaven.parquet.base.ParquetUtils.MAGIC; @@ -39,7 +45,7 @@ public class ParquetFileReader { * If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file */ private final URI rootURI; - private final MessageType type; + private final MessageType schema; /** * Make a {@link ParquetFileReader} for the supplied {@link File}. 
Wraps {@link IOException} as @@ -102,7 +108,7 @@ private ParquetFileReader( fileMetaData = Util.readFileMetaData(in); } } - type = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders); + schema = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders); } /** @@ -239,7 +245,6 @@ public RowGroupReader getRowGroup(final int groupNumber, final String version) { fileMetaData.getRow_groups().get(groupNumber), channelsProvider, rootURI, - type, getSchema(), version); } @@ -477,10 +482,18 @@ private static boolean isAdjustedToUTC(final LogicalType logicalType) { } public MessageType getSchema() { - return type; + return schema; } public int rowGroupCount() { return fileMetaData.getRow_groups().size(); } + + // Useful debugging utility + void writeFileMetadata(Path path) throws Exception { + try (final TTransport out = new TIOStreamTransport(new BufferedOutputStream(Files.newOutputStream(path)))) { + fileMetaData.write(new TSimpleJSONProtocol(out)); + out.flush(); + } + } } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetTotalColumns.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetTotalColumns.java new file mode 100644 index 00000000000..d87f31a1caa --- /dev/null +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetTotalColumns.java @@ -0,0 +1,59 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.base; + +import io.deephaven.util.annotations.InternalUseOnly; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.TypeVisitor; + +@InternalUseOnly +public final class ParquetTotalColumns { + + public static int of(Type type) { + final TotalColumnsVisitor visitor = new TotalColumnsVisitor(); + type.accept(visitor); + return visitor.out; + } + + public static int of(@SuppressWarnings("unused") PrimitiveType primitiveType) { + return 1; + } + + public static int of(GroupType groupType) { + return groupType.getFields().stream().mapToInt(ParquetTotalColumns::of).sum(); + } + + public static int of(MessageType messageType) { + final int numColumns = of((GroupType) messageType); + // same size as messageType.getColumns().size(), but this is cheaper. 
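+        // (getPaths() just collects the leaf-column paths, whereas getColumns() would also build ColumnDescriptors, computing repetition/definition levels for each path)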
+ final int numPaths = messageType.getPaths().size(); + if (numColumns != numPaths) { + throw new IllegalStateException( + String.format("Inconsistent sizes, numColumns=%d, numPaths=%d", numColumns, numPaths)); + } + return numColumns; + } + + private static class TotalColumnsVisitor implements TypeVisitor { + private int out; + + @Override + public void visit(GroupType groupType) { + out = of(groupType); + } + + @Override + public void visit(MessageType messageType) { + out = of(messageType); + } + + @Override + public void visit(PrimitiveType primitiveType) { + out = of(primitiveType); + } + } +} diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReader.java index 057e7bfc09a..dcf793e7efa 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReader.java @@ -3,6 +3,7 @@ // package io.deephaven.parquet.base; +import io.deephaven.util.annotations.InternalUseOnly; import org.apache.parquet.format.RowGroup; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -12,16 +13,20 @@ /** * Provides read access to a parquet Row Group */ +@InternalUseOnly public interface RowGroupReader { /** - * Returns the accessor to a given Column Chunk + * Returns the accessor to a given Column Chunk. If {@code fieldId} is present, it will be matched over + * {@code parquetColumnNamePath}. * * @param columnName the name of the column - * @param path the full column path + * @param parquetColumnNamePath the full column parquetColumnNamePath + * @param fieldId the field_id to fetch * @return the accessor to a given Column Chunk, or null if the column is not present in this Row Group */ @Nullable - ColumnChunkReader getColumnChunk(@NotNull String columnName, @NotNull List path); + ColumnChunkReader getColumnChunk(@NotNull String columnName, @NotNull List defaultPath, + @Nullable List parquetColumnNamePath, @Nullable Integer fieldId); long numRows(); diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReaderImpl.java index d9155b64fa6..25115512497 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReaderImpl.java @@ -8,21 +8,80 @@ import org.apache.parquet.format.RowGroup; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.ID; +import org.apache.parquet.schema.Type.Repetition; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.net.URI; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; final class RowGroupReaderImpl implements RowGroupReader { + + private class ColumnHolder { + private final int fieldIndex; + private final int columnIndex; + + public ColumnHolder(int fieldIndex, int columnIndex) { + this.fieldIndex = fieldIndex; + this.columnIndex = columnIndex; + } + + String pathKey() { + return pathInSchema().toString(); + } + + Integer fieldId() { + final ID id = fieldType().getId(); + return id == null ? 
null : id.intValue(); + } + + ColumnChunkReaderImpl reader(String columnName) { + return new ColumnChunkReaderImpl(columnName, columnChunk(), channelsProvider, rootURI, schema, + nonRequiredFields(), numRows(), version); + } + + private Type fieldType() { + return schema.getFields().get(fieldIndex); + } + + private List pathInSchema() { + return columnChunk().getMeta_data().path_in_schema; + } + + private ColumnChunk columnChunk() { + return rowGroup.getColumns().get(columnIndex); + } + + private List nonRequiredFields() { + final List path_in_schema = pathInSchema(); + final List nonRequiredFields = new ArrayList<>(); + for (int indexInPath = 0; indexInPath < path_in_schema.size(); indexInPath++) { + Type fieldType = schema + .getType(path_in_schema.subList(0, indexInPath + 1).toArray(new String[0])); + if (fieldType.getRepetition() != Repetition.REQUIRED) { + nonRequiredFields.add(fieldType); + } + } + return nonRequiredFields; + } + + private String debugString() { + return String.format("colIx=%d, pathKey=%s, fieldId=%d", columnIndex, pathKey(), fieldId()); + } + } + private final RowGroup rowGroup; private final SeekableChannelsProvider channelsProvider; - private final MessageType type; - private final Map> schemaMap = new HashMap<>(); - private final Map chunkMap = new HashMap<>(); + private final MessageType schema; + private final Map byPath; + private final Map byFieldId; /** * If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file @@ -34,41 +93,83 @@ final class RowGroupReaderImpl implements RowGroupReader { @NotNull final RowGroup rowGroup, @NotNull final SeekableChannelsProvider channelsProvider, @NotNull final URI rootURI, - @NotNull final MessageType type, @NotNull final MessageType schema, @Nullable final String version) { - this.channelsProvider = channelsProvider; - this.rowGroup = rowGroup; - this.rootURI = rootURI; - this.type = type; - for (ColumnChunk column : rowGroup.columns) { - List path_in_schema = column.getMeta_data().path_in_schema; - String key = path_in_schema.toString(); - chunkMap.put(key, column); - List nonRequiredFields = new ArrayList<>(); - for (int indexInPath = 0; indexInPath < path_in_schema.size(); indexInPath++) { - Type fieldType = schema - .getType(path_in_schema.subList(0, indexInPath + 1).toArray(new String[0])); - if (fieldType.getRepetition() != Type.Repetition.REQUIRED) { - nonRequiredFields.add(fieldType); + final int fieldCount = schema.getFieldCount(); + this.channelsProvider = Objects.requireNonNull(channelsProvider); + this.rowGroup = Objects.requireNonNull(rowGroup); + this.rootURI = Objects.requireNonNull(rootURI); + this.schema = Objects.requireNonNull(schema); + byPath = new HashMap<>(fieldCount); + byFieldId = new HashMap<>(fieldCount); + // Note: there is no technical guarantee from parquet that column names, path_in_schema, or field_ids are + // unique; it's technically possible that they are duplicated. Ultimately, getColumnChunk is a bad abstraction - + // we shouldn't need to re-do matching for every single row group column chunk, the matching should be done + // _once_ per column to get the column index, and then for every row group we should just need to do + // rowGroup.getColumns().get(columnIx). + // + // Also, this logic divorced from our inference + // (io.deephaven.parquet.table.ParquetSchemaReader.readParquetSchema) + // makes it harder to keep the two in-sync. 
+ final Set nonUniquePaths = new HashSet<>(); + final Set nonUniqueFieldIds = new HashSet<>(); + int fieldIx = 0; + int columnIx = 0; + for (final Type fieldType : schema.getFields()) { + final int totalColumns = ParquetTotalColumns.of(fieldType); + if (totalColumns == 1) { + final ColumnHolder holder = new ColumnHolder(fieldIx, columnIx); + final String key = holder.pathKey(); + final Integer fieldId = holder.fieldId(); + if (byPath.putIfAbsent(key, holder) != null) { + nonUniquePaths.add(key); + } + if (fieldId != null) { + if (byFieldId.putIfAbsent(fieldId, holder) != null) { + nonUniqueFieldIds.add(fieldId); + } } } - schemaMap.put(key, nonRequiredFields); + columnIx += totalColumns; + ++fieldIx; + } + if (columnIx != schema.getPaths().size()) { + throw new IllegalStateException( + String.format("Inconsistent column count, columnIx=%d, schema.getPaths().size()=%d", columnIx, + schema.getPaths().size())); + } + for (String nonUniquePath : nonUniquePaths) { + byPath.remove(nonUniquePath); + } + for (Integer nonUniqueFieldId : nonUniqueFieldIds) { + byFieldId.remove(nonUniqueFieldId); } this.version = version; } @Override - @Nullable - public ColumnChunkReaderImpl getColumnChunk(@NotNull final String columnName, @NotNull final List path) { - final String key = path.toString(); - final ColumnChunk columnChunk = chunkMap.get(key); - final List fieldTypes = schemaMap.get(key); - if (columnChunk == null) { - return null; + public @Nullable ColumnChunkReader getColumnChunk( + @NotNull String columnName, + @NotNull List defaultPath, + @Nullable List parquetColumnNamePath, + @Nullable Integer fieldId) { + final ColumnHolder holder; + if (fieldId == null && parquetColumnNamePath == null) { + holder = byPath.get(defaultPath.toString()); + } else { + final ColumnHolder byFieldId = fieldId == null ? null : this.byFieldId.get(fieldId); + final ColumnHolder byPath = + parquetColumnNamePath == null ? null : this.byPath.get(parquetColumnNamePath.toString()); + if (byFieldId != null && byPath != null) { + if (byFieldId != byPath) { + throw new IllegalArgumentException(String.format( + "For columnName=%s, providing an explicit parquet column name path (%s) and field id (%d) mapping, but they are resolving to different columns, byFieldId=[%s], byPath=[%s]", + columnName, parquetColumnNamePath, fieldId, byFieldId.debugString(), byPath.debugString())); + } + } + holder = byFieldId != null ? byFieldId : byPath; } - return new ColumnChunkReaderImpl(columnName, columnChunk, channelsProvider, rootURI, type, fieldTypes, - numRows(), version); + return holder == null ? 
null : holder.reader(columnName); } @Override diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java index 069ddd4dac6..33a5cfd79f2 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java @@ -3,24 +3,27 @@ // package io.deephaven.parquet.table; +import io.deephaven.api.util.NameValidator; import io.deephaven.base.verify.Require; import io.deephaven.configuration.Configuration; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.ColumnToCodecMappings; import io.deephaven.hash.KeyedObjectHashMap; import io.deephaven.hash.KeyedObjectKey; +import io.deephaven.hash.KeyedObjectKey.Basic; import io.deephaven.parquet.base.ParquetUtils; import io.deephaven.util.annotations.VisibleForTesting; import org.apache.parquet.hadoop.metadata.CompressionCodecName; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; +import java.util.OptionalInt; import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; @@ -164,10 +167,20 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par return (mapped != null) ? mapped : parquetColumnName; } + /** + * Returns the explicitly mapped parquet column name if set. + * + * @param columnName the Deephaven column name + * @return the parquet column name + */ + public abstract Optional getParquetColumnName(final String columnName); + public abstract String getParquetColumnNameFromColumnNameOrDefault(final String columnName); public abstract String getColumnNameFromParquetColumnName(final String parquetColumnName); + public abstract List getColumnNamesFromParquetFieldId(final int fieldId); + @Override public abstract String getCodecName(final String columnName); @@ -180,6 +193,14 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par */ public abstract boolean useDictionary(String columnName); + /** + * The field ID for the given {@code columnName}. 
+ * + * @param columnName the Deephaven column name + * @return the field id + */ + public abstract OptionalInt getFieldId(final String columnName); + public abstract Object getSpecialInstructions(); public abstract String getCompressionCodecName(); @@ -266,6 +287,12 @@ public static boolean sameColumnNamesAndCodecMappings(final ParquetInstructions } public static final ParquetInstructions EMPTY = new ParquetInstructions() { + + @Override + public Optional getParquetColumnName(String columnName) { + return Optional.empty(); + } + @Override public String getParquetColumnNameFromColumnNameOrDefault(final String columnName) { return columnName; @@ -277,6 +304,11 @@ public String getColumnNameFromParquetColumnName(final String parquetColumnName) return null; } + @Override + public List getColumnNamesFromParquetFieldId(int fieldId) { + return List.of(); + } + @Override @Nullable public String getCodecName(final String columnName) { @@ -294,6 +326,11 @@ public boolean useDictionary(final String columnName) { return false; } + @Override + public OptionalInt getFieldId(String columnName) { + return OptionalInt.empty(); + } + @Override @Nullable public Object getSpecialInstructions() { @@ -385,14 +422,31 @@ ParquetInstructions withIndexColumns(final Collection> indexColumns }; private static class ColumnInstructions { + + private static final KeyedObjectKey COLUMN_NAME_KEY = new Basic<>() { + @Override + public String getKey(ColumnInstructions columnInstructions) { + return columnInstructions.getColumnName(); + } + }; + + private static final KeyedObjectKey PARQUET_COLUMN_NAME_KEY = new Basic<>() { + @Override + public String getKey(ColumnInstructions columnInstructions) { + return columnInstructions.getParquetColumnName(); + } + }; + private final String columnName; private String parquetColumnName; private String codecName; private String codecArgs; private boolean useDictionary; + private Integer fieldId; public ColumnInstructions(final String columnName) { - this.columnName = columnName; + this.columnName = Objects.requireNonNull(columnName); + NameValidator.validateColumnName(columnName); } public String getColumnName() { @@ -403,7 +457,17 @@ public String getParquetColumnName() { return parquetColumnName != null ? parquetColumnName : columnName; } + public Optional getParquetColumnNameOpt() { + return Optional.ofNullable(parquetColumnName); + } + public ColumnInstructions setParquetColumnName(final String parquetColumnName) { + if (this.parquetColumnName != null && !this.parquetColumnName.equals(parquetColumnName)) { + throw new IllegalArgumentException( + "Cannot add a mapping from parquetColumnName=" + parquetColumnName + + ": columnName=" + columnName + " already mapped to parquetColumnName=" + + this.parquetColumnName); + } this.parquetColumnName = parquetColumnName; return this; } @@ -433,6 +497,19 @@ public boolean useDictionary() { public void useDictionary(final boolean useDictionary) { this.useDictionary = useDictionary; } + + public OptionalInt fieldId() { + return fieldId == null ? 
OptionalInt.empty() : OptionalInt.of(fieldId); + } + + public void setFieldId(final int fieldId) { + if (this.fieldId != null && this.fieldId != fieldId) { + throw new IllegalArgumentException( + String.format("Inconsistent fieldId for columnName=%s, already set fieldId=%d", columnName, + this.fieldId)); + } + this.fieldId = fieldId; + } } private static final class ReadOnly extends ParquetInstructions { @@ -490,8 +567,8 @@ private ReadOnly( .collect(Collectors.toUnmodifiableList()); } - private String getOrDefault(final String columnName, final String defaultValue, - final Function fun) { + private T getOrDefault(final String columnName, final T defaultValue, + final Function fun) { if (columnNameToInstructions == null) { return defaultValue; } @@ -514,6 +591,11 @@ private boolean getOrDefault(final String columnName, final boolean defaultValue return fun.test(ci); } + @Override + public Optional getParquetColumnName(String columnName) { + return getOrDefault(columnName, Optional.empty(), ColumnInstructions::getParquetColumnNameOpt); + } + @Override public String getParquetColumnNameFromColumnNameOrDefault(final String columnName) { return getOrDefault(columnName, columnName, ColumnInstructions::getParquetColumnName); @@ -531,6 +613,21 @@ public String getColumnNameFromParquetColumnName(final String parquetColumnName) return ci.getColumnName(); } + @Override + public List getColumnNamesFromParquetFieldId(int fieldId) { + if (columnNameToInstructions == null) { + return List.of(); + } + final List out = new ArrayList<>(); + for (Entry e : columnNameToInstructions.entrySet()) { + final OptionalInt parquetFieldId = e.getValue().fieldId(); + if (parquetFieldId.isPresent() && parquetFieldId.getAsInt() == fieldId) { + out.add(e.getKey()); + } + } + return out; + } + @Override public String getCodecName(final String columnName) { return getOrDefault(columnName, null, ColumnInstructions::getCodecName); @@ -546,6 +643,11 @@ public boolean useDictionary(final String columnName) { return getOrDefault(columnName, false, ColumnInstructions::useDictionary); } + @Override + public OptionalInt getFieldId(String columnName) { + return getOrDefault(columnName, OptionalInt.empty(), ColumnInstructions::fieldId); + } + @Override public String getCompressionCodecName() { return compressionCodecName; @@ -722,75 +824,22 @@ public Builder(final ParquetInstructions parquetInstructions) { indexColumns = readOnlyParquetInstructions.getIndexColumns().orElse(null); } - private void newColumnNameToInstructionsMap() { - columnNameToInstructions = new KeyedObjectHashMap<>(new KeyedObjectKey.Basic<>() { - @Override - public String getKey(@NotNull final ColumnInstructions value) { - return value.getColumnName(); - } - }); - } - - private void newParquetColumnNameToInstructionsMap() { - parquetColumnNameToInstructions = - new KeyedObjectHashMap<>(new KeyedObjectKey.Basic<>() { - @Override - public String getKey(@NotNull final ColumnInstructions value) { - return value.getParquetColumnName(); - } - }); - } - public Builder addColumnNameMapping(final String parquetColumnName, final String columnName) { - if (parquetColumnName.equals(columnName)) { - return this; - } - if (columnNameToInstructions == null) { - newColumnNameToInstructionsMap(); - final ColumnInstructions ci = new ColumnInstructions(columnName); - ci.setParquetColumnName(parquetColumnName); - columnNameToInstructions.put(columnName, ci); - newParquetColumnNameToInstructionsMap(); - parquetColumnNameToInstructions.put(parquetColumnName, ci); - return this; - } - 
- ColumnInstructions ci = columnNameToInstructions.get(columnName); - if (ci != null) { - if (ci.parquetColumnName != null) { - if (ci.parquetColumnName.equals(parquetColumnName)) { - return this; - } - throw new IllegalArgumentException( - "Cannot add a mapping from parquetColumnName=" + parquetColumnName - + ": columnName=" + columnName + " already mapped to parquetColumnName=" - + ci.parquetColumnName); - } - } else { - ci = new ColumnInstructions(columnName); - columnNameToInstructions.put(columnName, ci); - } - + final ColumnInstructions ci = getOrCreateColumnInstructions(columnName); + ci.setParquetColumnName(parquetColumnName); if (parquetColumnNameToInstructions == null) { - newParquetColumnNameToInstructionsMap(); - parquetColumnNameToInstructions.put(parquetColumnName, ci); - return this; + parquetColumnNameToInstructions = new KeyedObjectHashMap<>(ColumnInstructions.PARQUET_COLUMN_NAME_KEY); } - - final ColumnInstructions fromParquetColumnNameInstructions = - parquetColumnNameToInstructions.get(parquetColumnName); - if (fromParquetColumnNameInstructions != null) { - if (fromParquetColumnNameInstructions == ci) { - return this; - } + final ColumnInstructions existing = parquetColumnNameToInstructions.putIfAbsent(parquetColumnName, ci); + if (existing != null) { + // Note: this is a limitation that doesn't need to exist. Technically, we could allow a single physical + // parquet column to manifest as multiple Deephaven columns. throw new IllegalArgumentException( "Cannot add new mapping from parquetColumnName=" + parquetColumnName + " to columnName=" + columnName + ": already mapped to columnName=" - + fromParquetColumnNameInstructions.getColumnName()); + + existing.getColumnName()); } - ci.setParquetColumnName(parquetColumnName); - parquetColumnNameToInstructions.put(parquetColumnName, ci); return this; } @@ -803,7 +852,7 @@ public Builder addColumnCodec(final String columnName, final String codecName) { } public Builder addColumnCodec(final String columnName, final String codecName, final String codecArgs) { - final ColumnInstructions ci = getColumnInstructions(columnName); + final ColumnInstructions ci = getOrCreateColumnInstructions(columnName); ci.setCodecName(codecName); ci.setCodecArgs(codecArgs); return this; } @@ -817,21 +866,53 @@ public Builder addColumnCodec(final String columnName, final String codecName, f * @param useDictionary The hint value */ public Builder useDictionary(final String columnName, final boolean useDictionary) { - final ColumnInstructions ci = getColumnInstructions(columnName); + final ColumnInstructions ci = getOrCreateColumnInstructions(columnName); ci.useDictionary(useDictionary); return this; } - private ColumnInstructions getColumnInstructions(final String columnName) { - final ColumnInstructions ci; + /** + * For reading, provides a mapping between a Deephaven column name and a parquet column by field id. This allows + * resolving a parquet column where the physical "parquet column name" may not be known a priori by the caller. + * In the case where both a field id mapping and a parquet column name mapping are provided, the field id will + * take precedence over the parquet column name. This may happen in cases where the parquet file is managed by a + * higher-level schema that has the concept of a "field id"; for example, Iceberg. As documented + * in the parquet format: + * + * <pre>
+         *     When the original schema supports field ids, this will save the original field id in the parquet schema
+         * </pre>
+ * + * In the case where a field id mapping is provided but no matching parquet column is found, the column will not + * be inferred; and in the case where it's explicitly included as part of a + * {@link #setTableDefinition(TableDefinition)}, the resulting column will contain the appropriate default + * ({@code null}) values. In the case where there are multiple parquet columns with the same field_id, those + * parquet columns will not be resolvable via a field id. + * + *
<p>
+ * For writing, this will set the {@code field_id} in the proper Parquet {@code SchemaElement}. + * + *
<p>
+ * Setting multiple field ids for a single column name is not allowed. + * + *
<p>
+ * Field ids are not typically configured by end users. + * + * @param columnName the Deephaven column name + * @param fieldId the field id + */ + public Builder setFieldId(final String columnName, final int fieldId) { + final ColumnInstructions ci = getOrCreateColumnInstructions(columnName); + ci.setFieldId(fieldId); + return this; + } + + private ColumnInstructions getOrCreateColumnInstructions(final String columnName) { if (columnNameToInstructions == null) { - newColumnNameToInstructionsMap(); - ci = new ColumnInstructions(columnName); - columnNameToInstructions.put(columnName, ci); - } else { - ci = columnNameToInstructions.putIfAbsent(columnName, ColumnInstructions::new); + columnNameToInstructions = new KeyedObjectHashMap<>(ColumnInstructions.COLUMN_NAME_KEY); } - return ci; + return columnNameToInstructions.putIfAbsent(columnName, ColumnInstructions::new); } public Builder setCompressionCodecName(final String compressionCodecName) { diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java index dc99271414b..ebbec45e1dd 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java @@ -8,6 +8,7 @@ import io.deephaven.base.ClassUtil; import io.deephaven.base.Pair; import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.parquet.base.ParquetTotalColumns; import io.deephaven.stringset.StringSet; import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.parquet.table.metadata.CodecInfo; @@ -32,6 +33,8 @@ import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.ID; import org.jetbrains.annotations.NotNull; import java.io.IOException; @@ -43,7 +46,9 @@ import java.time.LocalTime; import java.util.*; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import static io.deephaven.parquet.base.ParquetUtils.METADATA_KEY; @@ -142,34 +147,61 @@ public static ParquetInstructions readParquetSchema( return instructionsBuilder.getValue(); }; final ParquetMessageDefinition colDef = new ParquetMessageDefinition(); - final Map parquetColumnNameToFirstPath = new HashMap<>(); - for (final ColumnDescriptor column : schema.getColumns()) { + final Map topLevelFieldIdCount = schema.getFields() + .stream() + .map(Type::getId) + .filter(Objects::nonNull) + .map(ID::intValue) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + int columnIx = 0; + final List columnDescriptors = schema.getColumns(); + for (final Type fieldType : schema.getFields()) { + final int numColumns = ParquetTotalColumns.of(fieldType); + if (numColumns > 1) { + // TODO(deephaven-core#871): Parquet: Support repetition level >1 and multi-column fields + throw new UnsupportedOperationException( + String.format("Encountered unsupported multi-column field %s, has %d total columns", + fieldType.getName(), numColumns)); + } + final ColumnDescriptor column = columnDescriptors.get(columnIx); if (column.getMaxRepetitionLevel() > 1) { - // TODO (https://github.com/deephaven/deephaven-core/issues/871): Support this + // 
TODO(deephaven-core#871): Parquet: Support repetition level >1 and multi-column fields throw new UnsupportedOperationException("Unsupported maximum repetition level " + column.getMaxRepetitionLevel() + " in column " + String.join("/", column.getPath())); } - + columnIx += numColumns; colDef.reset(); currentColumn.setValue(column); final PrimitiveType primitiveType = column.getPrimitiveType(); final LogicalTypeAnnotation logicalTypeAnnotation = primitiveType.getLogicalTypeAnnotation(); - + // Note: we are taking the id from the fieldType which is not equivalent to the primitiveType field id (ie, + // in the case of repeated types). + final ID fieldId = fieldType.getId(); final String parquetColumnName = column.getPath()[0]; - parquetColumnNameToFirstPath.compute(parquetColumnName, (final String pcn, final String[] oldPath) -> { - if (oldPath != null) { - // TODO (https://github.com/deephaven/deephaven-core/issues/871): Support this - throw new UnsupportedOperationException("Encountered unsupported multi-column field " - + parquetColumnName + ": found columns " + String.join("/", oldPath) + " and " - + String.join("/", column.getPath())); - } - return column.getPath(); - }); final String colName; - final String mappedName = readInstructions.getColumnNameFromParquetColumnName(parquetColumnName); - if (mappedName != null) { - colName = mappedName; - } else { + COL_NAME: { + FIELD_ID: if (fieldId != null) { + if (topLevelFieldIdCount.getOrDefault(fieldId.intValue(), 0L) > 1) { + // This file has multiple entries for fieldId; don't match against it for field ids. + break FIELD_ID; + } + final List columnNames = + readInstructions.getColumnNamesFromParquetFieldId(fieldId.intValue()); + if (columnNames.size() == 1) { + colName = columnNames.get(0); + break COL_NAME; + } else if (columnNames.size() > 1) { + // This limitation could likely be removed with a more thorough refactoring of the parquet code. + throw new IllegalArgumentException(String.format( + "Non-unique Field ID mapping provided; unable to infer TableDefinition for fieldId=%d, parquetColumnName=%s", + fieldId.intValue(), parquetColumnName)); + } + } + final String mappedName = readInstructions.getColumnNameFromParquetColumnName(parquetColumnName); + if (mappedName != null) { + colName = mappedName; + break COL_NAME; + } final String legalized = legalizeColumnNameFunc.apply( parquetColumnName, (instructionsBuilder.getValue() == null) @@ -266,6 +298,9 @@ public static ParquetInstructions readParquetSchema( } consumer.accept(colDef); } + if (columnIx != columnDescriptors.size()) { + throw new IllegalStateException("Not proper size"); + } return (instructionsBuilder.getValue() == null) ? 
readInstructions : instructionsBuilder.getValue().build(); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java index b95dfe98412..d380b953353 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java @@ -13,11 +13,14 @@ import io.deephaven.util.codec.SerializableCodec; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; import org.apache.parquet.schema.Types; +import org.apache.parquet.schema.Types.GroupBuilder; import org.apache.parquet.schema.Types.PrimitiveBuilder; import org.jetbrains.annotations.NotNull; @@ -475,9 +478,12 @@ default Type createSchemaType( isRepeating = false; } if (!isRepeating) { + instructions.getFieldId(columnDefinition.getName()).ifPresent(builder::id); return builder.named(parquetColumnName); } - return Types.buildGroup(Type.Repetition.OPTIONAL).addField( + final GroupBuilder groupBuilder = Types.buildGroup(Repetition.OPTIONAL); + instructions.getFieldId(columnDefinition.getName()).ifPresent(groupBuilder::id); + return groupBuilder.addField( Types.buildGroup(Type.Repetition.REPEATED).addField( builder.named("item")).named(parquetColumnName)) .as(LogicalTypeAnnotation.listType()).named(parquetColumnName); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index 4ddebb33685..6224b33d7f7 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -76,19 +76,24 @@ final class ParquetColumnLocation extends AbstractColumnLoc * Construct a new {@link ParquetColumnLocation} for the specified {@link ParquetTableLocation} and column name. * * @param tableLocation The table location enclosing this column location - * @param parquetColumnName The Parquet file column name + * @param columnName The Deephaven column name + * @param parquetColumnName The Parquet file column name if explicitly set * @param columnChunkReaders The {@link ColumnChunkReader column chunk readers} for this location */ ParquetColumnLocation( @NotNull final ParquetTableLocation tableLocation, @NotNull final String columnName, - @NotNull final String parquetColumnName, + @Nullable final String parquetColumnName, @Nullable final ColumnChunkReader[] columnChunkReaders) { super(tableLocation, columnName); this.parquetColumnName = parquetColumnName; this.columnChunkReaders = columnChunkReaders; } + private String parquetColumnNameOrDefault() { + return parquetColumnName != null ? 
parquetColumnName : getName(); + } + private PageCache ensurePageCache() { PageCache localPageCache; if ((localPageCache = pageCache) != null) { @@ -311,8 +316,8 @@ private void fetchValues(@NotNull final ColumnDefinition columnDefinition) { ensurePageCache(), columnChunkReader, tl().getRegionParameters().regionMask, - makeToPage(tl().getColumnTypes().get(parquetColumnName), - tl().getReadInstructions(), parquetColumnName, columnChunkReader, + makeToPage(tl().getColumnTypes().get(parquetColumnNameOrDefault()), + tl().getReadInstructions(), parquetColumnNameOrDefault(), columnChunkReader, columnDefinition), columnDefinition); pageStores[psi] = creatorResult.pageStore; diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index b782c8488c6..d4e7fd1f363 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -181,15 +181,29 @@ public List getSortedColumns() { @Override @NotNull protected ColumnLocation makeColumnLocation(@NotNull final String columnName) { - final String parquetColumnName = readInstructions.getParquetColumnNameFromColumnNameOrDefault(columnName); - final String[] columnPath = parquetColumnNameToPath.get(parquetColumnName); - final List nameList = - columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath); + final OptionalInt fid = readInstructions.getFieldId(columnName); + final Integer fieldId = fid.isPresent() ? fid.getAsInt() : null; + final List defaultPath; + { + final String[] path = parquetColumnNameToPath.get(columnName); + defaultPath = path == null ? List.of(columnName) : List.of(path); + } + final List parquetPath; + final String parquetColumnName; + { + parquetColumnName = readInstructions.getParquetColumnName(columnName).orElse(null); + if (parquetColumnName == null) { + parquetPath = null; + } else { + final String[] path = parquetColumnNameToPath.get(parquetColumnName); + parquetPath = path == null ? List.of(parquetColumnName) : List.of(path); + } + } final ColumnChunkReader[] columnChunkReaders = Arrays.stream(getRowGroupReaders()) - .map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new); + .map(rgr -> rgr.getColumnChunk(columnName, defaultPath, parquetPath, fieldId)) + .toArray(ColumnChunkReader[]::new); final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0); - return new ParquetColumnLocation<>(this, columnName, parquetColumnName, - exists ? columnChunkReaders : null); + return new ParquetColumnLocation<>(this, columnName, parquetColumnName, exists ? 
columnChunkReaders : null); } private RowSet computeIndex() { diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java new file mode 100644 index 00000000000..08f87ddf263 --- /dev/null +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetInstructionsTest.java @@ -0,0 +1,132 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.table; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; + +public class ParquetInstructionsTest { + + @Test + public void setFieldId() { + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId("Foo", 42) + .setFieldId("Bar", 99) + .setFieldId("Baz", 99) + .build(); + + assertThat(instructions.getFieldId("Foo")).hasValue(42); + assertThat(instructions.getFieldId("Bar")).hasValue(99); + assertThat(instructions.getFieldId("Baz")).hasValue(99); + assertThat(instructions.getFieldId("Zap")).isEmpty(); + + assertThat(instructions.getColumnNamesFromParquetFieldId(42)).containsExactly("Foo"); + assertThat(instructions.getColumnNamesFromParquetFieldId(99)).containsExactly("Bar", "Baz"); + assertThat(instructions.getColumnNamesFromParquetFieldId(100)).isEmpty(); + } + + @Test + public void setFieldIdAlreadySet() { + + // Setting the same fieldId on a given column name is "ok" if it's the same value, this is to be more consistent + // with how addColumnNameMapping works. + { + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId("Foo", 42) + .setFieldId("Foo", 42) + .build(); + assertThat(instructions.getFieldId("Foo")).hasValue(42); + } + + try { + ParquetInstructions.builder() + .setFieldId("Foo", 42) + .setFieldId("Foo", 43) + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage("Inconsistent fieldId for columnName=Foo, already set fieldId=42"); + } + } + + @Test + public void setFieldBadName() { + try { + ParquetInstructions.builder() + .setFieldId("Not a legal column name", 42) + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("Invalid column name"); + } + } + + @Test + public void addColumnNameMapping() { + final ParquetInstructions instructions = ParquetInstructions.builder() + .addColumnNameMapping("Foo", "Foo") + .addColumnNameMapping("PARQUET COLUMN 2!", "Bar") + .addColumnNameMapping("ParquetColumn3", "Baz") + .build(); + + assertThat(instructions.getColumnNameFromParquetColumnName("Foo")).isEqualTo("Foo"); + assertThat(instructions.getColumnNameFromParquetColumnName("PARQUET COLUMN 2!")).isEqualTo("Bar"); + assertThat(instructions.getColumnNameFromParquetColumnName("ParquetColumn3")).isEqualTo("Baz"); + assertThat(instructions.getColumnNameFromParquetColumnName("Does Not Exist")).isNull(); + + assertThat(instructions.getParquetColumnName("Foo")).hasValue("Foo"); + assertThat(instructions.getParquetColumnName("Bar")).hasValue("PARQUET COLUMN 2!"); + assertThat(instructions.getParquetColumnName("Baz")).hasValue("ParquetColumn3"); + assertThat(instructions.getParquetColumnName("Zap")).isEmpty(); + + 
assertThat(instructions.getParquetColumnNameFromColumnNameOrDefault("Foo")).isEqualTo("Foo"); + assertThat(instructions.getParquetColumnNameFromColumnNameOrDefault("Bar")).isEqualTo("PARQUET COLUMN 2!"); + assertThat(instructions.getParquetColumnNameFromColumnNameOrDefault("Baz")).isEqualTo("ParquetColumn3"); + assertThat(instructions.getParquetColumnNameFromColumnNameOrDefault("Zap")).isEqualTo("Zap"); + } + + @Test + public void addColumnNameMappingMultipleParquetColumnsToSameDeephavenColumn() { + try { + ParquetInstructions.builder() + .addColumnNameMapping("ParquetColumn1", "Foo") + .addColumnNameMapping("ParquetColumn2", "Foo") + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage( + "Cannot add a mapping from parquetColumnName=ParquetColumn2: columnName=Foo already mapped to parquetColumnName=ParquetColumn1"); + } + } + + @Test + public void addColumnNameMappingSameParquetColumnToMultipleDeephavenColumns() { + // Note: this is a limitation that doesn't need to exist. Technically, we could allow a single physical + // parquet column to manifest as multiple Deephaven columns. + try { + ParquetInstructions.builder() + .addColumnNameMapping("ParquetColumn1", "Foo") + .addColumnNameMapping("ParquetColumn1", "Bar") + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage( + "Cannot add new mapping from parquetColumnName=ParquetColumn1 to columnName=Bar: already mapped to columnName=Foo"); + } + } + + @Test + public void addColumnNameMappingBadName() { + try { + ParquetInstructions.builder() + .addColumnNameMapping("SomeParquetColumnName", "Not a legal column name") + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("Invalid column name"); + } + } +} diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java index d096669b192..90e3b75b753 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java @@ -13,17 +13,34 @@ import io.deephaven.engine.table.impl.UncoalescedTable; import io.deephaven.engine.table.impl.indexer.DataIndexer; import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.engine.table.impl.util.ColumnHolder; import io.deephaven.engine.table.vectors.ColumnVectors; import io.deephaven.engine.testutil.junit4.EngineCleanup; import io.deephaven.engine.util.TableTools; import io.deephaven.parquet.base.InvalidParquetFileException; import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout; +import io.deephaven.qst.type.Type; import io.deephaven.stringset.HashStringSet; import io.deephaven.stringset.StringSet; import io.deephaven.time.DateTimeUtils; -import io.deephaven.vector.*; +import io.deephaven.util.QueryConstants; +import io.deephaven.vector.DoubleVector; +import io.deephaven.vector.DoubleVectorDirect; +import io.deephaven.vector.FloatVector; +import io.deephaven.vector.FloatVectorDirect; +import io.deephaven.vector.IntVector; +import io.deephaven.vector.IntVectorDirect; +import io.deephaven.vector.LongVector; +import io.deephaven.vector.LongVectorDirect; +import 
io.deephaven.vector.ObjectVector; +import io.deephaven.vector.ObjectVectorDirect; import junit.framework.TestCase; -import org.junit.*; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; import java.io.File; import java.io.IOException; @@ -34,13 +51,21 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.UUID; import java.util.function.Function; import java.util.stream.IntStream; import java.util.stream.LongStream; import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; import static io.deephaven.engine.testutil.TstUtils.tableRangesAreEqual; -import static io.deephaven.engine.util.TableTools.*; +import static io.deephaven.engine.util.TableTools.col; +import static io.deephaven.engine.util.TableTools.doubleCol; +import static io.deephaven.engine.util.TableTools.emptyTable; +import static io.deephaven.engine.util.TableTools.intCol; +import static io.deephaven.engine.util.TableTools.longCol; +import static io.deephaven.engine.util.TableTools.newTable; +import static io.deephaven.engine.util.TableTools.shortCol; +import static io.deephaven.engine.util.TableTools.stringCol; import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -580,4 +605,563 @@ public void testNoDictionaryOffset() { DateTimeUtils.epochNanos((Instant) withNullsAndMissingOffsets.getColumnSource("CREATE_DATE").get(0))); assertTableEquals(withNullsAndMissingOffsets, clean); } + + /** + * This data was generated via the script: + * + *
<pre>
+     * import pyarrow as pa
+     * import pyarrow.parquet as pq
+     *
+     * field_id = b"PARQUET:field_id"
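+     * # pyarrow carries this field-level metadata key through to the Parquet schema's field_id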
+     * fields = [
+     *     pa.field(
+     *         "e0cf7927-45dc-4dfc-b4ef-36bf4b6ae463", pa.int64(), metadata={field_id: b"0"}
+     *     ),
+     *     pa.field(
+     *         "53f0de5a-e06f-476e-b82a-a3f9294fcd05", pa.string(), metadata={field_id: b"1"}
+     *     ),
+     * ]
+     * table = pa.table([[99, 101], ["Foo", "Bar"]], schema=pa.schema(fields))
+     * pq.write_table(table, "ReferenceSimpleParquetFieldIds.parquet")
+     * </pre>
+ * + * @see Arrow Parquet field_id + */ + @Test + public void testParquetFieldIds() { + final String file = TestParquetTools.class.getResource("/ReferenceSimpleParquetFieldIds.parquet").getFile(); + + // No instructions; will sanitize the names. Both columns get the "-" removed and the second column gets the + // "column_" prefix added because it starts with a digit. + { + final TableDefinition expectedInferredTD = TableDefinition.of( + ColumnDefinition.ofLong("e0cf792745dc4dfcb4ef36bf4b6ae463"), + ColumnDefinition.ofString("column_53f0de5ae06f476eb82aa3f9294fcd05")); + final Table table = ParquetTools.readTable(file); + assertEquals(expectedInferredTD, table.getDefinition()); + assertTableEquals(newTable(expectedInferredTD, + longCol("e0cf792745dc4dfcb4ef36bf4b6ae463", 99, 101), + stringCol("column_53f0de5ae06f476eb82aa3f9294fcd05", "Foo", "Bar")), table); + } + + final int BAZ_ID = 0; + final int ZAP_ID = 1; + final String BAZ = "Baz"; + final String ZAP = "Zap"; + final ColumnDefinition bazCol = ColumnDefinition.ofLong(BAZ); + final ColumnDefinition zapCol = ColumnDefinition.ofString(ZAP); + + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .setFieldId(ZAP, ZAP_ID) + .build(); + + final TableDefinition td = TableDefinition.of(bazCol, zapCol); + + // It's enough to just provide the mapping based on field_id + { + final Table table = ParquetTools.readTable(file, instructions); + assertEquals(td, table.getDefinition()); + assertTableEquals(newTable(td, + longCol(BAZ, 99, 101), + stringCol(ZAP, "Foo", "Bar")), table); + } + + // But, the user can still provide a TableDefinition + { + final Table table = ParquetTools.readTable(file, instructions.withTableDefinition(td)); + assertEquals(td, table.getDefinition()); + assertTableEquals(newTable(td, + longCol(BAZ, 99, 101), + stringCol(ZAP, "Foo", "Bar")), table); + } + + // The user can provide the full mapping, but still a more limited definition + { + final TableDefinition justIdTD = TableDefinition.of(bazCol); + final Table table = ParquetTools.readTable(file, instructions.withTableDefinition(justIdTD)); + assertEquals(justIdTD, table.getDefinition()); + assertTableEquals(newTable(justIdTD, + longCol(BAZ, 99, 101)), table); + } + + // If only a partial id mapping is provided, only that will be "properly" mapped + { + final TableDefinition partialTD = TableDefinition.of( + ColumnDefinition.ofLong(BAZ), + ColumnDefinition.ofString("column_53f0de5ae06f476eb82aa3f9294fcd05")); + final ParquetInstructions partialInstructions = ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .build(); + final Table table = ParquetTools.readTable(file, partialInstructions); + assertEquals(partialTD, table.getDefinition()); + } + + // There are no errors if a field ID is configured but not found; it won't be inferred. 
+ { + final Table table = ParquetTools.readTable(file, ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .setFieldId(ZAP, ZAP_ID) + .setFieldId("Fake", 99) + .build()); + assertEquals(td, table.getDefinition()); + assertTableEquals(newTable(td, + longCol(BAZ, 99, 101), + stringCol(ZAP, "Foo", "Bar")), table); + } + + // If it's explicitly asked for, like other columns, it will return an appropriate null value + { + final TableDefinition tdWithFake = + TableDefinition.of(bazCol, zapCol, ColumnDefinition.ofShort("Fake")); + final Table table = ParquetTools.readTable(file, ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .setFieldId(ZAP, ZAP_ID) + .setFieldId("Fake", 99) + .build() + .withTableDefinition(tdWithFake)); + assertEquals(tdWithFake, table.getDefinition()); + assertTableEquals(newTable(tdWithFake, + longCol(BAZ, 99, 101), + stringCol(ZAP, "Foo", "Bar"), + shortCol("Fake", QueryConstants.NULL_SHORT, QueryConstants.NULL_SHORT)), table); + } + + // You can even re-use IDs to get the same physical column out multiple times + { + final String BAZ_DUPE = "BazDupe"; + final TableDefinition dupeTd = + TableDefinition.of(bazCol, zapCol, ColumnDefinition.ofLong(BAZ_DUPE)); + final ParquetInstructions dupeInstructions = ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .setFieldId(ZAP, ZAP_ID) + .setFieldId(BAZ_DUPE, BAZ_ID) + .build(); + { + final Table table = ParquetTools.readTable(file, dupeInstructions.withTableDefinition(dupeTd)); + assertEquals(dupeTd, table.getDefinition()); + assertTableEquals(newTable(dupeTd, + longCol(BAZ, 99, 101), + stringCol(ZAP, "Foo", "Bar"), + longCol(BAZ_DUPE, 99, 101)), table); + } + + // In the case where we have dupe field IDs and don't provide an explicit definition, we are preferring to + // fail during the inference step + { + try { + ParquetTools.readTable(file, dupeInstructions); + Assertions.failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + Assertions.assertThat(e).hasMessageContaining("Non-unique Field ID mapping provided"); + } + } + } + + // If both a field id and parquet column name mapping is provided, they need to map to the same parquet column. 
+ { + final TableDefinition bazTd = TableDefinition.of(bazCol); + final ParquetInstructions inconsistent = ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .addColumnNameMapping("53f0de5a-e06f-476e-b82a-a3f9294fcd05", BAZ) + .build(); + // In the case where we are inferring the TableDefinition from parquet schema, the inconsistency will be + // noticed up front + try { + ParquetTools.readTable(file, inconsistent); + Assertions.failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + Assertions.assertThat(e) + .hasMessageContaining("Supplied ColumnDefinitions include duplicate names [Baz]"); + } + // In the case where we provide a TableDefinition, the inconsistency will be noticed when reading the + // data + try { + // Need to force read of data + ParquetTools.readTable(file, inconsistent.withTableDefinition(bazTd)).select(); + Assertions.failBecauseExceptionWasNotThrown(TableDataException.class); + } catch (TableDataException e) { + Assertions.assertThat(e).getRootCause().hasMessageContaining( + "For columnName=Baz, providing an explicit parquet column name path ([53f0de5a-e06f-476e-b82a-a3f9294fcd05]) and field id (0) mapping, but they are resolving to different columns, byFieldId=[colIx=0, pathKey=[e0cf7927-45dc-4dfc-b4ef-36bf4b6ae463], fieldId=0], byPath=[colIx=1, pathKey=[53f0de5a-e06f-476e-b82a-a3f9294fcd05], fieldId=1]"); + } + } + } + + /** + * This data was generated via the script: + * + *
+     * import uuid
+     * import pyarrow as pa
+     * import pyarrow.parquet as pq
+     *
+     *
+     * def write_to(path: str):
+     *     field_id = b"PARQUET:field_id"
+     *     fields = [
+     *         pa.field(str(uuid.uuid4()), pa.int64(), metadata={field_id: b"42"}),
+     *         pa.field(str(uuid.uuid4()), pa.string(), metadata={field_id: b"43"}),
+     *     ]
+     *     table = pa.table([[99, 101], ["Foo", "Bar"]], schema=pa.schema(fields))
+     *     pq.write_table(table, path)
+     *
+     *
+     * write_to("/ReferencePartitionedFieldIds/Partition=0/table.parquet")
+     * write_to("/ReferencePartitionedFieldIds/Partition=1/table.parquet")
+     * 
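+     *
+     * A minor caveat for re-running the script (a sketch, assuming pyarrow writes through the local
+     * filesystem): pq.write_table does not create missing directories, so the Partition=0 and Partition=1
+     * folders are assumed to exist before write_to runs, e.g.:
+     *
+     * import os
+     * for part in (0, 1):
+     *     os.makedirs(f"/ReferencePartitionedFieldIds/Partition={part}", exist_ok=True)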
+ * + * It mimics the case of a higher-level schema management where the physical column names may be random. + * + * @see Arrow Parquet field_id + */ + @Test + public void testPartitionedParquetFieldIds() { + final String file = TestParquetTools.class.getResource("/ReferencePartitionedFieldIds").getFile(); + + final int BAZ_ID = 42; + final int ZAP_ID = 43; + final String BAZ = "Baz"; + final String ZAP = "Zap"; + final String PARTITION = "Partition"; + final ColumnDefinition partitionColumn = ColumnDefinition.ofInt(PARTITION).withPartitioning(); + final ColumnDefinition bazCol = ColumnDefinition.ofLong(BAZ); + final ColumnDefinition zapCol = ColumnDefinition.ofString(ZAP); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .setFieldId(ZAP, ZAP_ID) + .build(); + + final TableDefinition expectedTd = TableDefinition.of(partitionColumn, bazCol, zapCol); + + final Table expected = newTable(expectedTd, + intCol(PARTITION, 0, 0, 1, 1), + longCol(BAZ, 99, 101, 99, 101), + stringCol(ZAP, "Foo", "Bar", "Foo", "Bar")); + + { + final Table actual = ParquetTools.readTable(file, instructions); + assertEquals(expectedTd, actual.getDefinition()); + assertTableEquals(expected, actual); + } + + { + final Table actual = ParquetTools.readTable(file, instructions.withTableDefinition(expectedTd)); + assertEquals(expectedTd, actual.getDefinition()); + assertTableEquals(expected, actual); + } + } + + /** + * This data was generated via the script: + * + *
+     * import pyarrow as pa
+     * import pyarrow.parquet as pq
+     *
+     * field_id = b"PARQUET:field_id"
+     * schema = pa.schema(
+     *     [pa.field("some random name", pa.list_(pa.int32()), metadata={field_id: b"999"})]
+     * )
+     * data = [pa.array([[1, 2, 3], None, [], [42]], type=pa.list_(pa.int32()))]
+     * table = pa.Table.from_arrays(data, schema=schema)
+     * pq.write_table(table, "ReferenceListParquetFieldIds.parquet")
+     * 
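+     *
+     * Reading the file back in the same session (a sketch, not part of the original script) shows the four
+     * list values the test expects, including the null and empty entries:
+     *
+     * print(pq.read_table("ReferenceListParquetFieldIds.parquet").column(0).to_pylist())
+     * # [[1, 2, 3], None, [], [42]]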
+ * + * @see Arrow Parquet field_id + */ + @Test + public void testParquetFieldIdsWithListType() { + final String file = TestParquetTools.class.getResource("/ReferenceListParquetFieldIds.parquet").getFile(); + final String FOO = "Foo"; + final TableDefinition td = TableDefinition.of(ColumnDefinition.of(FOO, Type.intType().arrayType())); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId(FOO, 999) + .build(); + final Table expected = TableTools.newTable(td, new ColumnHolder<>(FOO, int[].class, int.class, false, + new int[] {1, 2, 3}, + null, + new int[0], + new int[] {42})); + { + final Table actual = ParquetTools.readTable(file, instructions); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + { + final Table actual = ParquetTools.readTable(file, instructions.withTableDefinition(td)); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + } + + @Test + public void testWriteParquetFieldIds() { + final int BAZ_ID = 111; + final int ZAP_ID = 112; + final String BAZ = "Baz"; + final String ZAP = "Zap"; + final ColumnDefinition bazCol = ColumnDefinition.ofLong(BAZ); + final ColumnDefinition zapCol = ColumnDefinition.of(ZAP, Type.stringType().arrayType()); + final TableDefinition td = TableDefinition.of(bazCol, zapCol); + final Table expected = newTable(td, + longCol(BAZ, 99, 101), + new ColumnHolder<>(ZAP, String[].class, String.class, false, new String[] {"Foo", "Bar"}, + new String[] {"Hello"})); + final File file = new File(testRoot, "testWriteParquetFieldIds.parquet"); + { + // Writing down random parquet column names that we _don't_ keep a reference to. This way, the only way we + // can successfully resolve them is by field id. + final ParquetInstructions writeInstructions = ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .setFieldId(ZAP, ZAP_ID) + .addColumnNameMapping(UUID.randomUUID().toString(), BAZ) + .addColumnNameMapping(UUID.randomUUID().toString(), ZAP) + .build(); + ParquetTools.writeTable(expected, file.getPath(), writeInstructions); + } + + // This test is a bit circular; but assuming we trust our reading code, we should have relative confidence that + // we are writing it down correctly if we can read it correctly. + { + final ParquetInstructions readInstructions = ParquetInstructions.builder() + .setFieldId(BAZ, BAZ_ID) + .setFieldId(ZAP, ZAP_ID) + .build(); + { + final Table actual = ParquetTools.readTable(file.getPath(), readInstructions); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + { + final Table actual = ParquetTools.readTable(file.getPath(), readInstructions.withTableDefinition(td)); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + } + } + + /** + * This is meant to test a "common" renaming scenario. Originally, a schema might be written down with a column + * named "Name" where semantically, this was really a first name. Later, the schema might be "corrected" to label + * this column as "FirstName". Both standalone, and in combination with the newer file, we should be able to read it + * with the latest schema. 
+ */ + @Test + public void testRenamingResolveViaFieldId() { + final File f1 = new File(testRoot, "testRenamingResolveViaFieldId.00.parquet"); + final File f2 = new File(testRoot, "testRenamingResolveViaFieldId.01.parquet"); + + final int FIRST_NAME_ID = 15; + { + final String NAME = "Name"; + final Table t1 = newTable(TableDefinition.of(ColumnDefinition.ofString(NAME)), + stringCol(NAME, "Shivam", "Ryan")); + ParquetTools.writeTable(t1, f1.getPath(), ParquetInstructions.builder() + .setFieldId(NAME, FIRST_NAME_ID) + .build()); + } + + final int LAST_NAME_ID = 16; + final String FIRST_NAME = "FirstName"; + final String LAST_NAME = "LastName"; + final TableDefinition td = TableDefinition.of( + ColumnDefinition.ofString(FIRST_NAME), + ColumnDefinition.ofString(LAST_NAME)); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId(FIRST_NAME, FIRST_NAME_ID) + .setFieldId(LAST_NAME, LAST_NAME_ID) + .build(); + { + final Table t = newTable(td, + stringCol(FIRST_NAME, "Pete", "Colin"), + stringCol(LAST_NAME, "Goddard", "Alworth")); + ParquetTools.writeTable(t, f2.getPath(), instructions); + } + + // If we read first file without an explicit definition, we should only get the column from the file + { + final TableDefinition expectedTd = TableDefinition.of(ColumnDefinition.ofString(FIRST_NAME)); + final Table expected = newTable(expectedTd, stringCol(FIRST_NAME, "Shivam", "Ryan")); + final Table actual = ParquetTools.readTable(f1.getPath(), instructions); + assertEquals(expectedTd, actual.getDefinition()); + assertTableEquals(expected, actual); + } + + // If we read first file with an explicit definition, the new column should return nulls + { + final Table expected = newTable(td, + stringCol(FIRST_NAME, "Shivam", "Ryan"), + stringCol(LAST_NAME, null, null)); + final Table actual = ParquetTools.readTable(f1.getPath(), instructions.withTableDefinition(td)); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + + // We should be able to read both (flat partitioning) with the latest schema + { + final Table expected = newTable(td, + stringCol(FIRST_NAME, "Shivam", "Ryan", "Pete", "Colin"), + stringCol(LAST_NAME, null, null, "Goddard", "Alworth")); + { + final Table actual = ParquetTools.readTable(testRoot, instructions); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + { + final Table actual = ParquetTools.readTable(testRoot, instructions.withTableDefinition(td)); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + } + } + + + @Test + public void parquetWithNonUniqueFieldIds() { + final File f = new File(testRoot, "parquetWithNonUniqueFieldIds.parquet"); + final String FOO = "Foo"; + final String BAR = "Bar"; + final int fieldId = 31337; + final ParquetInstructions instructions = ParquetInstructions.builder() + .setFieldId(FOO, fieldId) + .setFieldId(BAR, fieldId) + .build(); + final TableDefinition td = TableDefinition.of(ColumnDefinition.ofInt(FOO), ColumnDefinition.ofString(BAR)); + final Table expected = newTable(td, + intCol(FOO, 44, 45), + stringCol(BAR, "Zip", "Zap")); + { + ParquetTools.writeTable(expected, f.getPath(), instructions); + } + + { + final String BAZ = "Baz"; + final ParquetInstructions bazInstructions = ParquetInstructions.builder() + .setFieldId(BAZ, fieldId) + .build(); + + // fieldId _won't_ be used to actually create a Baz column since the underlying file has multiple. In this + // case, we just infer the physical parquet column names. 
+ { + + final Table actual = ParquetTools.readTable(f.getPath(), bazInstructions); + assertEquals(td, actual.getDefinition()); + assertTableEquals(expected, actual); + } + + // If the user explicitly asks for a definition with a mapping to a non-unique field id, they will get back + // the column of default (null) values. + { + final TableDefinition bazTd = TableDefinition.of(ColumnDefinition.ofInt(BAZ)); + final Table bazTable = newTable(bazTd, intCol(BAZ, QueryConstants.NULL_INT, QueryConstants.NULL_INT)); + final Table actual = ParquetTools.readTable(f.getPath(), bazInstructions.withTableDefinition(bazTd)); + assertEquals(bazTd, actual.getDefinition()); + assertTableEquals(bazTable, actual); + } + } + } + + // // We are unable to generate this sort of file via DH atm. + // @Test + // public void parquetWithNonUniqueColumnNames() { + // + // } + + /** + *
+     * import pyarrow as pa
+     * import pyarrow.parquet as pq
+     *
+     * fields = [
+     *     pa.field("Foo", pa.int64()),
+     *     pa.field(
+     *         "MyStruct",
+     *         pa.struct(
+     *             [
+     *                 pa.field("Zip", pa.int16()),
+     *                 pa.field("Zap", pa.int32()),
+     *             ]
+     *         ),
+     *     ),
+     *     pa.field("Bar", pa.string()),
+     * ]
+     *
+     * table = pa.table([[] for _ in fields], schema=pa.schema(fields))
+     * pq.write_table(table, "NestedStruct1.parquet", compression="none")
+     * 
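+     *
+     * Printing the parquet schema in the same session (a sketch, not part of the original script) shows
+     * MyStruct as a group with two leaf columns, which is what the inference step rejects below:
+     *
+     * print(pq.ParquetFile("NestedStruct1.parquet").schema)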
+ */ + @Test + public void nestedMessageEmpty() { + final String file = TestParquetTools.class.getResource("/NestedStruct1.parquet").getFile(); + final TableDefinition expectedTd = + TableDefinition.of(ColumnDefinition.ofLong("Foo"), ColumnDefinition.ofString("Bar")); + final Table table = TableTools.newTable(expectedTd, longCol("Foo"), stringCol("Bar")); + // If we use an explicit definition, we can skip over MyStruct and read Foo, Bar + { + final Table actual = + ParquetTools.readTable(file, ParquetInstructions.EMPTY.withTableDefinition(expectedTd)); + assertEquals(expectedTd, actual.getDefinition()); + assertTableEquals(table, actual); + } + + // If we try to infer, we currently throw an error. + // TODO(deephaven-core#871): Parquet: Support repetition level >1 and multi-column fields + try { + ParquetTools.readTable(file); + Assertions.failBecauseExceptionWasNotThrown(UnsupportedOperationException.class); + } catch (UnsupportedOperationException e) { + Assertions.assertThat(e) + .hasMessageContaining("Encountered unsupported multi-column field MyStruct, has 2 total columns"); + } + } + + /** + *
+     * import pyarrow as pa
+     * import pyarrow.parquet as pq
+     *
+     * fields = [
+     *     pa.field("Foo", pa.int64()),
+     *     pa.field(
+     *         "MyStruct",
+     *         pa.struct(
+     *             [
+     *                 pa.field("Zip", pa.int16()),
+     *                 pa.field("Zap", pa.int32()),
+     *             ]
+     *         ),
+     *     ),
+     *     pa.field("Bar", pa.string()),
+     * ]
+     *
+     * table = pa.table([[None] for _ in fields], schema=pa.schema(fields))
+     * pq.write_table(table, "NestedStruct2.parquet", compression="none")
+     * 
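+     *
+     * Reading the file back in the same session (a sketch, not part of the original script) confirms a single
+     * row in which Foo, MyStruct, and Bar are all null:
+     *
+     * print(pq.read_table("NestedStruct2.parquet").to_pylist())
+     * # [{'Foo': None, 'MyStruct': None, 'Bar': None}]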
+ */ + @Test + public void nestedMessage1Row() { + final String file = TestParquetTools.class.getResource("/NestedStruct2.parquet").getFile(); + final TableDefinition expectedTd = + TableDefinition.of(ColumnDefinition.ofLong("Foo"), ColumnDefinition.ofString("Bar")); + final Table table = TableTools.newTable(expectedTd, longCol("Foo", QueryConstants.NULL_LONG), + stringCol("Bar", (String) null)); + // If we use an explicit definition, we can skip over MyStruct and read Foo, Bar + { + final Table actual = + ParquetTools.readTable(file, ParquetInstructions.EMPTY.withTableDefinition(expectedTd)); + assertEquals(expectedTd, actual.getDefinition()); + assertTableEquals(table, actual); + } + + // If we try to infer, we currently throw an error. + // TODO(deephaven-core#871): Parquet: Support repetition level >1 and multi-column fields + try { + ParquetTools.readTable(file); + Assertions.failBecauseExceptionWasNotThrown(UnsupportedOperationException.class); + } catch (UnsupportedOperationException e) { + Assertions.assertThat(e) + .hasMessageContaining("Encountered unsupported multi-column field MyStruct, has 2 total columns"); + } + } } diff --git a/extensions/parquet/table/src/test/resources/NestedStruct1.parquet b/extensions/parquet/table/src/test/resources/NestedStruct1.parquet new file mode 100644 index 00000000000..d592f378b92 --- /dev/null +++ b/extensions/parquet/table/src/test/resources/NestedStruct1.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d3b0b28c657e8991f0ffa48aedb68f23d1a746da8ddeaa473dbee82a3f80c66c +size 1019 diff --git a/extensions/parquet/table/src/test/resources/NestedStruct2.parquet b/extensions/parquet/table/src/test/resources/NestedStruct2.parquet new file mode 100644 index 00000000000..f8cee910583 --- /dev/null +++ b/extensions/parquet/table/src/test/resources/NestedStruct2.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a6ea5e3b9a2d5907adf6ecc3909a6491a62e78c0ae7ca9516f96ad9d4db3432d +size 1237 diff --git a/extensions/parquet/table/src/test/resources/ReferenceListParquetFieldIds.parquet b/extensions/parquet/table/src/test/resources/ReferenceListParquetFieldIds.parquet new file mode 100644 index 00000000000..72ebdb26baa --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceListParquetFieldIds.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:037307d886a9eade2dca4982917eec86516686fdb2d33b66ec9e8724ee59aeb0 +size 843 diff --git a/extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=0/table.parquet b/extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=0/table.parquet new file mode 100644 index 00000000000..5a20bee6eeb --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=0/table.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:6f7c9501d597752d7295cd7b5f098336df5c67a341cceb583f661f2d0ac6d265 +size 1323 diff --git a/extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=1/table.parquet b/extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=1/table.parquet new file mode 100644 index 00000000000..b31a2c3ee60 --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferencePartitionedFieldIds/Partition=1/table.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:61fa98e653bb6d913c4828a53d92977d3c443796e616996d3fcec76821d89b92 +size 
1323 diff --git a/extensions/parquet/table/src/test/resources/ReferenceSimpleParquetFieldIds.parquet b/extensions/parquet/table/src/test/resources/ReferenceSimpleParquetFieldIds.parquet new file mode 100644 index 00000000000..4fa8b5ebdb2 --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceSimpleParquetFieldIds.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:439904cfa5209c40a7c0c75c38e0835d7ce7af79a850ab270216dfff833ec0a8 +size 1315