feat: Allow parquet column access by field_id #6156

Open · wants to merge 14 commits into base: main
@@ -29,6 +29,7 @@
import java.nio.channels.SeekableByteChannel;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;

import static io.deephaven.parquet.base.ParquetUtils.resolve;
@@ -44,7 +45,7 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
private final CompressorAdapter decompressor;
private final ColumnDescriptor path;
private final OffsetIndexReader offsetIndexReader;
private final List<Type> fieldTypes;
private final List<Type> nonRequiredFields;
private final Function<SeekableChannelContext, Dictionary> dictionarySupplier;
private final URI columnChunkURI;
/**
@@ -62,12 +63,12 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
final SeekableChannelsProvider channelsProvider,
final URI rootURI,
final MessageType type,
final List<Type> fieldTypes,
final List<Type> nonRequiredFields,
final long numRows,
final String version) {
this.columnName = columnName;
this.channelsProvider = channelsProvider;
this.columnChunk = columnChunk;
this.columnName = Objects.requireNonNull(columnName);
this.channelsProvider = Objects.requireNonNull(channelsProvider);
this.columnChunk = Objects.requireNonNull(columnChunk);
this.path = type
.getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0]));
if (columnChunk.getMeta_data().isSetCodec()) {
@@ -76,7 +77,7 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
} else {
decompressor = CompressorAdapter.PASSTHRU;
}
this.fieldTypes = fieldTypes;
this.nonRequiredFields = Objects.requireNonNull(nonRequiredFields);
this.dictionarySupplier = new SoftCachingFunction<>(this::getDictionary);
this.numRows = numRows;
this.version = version;
@@ -280,7 +281,8 @@ public ColumnPageReader next(@NotNull final SeekableChannelContext channelContex
final Function<SeekableChannelContext, Dictionary> pageDictionarySupplier =
getPageDictionarySupplier(pageHeader);
return new ColumnPageReaderImpl(columnName, channelsProvider, decompressor, pageDictionarySupplier,
pageMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader, numValuesInPage);
pageMaterializerFactory, path, getURI(), nonRequiredFields, dataOffset, pageHeader,
numValuesInPage);
} catch (IOException e) {
throw new UncheckedDeephavenException("Error reading page header at offset " + headerOffset + " for " +
"column: " + columnName + ", uri: " + getURI(), e);
Expand Down Expand Up @@ -358,7 +360,7 @@ public ColumnPageReader getPageReader(final int pageNum, final SeekableChannelCo
final Function<SeekableChannelContext, Dictionary> pageDictionarySupplier =
getPageDictionarySupplier(pageHeader);
return new ColumnPageReaderImpl(columnName, channelsProvider, decompressor, pageDictionarySupplier,
pageMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader,
pageMaterializerFactory, path, getURI(), nonRequiredFields, dataOffset, pageHeader,
getNumValues(pageHeader));
} catch (final IOException e) {
throw new UncheckedDeephavenException("Error reading page header for page number " + pageNum +
@@ -58,7 +58,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader {
private final PageMaterializerFactory pageMaterializerFactory;
private final ColumnDescriptor path;
private final URI uri;
private final List<Type> fieldTypes;
private final List<Type> nonRequiredFields;

/**
* The offset for data following the page header in the file.
@@ -80,7 +80,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader {
* @param materializerFactory The factory for creating {@link PageMaterializer}.
* @param path The path of the column.
* @param uri The uri of the parquet file.
* @param fieldTypes The types of the fields in the column.
* @param nonRequiredFields The non-required (optional or repeated) fields in the column's path.
* @param dataOffset The offset for data following the page header in the file.
* @param pageHeader The page header, should not be {@code null}.
* @param numValues The number of values in the page.
@@ -93,7 +93,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader {
final PageMaterializerFactory materializerFactory,
final ColumnDescriptor path,
final URI uri,
final List<Type> fieldTypes,
final List<Type> nonRequiredFields,
final long dataOffset,
final PageHeader pageHeader,
final int numValues) {
@@ -104,7 +104,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader {
this.pageMaterializerFactory = materializerFactory;
this.path = path;
this.uri = uri;
this.fieldTypes = fieldTypes;
this.nonRequiredFields = nonRequiredFields;
this.dataOffset = dataOffset;
this.pageHeader = Require.neqNull(pageHeader, "pageHeader");
this.numValues = Require.geqZero(numValues, "numValues");
@@ -716,7 +716,7 @@ private Pair<Pair<Type.Repetition, IntBuffer>[], Integer> getOffsetsAndNulls(
int currentRl = rlDecoder == null ? 0 : rlDecoder.currentValue();

final LevelsController levelsController = new LevelsController(
fieldTypes.stream().map(Type::getRepetition).toArray(Type.Repetition[]::new));
nonRequiredFields.stream().map(Type::getRepetition).toArray(Type.Repetition[]::new));
for (int valuesProcessed = 0; valuesProcessed < numValues;) {
if (dlRangeSize == 0) {
dlDecoder.readNextRange();
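For readers unfamiliar with the term, the "non-required fields" used for level decoding above are the types along a column's path whose repetition is OPTIONAL or REPEATED. Below is a small, self-contained sketch that builds a LIST-style column with parquet-mr's schema builders and collects those types; the schema shape, names, and traversal are illustrative only and loosely mirror how the readers gather nonRequiredFields, not the PR's exact code.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

public final class NonRequiredFieldsSketch {
    public static void main(String[] args) {
        // A LIST-style column: prices (optional group) -> list (repeated group) -> element (optional double).
        MessageType schema = Types.buildMessage()
                .optionalGroup()
                    .repeatedGroup()
                        .optional(PrimitiveTypeName.DOUBLE).named("element")
                    .named("list")
                .named("prices")
                .named("root");

        // Walk the single column path and keep every type that is not REQUIRED;
        // their repetitions are what drive definition/repetition level decoding.
        List<Type> nonRequired = new ArrayList<>();
        Type current = schema.getType("prices");
        while (true) {
            if (!current.isRepetition(Type.Repetition.REQUIRED)) {
                nonRequired.add(current);
            }
            if (current.isPrimitive()) {
                break;
            }
            current = current.asGroupType().getType(0);
        }
        nonRequired.forEach(t -> System.out.println(t.getName() + " " + t.getRepetition()));
    }
}
```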
@@ -39,7 +39,7 @@ public class ParquetFileReader {
* If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
*/
private final URI rootURI;
private final MessageType type;
private final MessageType schema;

/**
* Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
@@ -102,7 +102,7 @@ private ParquetFileReader(
fileMetaData = Util.readFileMetaData(in);
}
}
type = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders);
schema = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders);
}

/**
@@ -476,7 +476,7 @@ private static boolean isAdjustedToUTC(final LogicalType logicalType) {
}

public MessageType getSchema() {
return type;
return schema;
}

public int rowGroupCount() {
@@ -14,9 +14,12 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

final class RowGroupReaderImpl implements RowGroupReader {
private final RowGroup rowGroup;
@@ -45,14 +48,21 @@ final class RowGroupReaderImpl implements RowGroupReader {
"Expected schema fieldCount and row group columns siize to be equal, schema.getFieldCount()=%d, rowGroup.getColumnsSize()=%d, rootURI=%s",
fieldCount, rowGroup.getColumnsSize(), rootURI));
}
this.channelsProvider = channelsProvider;
this.rowGroup = rowGroup;
this.rootURI = rootURI;
this.schema = schema;
this.channelsProvider = Objects.requireNonNull(channelsProvider);
this.rowGroup = Objects.requireNonNull(rowGroup);
this.rootURI = Objects.requireNonNull(rootURI);
this.schema = Objects.requireNonNull(schema);
schemaMap = new HashMap<>(fieldCount);
chunkMap = new HashMap<>(fieldCount);
schemaMapByFieldId = new HashMap<>(fieldCount);
chunkMapByFieldId = new HashMap<>(fieldCount);
// Note: there is no technical guarantee from parquet that column names, path_in_schema, or field_ids are
// unique; it's technically possible that they are duplicated. Ultimately, getColumnChunk is a bad abstraction -
// we shouldn't need to re-do matching for every single row group column chunk, the matching should be done
// _once_ per column to get the column index, and then for every row group we should just need to do
// rowGroup.getColumns().get(columnIndex). If we want to
final Set<String> nonUniqueKeys = new HashSet<>();
final Set<Integer> nonUniqueFieldIds = new HashSet<>();
final Iterator<Type> fieldsIt = schema.getFields().iterator();
final Iterator<ColumnChunk> colsIt = rowGroup.getColumnsIterator();
while (fieldsIt.hasNext() && colsIt.hasNext()) {
@@ -68,16 +78,29 @@ final class RowGroupReaderImpl implements RowGroupReader {
nonRequiredFields.add(fieldType);
}
}
chunkMap.put(key, column);
schemaMap.put(key, nonRequiredFields);
if (chunkMap.putIfAbsent(key, column) != null) {
nonUniqueKeys.add(key);
}
schemaMap.putIfAbsent(key, nonRequiredFields);
if (ft.getId() != null) {
chunkMapByFieldId.put(ft.getId().intValue(), column);
schemaMapByFieldId.put(ft.getId().intValue(), nonRequiredFields);
final int fieldId = ft.getId().intValue();
if (chunkMapByFieldId.putIfAbsent(fieldId, column) != null) {
nonUniqueFieldIds.add(fieldId);
}
schemaMapByFieldId.putIfAbsent(fieldId, nonRequiredFields);
}
}
if (fieldsIt.hasNext() || colsIt.hasNext()) {
throw new IllegalStateException(String.format("Unexpected, iterators not exhausted, rootURI=%s", rootURI));
}
for (String nonUniqueKey : nonUniqueKeys) {
chunkMap.remove(nonUniqueKey);
schemaMap.remove(nonUniqueKey);
}
for (Integer nonUniqueFieldId : nonUniqueFieldIds) {
chunkMapByFieldId.remove(nonUniqueFieldId);
schemaMapByFieldId.remove(nonUniqueFieldId);
}
this.version = version;
}

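To make the de-duplication above easier to follow, here is a small, self-contained sketch of the same putIfAbsent-then-purge pattern; the field ids and column names are made up for illustration and are not taken from this PR. The first mapping wins temporarily, duplicates are recorded, and every duplicated key is then removed so an ambiguous lookup returns null instead of an arbitrary column chunk.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public final class FieldIdIndexSketch {
    public static void main(String[] args) {
        // Hypothetical columns as (fieldId, name) pairs; field id 7 is duplicated.
        int[] fieldIds = {1, 7, 7, 3};
        String[] names = {"Symbol", "Price", "PriceShadow", "Size"};

        final Map<Integer, String> byFieldId = new HashMap<>();
        final Set<Integer> nonUniqueFieldIds = new HashSet<>();
        for (int i = 0; i < fieldIds.length; i++) {
            // putIfAbsent returns the existing value when the key is already mapped,
            // which is how a duplicate is detected without overwriting the first entry.
            if (byFieldId.putIfAbsent(fieldIds[i], names[i]) != null) {
                nonUniqueFieldIds.add(fieldIds[i]);
            }
        }
        // Ambiguous field ids are dropped entirely rather than resolved arbitrarily.
        for (final Integer fieldId : nonUniqueFieldIds) {
            byFieldId.remove(fieldId);
        }
        System.out.println(byFieldId.get(1)); // Symbol
        System.out.println(byFieldId.get(7)); // null -> caller must fall back to the column name
    }
}
```

Dropping ambiguous keys outright keeps lookups deterministic, at the cost of forcing callers to fall back to name-based matching when field ids collide.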
@@ -865,10 +865,10 @@ public Builder useDictionary(final String columnName, final boolean useDictionar
* ({@code null}) values.
*
* <p>
* This field is not typically configured by end users.
* For writing, this will set the {@code field_id} in the proper Parquet {@code SchemaElement}.
*
* <p>
* For writing, this is not currently supported.
* This field is not typically configured by end users.
*/
public Builder setFieldId(final String columnName, final int fieldId) {
final ColumnInstructions ci = getColumnInstructions(columnName);
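A minimal usage sketch for the builder method above; the column name and field id are hypothetical, and the ParquetInstructions.builder()/build() entry points are assumed from the surrounding API rather than shown in this diff.

```java
import io.deephaven.parquet.table.ParquetInstructions;

// Tag the Deephaven column "Price" with Parquet field_id 42 so that, per the
// javadoc above, the id is written into the corresponding SchemaElement and
// can later be matched by id on read.
final ParquetInstructions instructions = ParquetInstructions.builder()
        .setFieldId("Price", 42)
        .build();
```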
@@ -175,8 +175,9 @@ public static ParquetInstructions readParquetSchema(
colName = columnNames.get(0);
break COL_NAME;
} else if (columnNames.size() > 1) {
// TODO: how should we handle this? Ignore?
// throw new IllegalArgumentException();
throw new IllegalArgumentException(String.format(
"Non-unique Field ID mapping provided; unable to infer TableDefinition for fieldId=%d, parquetColumnName=%s",
fieldId.intValue(), parquetColumnName));
}
}
final String mappedName = readInstructions.getColumnNameFromParquetColumnName(parquetColumnName);

Review discussion on this change:

Member: I think this limitation is entirely because you didn't want to refactor the code. If you're going to argue for that, we should at least guide the user to use an updateView to achieve their goals.

Member Author: Agreed. Added a comment that this could be improved with refactoring of the code.

Member: Did you also update the error message? If we're not going to let the user do this, tell them how they can achieve the same result.
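A hedged sketch of the workaround the reviewer alludes to, with hypothetical file, table, and column names: when two parquet columns share a field id, read the file by its (unambiguous) parquet column names and derive the desired column afterwards with updateView.

```java
import io.deephaven.engine.table.Table;
import io.deephaven.parquet.table.ParquetTools;

// Read using the unambiguous parquet column names, then project the column the
// field-id mapping would have produced.
final Table raw = ParquetTools.readTable("/data/example.parquet");
final Table withPrice = raw.updateView("Price = price_v2");
```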
@@ -474,6 +474,7 @@ default Type createSchemaType(
builder = getBuilder(isRequired(columnDefinition), false, dataType);
isRepeating = false;
}
instructions.getFieldId(columnDefinition.getName()).ifPresent(builder::id);
if (!isRepeating) {
return builder.named(parquetColumnName);
}

Review discussion on this change:

malhotrashivam (Contributor, Oct 1, 2024): You can skip it here, I am making the change and testing it as part of my PR here. Or if you have already added the tests, you can copy the logic from my PR. The main difference is how we handle nested columns like lists.

Member Author: The ability to write Parquet field ids doesn't necessarily need to be tied into Iceberg's usage of it. Given how simple it was here, I think we can leave it in?
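For context on what ifPresent(builder::id) ultimately produces, here is a small sketch using parquet-mr's org.apache.parquet.schema.Types builder directly; the field name and id are hypothetical. It shows a field id being attached to a column in the written schema, independent of Iceberg's usage of field ids.

```java
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

// A one-column message type whose "Price" field carries field_id 42; the
// ifPresent(builder::id) call above attaches the id in the same way whenever a
// field id has been configured for the column being written.
final MessageType schema = Types.buildMessage()
        .optional(PrimitiveTypeName.DOUBLE)
        .id(42)
        .named("Price")
        .named("root");
```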