Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow parquet column access by field_id #6156

Open
wants to merge 14 commits into
base: main
Choose a base branch
from

Conversation

devinrsmith
Copy link
Member

@devinrsmith devinrsmith commented Sep 30, 2024

This allows the the resolution of a parquet column by field_id instead of by its "path". This is a lower-level option that will not typically be used by end-users; as such, this option has not been plumbed through to python. This feature will be used in follow-up PRs in combination with Iceberg's field-ids to improve column mappings.

Writing support has also been added.

Fixes #6128

This allows the the resolution of a parquet column by field_id instead of by its "path". This is a lower-level option that will not typically be used by end-users; as such, this option has not been plumbed through to python. This feature will be used in follow-up PRs in combination with Iceberg's field-ids to improve column mappings.

Fixes deephaven#6128
@devinrsmith devinrsmith added parquet Related to the Parquet integration NoDocumentationNeeded ReleaseNotesNeeded Release notes are needed labels Sep 30, 2024
@devinrsmith devinrsmith added this to the 0.37.0 milestone Sep 30, 2024
@devinrsmith devinrsmith self-assigned this Sep 30, 2024
Copy link
Contributor

@malhotrashivam malhotrashivam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First level of review, can do a more detailed review tomorrow.

@malhotrashivam
Copy link
Contributor

Do verify the nightlies pass before merging.

This also fixes a bug where `parquetColumnNameToInstructions.put(parquetColumnName, ci);` was called without setting the parqute column name on ci and the KeyDef would blow up.
…t skip the logic when a user explicitly sets the parquet column name the same as the column name
@devinrsmith
Copy link
Member Author

Do verify the nightlies pass before merging.

Verified.

Copy link
Contributor

@malhotrashivam malhotrashivam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like the change, minor comments.

Comment on lines 178 to 179
// TODO: how should we handle this? Ignore?
// throw new IllegalArgumentException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that this should be an error in ParquetInstructions, right when the user sets it instead of here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole code path is very smelly; I want to go through a larger refactoring that would alleviate the need to make these types of calls in the first place. This code path is only hit when inferring the TableDefinition, so I don't think it should be an error to set the same field id multiple times in general. We have set it up this way with parquet column names, but we shouldn't technically need to do that either - every little modelling mismatch we present is a small papercut that can lead to larger modelling problems at higher layers IMO.

I would be okay throwing an error here or silently ignoring wrt inferrence. Ideally, the user would be able to choose the behavior they desire. The structure of ParquetInstructions / builder makes that tedious (I wish we could redo it w/ Immutables and saner structures).

I'll change this to throw an error here, with a note we could think about exposing option to silently ignore if that's what the user wants.

@devinrsmith
Copy link
Member Author

I couldn't find any resources to confirm, but this does feel incorrect to me, having two columns with same field ID. For example, if we get a field ID by Iceberg, it would expect a single column, right?

Iceberg probably mandates the uniqueness of field-ids.

Parquet doesn't have any mandates wrt that. And even the column names aren't guaranteed to be unique. I need to find the reference I found earlier that the parquet format "strongly recommends" unique column names, but it's not even a guarantee.

@@ -474,6 +474,7 @@ default Type createSchemaType(
builder = getBuilder(isRequired(columnDefinition), false, dataType);
isRepeating = false;
}
instructions.getFieldId(columnDefinition.getName()).ifPresent(builder::id);
Copy link
Contributor

@malhotrashivam malhotrashivam Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can skip it here, I am making the change and testing it as part of my PR here.
Or if you have already added the tests, you can copy the logic from my PR. The main difference is how we nested columns like handle lists.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ability to write Parquet field ids doesn't necessarily need to be tied into Iceberg's usage of it. Given how simple it was here, I think we can leave it in?

malhotrashivam
malhotrashivam previously approved these changes Oct 2, 2024
private final String columnName;
private String parquetColumnName;
private String codecName;
private String codecArgs;
private boolean useDictionary;
private Integer fieldId;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the point of this? Seems like we don't get anything but a little bit of extra allocation for this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The javadoc on OptionalInt specifically calls out that it is intended for return types.

/*
 * ...
 * @apiNote
 * {@code OptionalInt} is primarily intended for use as a method return type where
 * there is a clear need to represent "no result." A variable whose type is
 * {@code OptionalInt} should never itself be {@code null}; it should always point
 * to an {@code OptionalInt} instance.
 */

IntelliJ, and likely other editors, will complain.

Immutables will also use this pattern internally when you have an object that returns OptionalInt.

I'm very heavily in favor of preferring the Java-canonical approach, especially when it comes to configuration objects which we should not really care about in terms of performance implications.

