
PARQUET-2430: Add parquet joiner v2 #1335

Merged: 67 commits into apache:master on Sep 19, 2024

Conversation

@MaxNevermind (Contributor) commented Apr 28, 2024

This is a simplified version of the originally proposed joiner functionality; see the description of the original idea and the simplified design below.

Original design

See related original PR: [WIP][Proposal] PARQUET-2430: Add parquet joiner

The ParquetJoiner feature is similar to the ParquetRewriter class. ParquetRewriter allows stitching files with the same schema into a single file, while ParquetJoiner should enable stitching files with different schemas into a single file. That is possible when: 1) the number of rows in the main and extra files is the same, and 2) the ordering of rows in the main and extra files is the same. The main benefit of ParquetJoiner is performance: for cases where you join/stitch terabytes or petabytes of data, this seemingly simple low-level API can be up to 10x more resource efficient.
Implementation details

ParquetJoiner lets you specify the main input Parquet file and extra input Parquet files. ParquetJoiner copies the main input as binary data and writes the extra input files with row groups adjusted to the main input. If the main input is much larger than the extra inputs, a lot of resources are saved by handling the main input as binary.
Use-case examples

A very large Parquet-based dataset (dozens or hundreds of fields, terabytes of data daily, petabytes of historical partitions). The task is to modify a column or add a new column to it for all the historical data. That is trivial using Spark, but taking into consideration the sheer scale of the dataset, it would take a lot of resources.
Side notes

Note that this class of problems could in theory be solved by storing the main input and extra inputs in HMS/Iceberg bucketed tables and using a view that joins those tables on the fly into the final version, but in practice there is often a requirement to merge the Parquet files and have a single Parquet source in the file system.
Use-case implementation details using Apache Spark

You can use Apache Spark to perform the join with ParquetJoiner. Read the large main input and prepare the right side of the join so that each file on the left has a corresponding file on the right and the right side preserves the record ordering of the left side; the left and right inputs then have the same number of files, the same number of records in corresponding files, and the same ordering of records in each file pair. Then run ParquetJoiner in parallel for each file pair to perform the join. Example code that uses this new feature: https://gist.github.com/MaxNevermind/0feaaf380520ca34c2637027ef349a7d.
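For illustration only (the gist above is the authoritative example), a hypothetical plain-Java driver that fans out one join per file pair; joinFilePair is a placeholder for the RewriteOptions/ParquetRewriter invocation sketched under the simplified design further down:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;

// Hypothetical sketch: run one join per file pair in parallel.
// Assumes leftFiles.get(i) and rightFiles.get(i) hold the same number of
// records in the same order, as ParquetJoiner requires.
public class ParallelJoinDriver {

  static void joinAll(List<Path> leftFiles, List<Path> rightFiles, Path outDir)
      throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(8);
    for (int i = 0; i < leftFiles.size(); i++) {
      Path left = leftFiles.get(i);   // main input, mostly copied as binary
      Path right = rightFiles.get(i); // extra columns, same row ordering
      Path out = new Path(outDir, left.getName());
      pool.submit(() -> joinFilePair(left, right, out));
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);
  }

  // Placeholder: see the RewriteOptions/ParquetRewriter sketch below.
  static void joinFilePair(Path left, Path right, Path out) {}
}
```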

A simplified design (this PR)

  • has only one list of inputFilesToJoin instead of a List<List<>> as in the original PR
  • inputFilesToJoin is expected to have the same row-group ordering as inputFiles; the number of files in inputFiles and inputFilesToJoin does not necessarily have to be the same, but the ordering of row groups and the row count of paired row groups must be the same
  • joinColumnsOverwrite is used if inputFilesToJoin is expected to overwrite columns in inputFiles
  • all the capabilities available for inputFiles, such as pruning, nullification, and binary copy, should now be available for inputFilesToJoin too (see the usage sketch after this list)
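For illustration, a minimal usage sketch under those constraints. The builder calls for inputFilesToJoin and joinColumnsOverwrite mirror the option names in this PR description; the exact method names and signatures are assumptions and should be checked against the merged RewriteOptions:

```java
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class JoinSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    RewriteOptions options = new RewriteOptions.Builder(
            conf,
            Arrays.asList(new Path("main-input.parquet")), // inputFiles: copied as binary where possible
            new Path("joined-output.parquet"))
        // The two calls below use the option names from this PR's
        // description; they are assumptions, not verified signatures.
        .addInputFilesToJoin(Arrays.asList(new Path("extra-columns.parquet")))
        .joinColumnsOverwrite(true) // extra input wins on shared column names
        .build();

    ParquetRewriter rewriter = new ParquetRewriter(options);
    rewriter.processBlocks();
    rewriter.close();
  }
}
```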

Post-PR action points

maxim_konstantinov added 29 commits January 28, 2024 14:22
@MaxNevermind (Contributor, Author) commented Apr 28, 2024

@wgtmac @ConeyLiu

This PR is the outcome of the simplification I mentioned in a comment a couple of weeks ago: #1273 (comment)
I've limited the set of capabilities; see this PR's description.
I've tried different ideas, and they all ended up with too complex an implementation, so I decided to finalize at least something with as simple an implementation as possible.
The PR is not yet polished; I just wanted to do a quick overview of the new approach. If it looks good, I will polish it.

@MaxNevermind (Contributor, Author)

@wgtmac

I started to work on the tests, but I can't figure out the current approach to ParquetRewriter testing from the already existing tests. The full list of features I see:

  • data validity after merging
  • single / multiple files merging
  • column nullification
  • column pruning
  • column encryption
  • codec preservation
  • bloom filter preservation
  • page index verification
  • metadata (CREATED_BY_KEY) preservation

I'm used to an approach where features are unit tested independently and sequentially. But looking at the existing ParquetRewriter tests, I can see that some tests check multiple things in the same test, and I'm not able to figure out the system behind mixing features into a single test.

So how should I approach it: should I target covering all the features in one or two big tests, or in multiple tests while trying to cover each feature in at least one of them?

@wgtmac (Member) commented Aug 20, 2024

Good question. ParquetRewriter was created by the refactoring work that consolidated ColumnEncryptor, ColumnMasker, ColumnPruner and CompressionConverter. You can see the individual unit tests in ColumnEncryptorTest, ColumnMaskerTest, ColumnPrunerTest and CompressionConverterTest respectively. So the main goal of ParquetRewriterTest is to cover the combination of these features. I think we mainly need test cases covering the new join feature and proving that it does not break when other features are turned on.

@MaxNevermind (Contributor, Author) commented Aug 31, 2024

@wgtmac @ConeyLiu
Can you check out the test changes? I created a single big test for the new functionality. The documentation is still in progress.

Some clarifications.
I had to rewrite some existing validation methods to accommodate joined columns. In a couple of places I used anonymous nested functions and a local nested class to localize a method's tightly related logic into a single block. Let me know if that is too weird / too functional.

Some bugs.
I found what looks like a bug in the current version of ParquetRewriter. I will probably file an issue.
When you try to nullify and encrypt different columns, it fails. There is a related test, but it nullifies and encrypts the same column, which doesn't reproduce the bug. The bug can be reproduced by changing a single line in the testNullifyAndEncryptColumn() method: maskColumns.put("DocId", MaskMode.NULLIFY); to maskColumns.put("Links.Forward", MaskMode.NULLIFY);. The reason for the failure, as I understand it, is that during nullification we create a single-column schema, MessageType newSchema = newSchema(schema, descriptor), and later use our main writer's encryptor with that schema, but that encryptor expects our final target schema, not a single-column schema.
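For reference, the single-line change described above, as it would appear inside the existing test:

```java
// Inside ParquetRewriterTest.testNullifyAndEncryptColumn():
// existing line nullifies and encrypts the same column, which masks the bug
maskColumns.put("DocId", MaskMode.NULLIFY);

// changed line: nullify a column different from the encrypted one
// to reproduce the failure
maskColumns.put("Links.Forward", MaskMode.NULLIFY);
```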

@wgtmac (Member) left a comment

Sorry for the delay. The current test cases are already very complicated, so the refactoring work on the validation methods makes sense to me.

@wgtmac (Member) commented Sep 5, 2024

> @wgtmac @ConeyLiu Can you check out the test changes? I created a single big test for the new functionality. The documentation is still in progress.

It's reasonable on my side, as long as the new feature is covered.

> Some bugs. I found what looks like a bug in the current version of ParquetRewriter. I will probably file an issue. When you try to nullify and encrypt different columns, it fails. [...]

It would be great if you have time to create a PR to fix this. Thanks!

@MaxNevermind (Contributor, Author)

@wgtmac
I addressed the small issues you found and added / polished the documentation; check it out.

@wgtmac (Member) commented Sep 12, 2024

> @wgtmac I addressed the small issues you found and added / polished the documentation; check it out.

LGTM. Thanks!

@MaxNevermind (Contributor, Author)

@wgtmac
Is there anything else I'm expected to do in this PR?

@wgtmac (Member) commented Sep 18, 2024

I'm not sure if @ConeyLiu wants to take another look.

BTW, could you fix the PR title and description? It is no longer a WIP.

@ConeyLiu (Contributor)

+1, I have no further comments. Thanks for the great work!

@MaxNevermind MaxNevermind changed the title [WIP][Proposal] PARQUET-2430: Add parquet joiner v2 [Proposal] PARQUET-2430: Add parquet joiner v2 Sep 19, 2024
@MaxNevermind MaxNevermind changed the title [Proposal] PARQUET-2430: Add parquet joiner v2 PARQUET-2430: Add parquet joiner v2 Sep 19, 2024
@wgtmac wgtmac merged commit 08a4e7e into apache:master Sep 19, 2024
9 checks passed
@wgtmac (Member) commented Sep 19, 2024

Thanks @MaxNevermind and @ConeyLiu!

@wgtmac wgtmac added this to the 1.15.0 milestone Sep 30, 2024