for (final ColumnDescriptor column : schema.getColumns()) {
final Map<Integer, Long> fieldIdCount = schema.getColumns()
.stream()
.map(ColumnDescriptor::getPrimitiveType)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it weird that field ID is on the primitive type.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, yes; in the case of a list type, the field id is on the list and not on the primitive.

Comment on lines 158 to 160
throw new IllegalStateException(String.format(
"Field count inconsistent with number of columns, schema.getFieldCount()=%d, schema.getColumns().size()=%d",
schema.getFieldCount(), schema.getColumns().size()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really think this holds? I wonder, since "field" and "column" are distinct names.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great callout - I've dug into the distinction between "field" and "column"; for nested types, there is 1 field and multiple columns (potentially recursive).

I've improved the code to iterate through the each field with it's respective starting column index.

For inference purposes, we'll fail saying "we can't handle nested types" #871. For reading purposes when the user provides a specific table definition, we'll skip over nested columns.

I suspect we could greatly improve inference if we wanted (potentially to give the user the option to continue failing or to skip inference of nested fields by default) to not fail on these cases. I also suspect it should be pretty easy to actually support nested fields, at least a single level deep, by flattening them out into the table definition.

colName = columnNames.get(0);
break COL_NAME;
} else if (columnNames.size() > 1) {
throw new IllegalArgumentException(String.format(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this limitation is entirely because you didn't want to refactor the code. If you're going to argue for that, we should at least guide the user to use an updateView to achieve their goals.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Added a comment that this could be improved with refactoring of the code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you also update the error message? If we're not going to let the user do this, tell them how they can achieve the same result.

if (mappedName != null) {
colName = mappedName;
break COL_NAME;
}
final String legalized = legalizeColumnNameFunc.apply(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there may be a name legalization bug:

  1. I think we should be using builderSupplier in the below code.
  2. I think we should be recording any column we assign as a taken name, in order to ensure that we don't collide between a user-specified name and a legalized name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially related to #6119

Comment on lines +15 to +17
import shaded.parquet.org.apache.thrift.protocol.TSimpleJSONProtocol;
import shaded.parquet.org.apache.thrift.transport.TIOStreamTransport;
import shaded.parquet.org.apache.thrift.transport.TTransport;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels a little questionable to depend on someone else's shaded packages.

* @param fieldId the field_id to fetch
* @return the accessor to a given Column Chunk, or null if the column is not present in this Row Group
*/
@Nullable
ColumnChunkReader getColumnChunk(@NotNull String columnName, @NotNull List<String> path, @Nullable Integer fieldId);
ColumnChunkReader getColumnChunk(@NotNull String columnName, @NotNull List<String> defaultPath,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document defaultPath. Is it a parquet path?

Comment on lines +877 to +879
* In the case where both a field id mapping and a parquet colum name mapping is provided, the field id will
* take precedence over the parquet column name. This may happen in cases where the parquet file is managed by a
* higher-level schema that has the concept of a "field id"; for example, Iceberg. As <a href=
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change it. There's no precedence if both are present, we insist that they be consistent.

* @param columnChunkReaders The {@link ColumnChunkReader column chunk readers} for this location
*/
ParquetColumnLocation(
@NotNull final ParquetTableLocation tableLocation,
@NotNull final String columnName,
@NotNull final String parquetColumnName,
@Nullable final String parquetColumnName,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check what happens if we're inferring and legalized a Parquet column name to get the Deephaven column name. I think in that case, this change is broken as-is.

columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
final List<String> defaultPath;
{
final String[] path = parquetColumnNameToPath.get(columnName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's something about this making me uncomfortable. I think there may be a buggy path if legalization is used.

Comment on lines 141 to 146
for (String nonUniquePath : nonUniquePaths) {
byPath.remove(nonUniquePath);
}
for (Integer nonUniqueFieldId : nonUniqueFieldIds) {
chunkMapByFieldId.remove(nonUniqueFieldId);
schemaMapByFieldId.remove(nonUniqueFieldId);
byFieldId.remove(nonUniqueFieldId);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last wins or first wins is better than "pretend we had nothing, and just give nulls".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what pyarrow does.

if (byFieldId != null && byPath != null) {
if (byFieldId != byPath) {
throw new IllegalArgumentException(String.format(
"For columnName=%s, providing an explicit parquet column name path (%s) and field id (%d) mapping, but they are resolving to different columns, byFieldId=[%s], byPath=[%s]",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"For columnName=%s, providing an explicit parquet column name path (%s) and field id (%d) mapping, but they are resolving to different columns, byFieldId=[%s], byPath=[%s]",
"For columnName=%s, instructions provided an explicit parquet column name path (%s) and field id (%d) mapping, but they are resolving to different columns, byFieldId=[%s], byPath=[%s]",

}
columnChunk = cc;
nonRequiredFields = schemaMap.get(key);
holder = byFieldId != null ? byFieldId : byPath;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the user specified a field ID and we didn't find it, I'm not sure it's correct to fall back to name mapping.
https://iceberg.apache.org/spec/#schema-evolution specifies a set of rules, and we should be making sure our Parquet implementation will let our Iceberg implementation follow them.

For Iceberg support, it looks like we need:

  1. A list of name mappings, which we fall back to if and only if the field ID was not found.
  2. Some kind of handling for encountering multiple Parquet fields with names from the name mappings: first? last? exception?
  3. Some kind of handling for finding a fallback field by name mappings, and determining that it does not match the expected field ID. Exception?

Comment on lines +161 to +164
// TODO(deephaven-core#871): Parquet: Support repetition level >1 and multi-column fields
throw new UnsupportedOperationException(
String.format("Encountered unsupported multi-column field %s, has %d total columns",
fieldType.getName(), numColumns));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You suggested we should maybe just start skipping nested fields. I bet we could also choose to include them, with some weird default. Like "UnprocessedField" singleton POJOs.

if (fieldIt.hasNext() || columnDescriptorIterator.hasNext()) {
throw new IllegalStateException("Iterators not exhausted");
if (columnIx != columnDescriptors.size()) {
throw new IllegalStateException("Not proper size");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do better than this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
NoDocumentationNeeded parquet Related to the Parquet integration ReleaseNotesNeeded Release notes are needed
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Allow Parquet column access by field_id
3 participants