From 324a669fbf0fa523435aa5b634f3ec6cbffe4411 Mon Sep 17 00:00:00 2001 From: MaheshGPai Date: Sat, 15 Jul 2023 17:37:10 +0530 Subject: [PATCH 1/3] Support merging of rowgroups during file rewrite --- .../parquet/cli/commands/RewriteCommand.java | 21 + .../parquet/hadoop/ParquetFileWriter.java | 66 +- .../apache/parquet/hadoop/RowGroupMerger.java | 652 ++++++++++++++++++ .../hadoop/rewrite/ParquetRewriter.java | 37 + .../hadoop/rewrite/RewriteOptions.java | 44 +- .../parquet/hadoop/TestRowGroupMerging.java | 263 +++++++ .../hadoop/rewrite/ParquetRewriterTest.java | 42 ++ 7 files changed, 1097 insertions(+), 28 deletions(-) create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestRowGroupMerging.java diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java index da0b3eff4e..e84235dc56 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java @@ -82,6 +82,18 @@ public class RewriteCommand extends BaseCommand { required = false) String codec; + @Parameter( + names = {"-m", "--merge-rowgroups"}, + description = "", + required = false) + boolean mergeRowGroups; + + @Parameter( + names = {"-s", "--max-rowgroup-size"}, + description = "", + required = false) + long maxRowGroupSize; + public RewriteCommand(Logger console) { super(console); } @@ -118,6 +130,15 @@ private RewriteOptions buildOptionsOrFail() throws IOException { builder.transform(codecName); } + if (mergeRowGroups) { + Preconditions.checkArgument(maxRowGroupSize > 0, + "If merge rowgroup is enabled, max rowgroups size should be specified"); + Preconditions.checkArgument(null != codec, + "If merge rowgroup is enabled, new compression codec needs to be specified"); + builder.enableRowGroupMerge(); + builder.maxRowGroupSize(maxRowGroupSize); + } + RewriteOptions options = builder.build(); // If RewriteOptions are successfully built and the overwrite option is specified, remove the output path diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 80b9907a2d..6b7c9219cd 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -74,6 +74,7 @@ import org.apache.parquet.hadoop.metadata.GlobalMetaData; import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.CompressionConverter; import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.hadoop.util.HadoopStreams; import org.apache.parquet.internal.column.columnindex.ColumnIndex; @@ -1033,32 +1034,7 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, boolean dropColumns) throws IOException { startBlock(rowGroup.getRowCount()); - Map columnsToCopy = - new HashMap(); - for (ColumnChunkMetaData chunk : rowGroup.getColumns()) { - columnsToCopy.put(chunk.getPath().toDotString(), chunk); - } - - List columnsInOrder = - new ArrayList(); - - for (ColumnDescriptor descriptor : schema.getColumns()) { - String path = 
ColumnPath.get(descriptor.getPath()).toDotString(); - ColumnChunkMetaData chunk = columnsToCopy.remove(path); - if (chunk != null) { - columnsInOrder.add(chunk); - } else { - throw new IllegalArgumentException(String.format( - "Missing column '%s', cannot copy row group: %s", path, rowGroup)); - } - } - - // complain if some columns would be dropped and that's not okay - if (!dropColumns && !columnsToCopy.isEmpty()) { - throw new IllegalArgumentException(String.format( - "Columns cannot be copied (missing from target schema): %s", - String.join(", ", columnsToCopy.keySet()))); - } + List columnsInOrder = getColumnsInOrder(rowGroup, dropColumns); // copy the data for all chunks long start = -1; @@ -1157,6 +1133,44 @@ public void appendColumnChunk(ColumnDescriptor descriptor, SeekableInputStream f currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize()); } + private List getColumnsInOrder(BlockMetaData rowGroup, boolean dropColumns) { + return getColumnsInOrder(rowGroup, schema, dropColumns); + } + + /** + * @param rowGroup row group containing columns + * @param schema the schema to use for column ordering + * @param dropColumns whether we should drop columns that are not defined in the provided schema + */ + public static List getColumnsInOrder(BlockMetaData rowGroup, + MessageType schema, boolean dropColumns) { + Map columnsToCopy = new HashMap<>(); + for (ColumnChunkMetaData chunk : rowGroup.getColumns()) { + columnsToCopy.put(chunk.getPath().toDotString(), chunk); + } + + List columnsInOrder = new ArrayList<>(); + + for (ColumnDescriptor descriptor : schema.getColumns()) { + String path = ColumnPath.get(descriptor.getPath()).toDotString(); + ColumnChunkMetaData chunk = columnsToCopy.remove(path); + if (chunk != null) { + columnsInOrder.add(chunk); + } else { + throw new IllegalArgumentException(String.format( + "Missing column '%s', cannot copy row group: %s", path, rowGroup)); + } + } + + // complain if some columns would be dropped and that's not okay + if (!dropColumns && !columnsToCopy.isEmpty()) { + throw new IllegalArgumentException(String.format( + "Columns cannot be copied (missing from target schema): %s", + String.join(", ", columnsToCopy.keySet()))); + } + return columnsInOrder; + } + // Buffers for the copy function. private static final ThreadLocal COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[8192]); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java new file mode 100644 index 0000000000..e30a070aae --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java @@ -0,0 +1,652 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop; + +import static java.lang.String.format; +import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; +import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; +import static org.apache.parquet.column.ValuesType.VALUES; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.AbstractMap.SimpleEntry; +import java.util.Map.Entry; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.Preconditions; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.CompressionConverter; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; + +public class RowGroupMerger { + + private final MessageType schema; + private final CodecFactory.BytesInputCompressor compressor; + private final ParquetProperties parquetProperties; + + public RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean useV2ValueWriter) { + this(schema, new Configuration(), compression, useV2ValueWriter); + } + + RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName compression, boolean useV2ValueWriter) { + this(schema, conf, compression, createParquetProperties(useV2ValueWriter)); + } + + RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName compression, ParquetProperties parquetProperties) { + this.schema = schema; + this.parquetProperties = parquetProperties; + this.compressor = new CodecFactory(conf, this.parquetProperties.getPageSizeThreshold()).getCompressor(compression); + } + + /** + * Merges the row groups making sure that new row groups do not exceed the supplied maxRowGroupSize + * + * @param inputFiles input files to merge + * @param maxRowGroupSize the max limit for new blocks + * @param writer writer to write the new blocks to + * @throws IOException if an IO error occurs + */ + public void merge(List inputFiles, final long maxRowGroupSize, + ParquetFileWriter writer) throws IOException { + + SizeEstimator estimator = new SizeEstimator(compressor.getCodecName() != CompressionCodecName.UNCOMPRESSED); + MutableMergedBlock mergedBlock = null; + for (ParquetFileReader reader : inputFiles) { + for (BlockMetaData blockMeta : reader.getRowGroups()) { + PageReadStore group = reader.readNextRowGroup(); + 
Preconditions.checkState(group != null, + "number of groups returned by FileReader does not match metadata"); + + if (mergedBlock != null && mergedBlock.getCompressedSize() + estimator.estimate(blockMeta) > maxRowGroupSize) { + saveBlockTo(mergedBlock, writer); + mergedBlock = null; + } + + if (mergedBlock == null && estimator.estimate(blockMeta) > maxRowGroupSize) { + //save it directly without re encoding it + saveBlockTo(ReadOnlyMergedBlock.of(blockMeta, group, schema, compressor), writer); + continue; + } + + if (mergedBlock == null) { + mergedBlock = new MutableMergedBlock(schema); + } + + long sizeBeforeMerge = mergedBlock.getCompressedSize(); + mergedBlock.merge(blockMeta, group); + //update our estimator + long currentBlockEffect = mergedBlock.getCompressedSize() - sizeBeforeMerge; + estimator.update(currentBlockEffect, blockMeta); + } + } + if (mergedBlock != null) { + saveBlockTo(mergedBlock, writer); + } + mergedBlock = null; + } + + + private void saveBlockTo(MergedBlock block, ParquetFileWriter writer) { + try { + writer.startBlock(block.rowCount()); + + for (MergedColumn col : block.columnsInOrder()) { + writer.startColumn(col.getColumnDesc(), col.getValueCount(), col.getCompression()); + + col.writeDictionaryPageTo(writer); + col.writeDataPagesTo(writer); + + writer.endColumn(); + } + + writer.endBlock(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static ParquetProperties createParquetProperties(boolean useV2Writer) { + ParquetProperties.Builder builder = ParquetProperties.builder(); + if (useV2Writer) { + builder.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0); + } + return builder.build(); + } + + private BytesInput compress(BytesInput bytes) { + return compress(bytes, compressor); + } + + private static BytesInput compress(BytesInput bytes, CompressionCodecFactory.BytesInputCompressor compressor) { + try { + //we copy as some compressors use shared memory + return BytesInput.copy(compressor.compress(bytes)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private BiConsumer createWritingBridge(PrimitiveType.PrimitiveTypeName typeName) { + switch (typeName) { + case FIXED_LEN_BYTE_ARRAY: + case INT96: + case BINARY: + return (src, dest) -> dest.writeBytes(src.readBytes()); + case BOOLEAN: + return (src, dest) -> dest.writeBoolean(src.readBoolean()); + case DOUBLE: + return (src, dest) -> dest.writeDouble(src.readDouble()); + case FLOAT: + return (src, dest) -> dest.writeFloat(src.readFloat()); + case INT32: + return (src, dest) -> dest.writeInteger(src.readInteger()); + case INT64: + return (src, dest) -> dest.writeLong(src.readLong()); + default: + throw new RuntimeException("Unsupported column primitive type: " + typeName.name()); + } + } + + private static void writePageTo(DataPage dataPage, ParquetFileWriter writer) { + dataPage.accept(new DataPage.Visitor() { + @Override + public Void visit(DataPageV1 page) { + try { + if (page.getIndexRowCount().isPresent()) { + writer.writeDataPage(page.getValueCount(), page.getUncompressedSize(), + page.getBytes(), page.getStatistics(), page.getIndexRowCount().get(), page.getRlEncoding(), + page.getDlEncoding(), page.getValueEncoding()); + + } else { + writer.writeDataPage(page.getValueCount(), page.getUncompressedSize(), + page.getBytes(), page.getStatistics(), page.getRlEncoding(), + page.getDlEncoding(), page.getValueEncoding()); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return null; + } + + @Override + public 
Void visit(DataPageV2 page) { + try { + writer.writeDataPageV2(page.getRowCount(), page.getNullCount(), page.getValueCount(), + page.getRepetitionLevels(), page.getDefinitionLevels(), page.getDataEncoding(), + page.getData(), page.getUncompressedSize(), page.getStatistics()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return null; + } + }); + } + + private static DictionaryPage getCompressedDictionary(DictionaryPage dictionary, CompressionCodecFactory.BytesInputCompressor compressor) { + return new DictionaryPage( + compress(dictionary.getBytes(), compressor), + dictionary.getUncompressedSize(), + dictionary.getDictionarySize(), + dictionary.getEncoding()); + } + + private interface MergedBlock { + long rowCount(); + + List columnsInOrder(); + } + + private interface MergedColumn { + long getValueCount(); + + CompressionCodecName getCompression(); + + ColumnDescriptor getColumnDesc(); + + void writeDataPagesTo(ParquetFileWriter writer); + + void writeDictionaryPageTo(ParquetFileWriter writer) throws IOException; + } + + private class MutableMergedBlock implements MergedBlock { + + private final Map columns = new HashMap<>(); + private final MessageType schema; + private long recordCount; + private long compressedSize; + + private MutableMergedBlock(MessageType schema) { + this.schema = schema; + } + + @Override + public long rowCount() { + return recordCount; + } + + private long getCompressedSize() { + return compressedSize; + } + + @Override + public List columnsInOrder() { + return schema.getColumns() + .stream() + .map(columns::get) + .collect(Collectors.toList()); + } + + MutableMergedColumn getOrCreateColumn(ColumnDescriptor column) { + return columns.computeIfAbsent(column, desc -> new MutableMergedColumn(desc, this::addCompressedBytes)); + } + + void addRowCount(long rowCount) { + recordCount += rowCount; + } + + void addCompressedBytes(long size) { + compressedSize += size; + } + + void merge(BlockMetaData blockMeta, PageReadStore group) throws IOException { + for (Entry col : getColumnsInOrder(blockMeta, schema)) { + + MutableMergedColumn column = getOrCreateColumn(col.getKey()); + PageReader columnReader = group.getPageReader(col.getKey()); + + DictionaryPage dictPage = columnReader.readDictionaryPage(); + Dictionary decodedDictionary = null; + if (dictPage != null) { + decodedDictionary = dictPage.getEncoding().initDictionary(column.getColumnDesc(), dictPage); + } + + //read all pages in this column chunk + DataPage data; + while ((data = columnReader.readPage()) != null) { + column.addPage(data, decodedDictionary); + } + } + addRowCount(blockMeta.getRowCount()); + } + } + + private static List> getColumnsInOrder(BlockMetaData blockMeta, + MessageType schema) { + + return ParquetFileWriter.getColumnsInOrder(blockMeta, schema, false).stream() + .map(c -> toEntry(schema, c)) + .collect(Collectors.toList()); + } + + private static SimpleEntry toEntry(MessageType schema, + ColumnChunkMetaData column) { + + return new SimpleEntry<>( + schema.getColumnDescription(column.getPath().toArray()), + column); + } + + private static class ReadOnlyMergedBlock implements MergedBlock { + private final List columns; + private final long recordCount; + + private ReadOnlyMergedBlock(List columns, long recordCount) { + this.columns = Collections.unmodifiableList(columns); + this.recordCount = recordCount; + } + + @Override + public long rowCount() { + return recordCount; + } + + @Override + public List columnsInOrder() { + return columns; + } + + static 
ReadOnlyMergedBlock of(BlockMetaData blockMeta, PageReadStore group, MessageType schema, + CompressionCodecFactory.BytesInputCompressor compressor) { + List columns = new ArrayList<>(); + for (Entry col : getColumnsInOrder(blockMeta, schema)) { + + List pages = new ArrayList<>(); + PageReader columnReader = group.getPageReader(col.getKey()); + + DictionaryPage dictPage = columnReader.readDictionaryPage(); + if (dictPage != null) { + dictPage = getCompressedDictionary(dictPage, compressor); + } + + //read all pages in this column chunk + DataPage data; + while ((data = columnReader.readPage()) != null) { + + data = data.accept(new DataPage.Visitor() { + @Override + public DataPage visit(DataPageV1 pageV1) { + + return new DataPageV1(compress(pageV1.getBytes(), compressor), pageV1.getValueCount(), + pageV1.getUncompressedSize(), pageV1.getFirstRowIndex().orElse(-1L), + pageV1.getIndexRowCount().orElse(-1), pageV1.getStatistics(), pageV1.getRlEncoding(), + pageV1.getDlEncoding(), pageV1.getValueEncoding()); + } + + @Override + public DataPage visit(DataPageV2 pageV2) { + + return DataPageV2.compressed( + pageV2.getRowCount(), pageV2.getNullCount(), pageV2.getValueCount(), + pageV2.getRepetitionLevels(), pageV2.getDefinitionLevels(), pageV2.getDataEncoding(), + compress(pageV2.getData(), compressor), pageV2.getUncompressedSize(), pageV2.getStatistics()); + } + }); + + pages.add(data); + } + + ReadOnlyMergedColumn column = new ReadOnlyMergedColumn(pages, dictPage, col.getKey(), + col.getValue().getValueCount(), compressor.getCodecName()); + + columns.add(column); + } + return new ReadOnlyMergedBlock(columns, blockMeta.getRowCount()); + } + } + + private static class ReadOnlyMergedColumn implements MergedColumn { + private final List pages; + private final DictionaryPage dictionary; + private final ColumnDescriptor columnDesc; + private final long valueCount; + private final CompressionCodecName codecName; + + private ReadOnlyMergedColumn(List pages, DictionaryPage dictionary, ColumnDescriptor columnDesc, + long valueCount, CompressionCodecName codecName) { + this.pages = pages; + this.dictionary = dictionary; + this.columnDesc = columnDesc; + this.valueCount = valueCount; + this.codecName = codecName; + } + + @Override + public long getValueCount() { + return valueCount; + } + + @Override + public CompressionCodecName getCompression() { + return codecName; + } + + @Override + public ColumnDescriptor getColumnDesc() { + return columnDesc; + } + + @Override + public void writeDataPagesTo(ParquetFileWriter writer) { + pages.forEach(page -> writePageTo(page, writer)); + } + + @Override + public void writeDictionaryPageTo(ParquetFileWriter writer) throws IOException { + if (dictionary != null) { + writer.writeDictionaryPage(dictionary); + } + } + } + + private class MutableMergedColumn implements MergedColumn { + + private final List pages = new ArrayList<>(); + private final ColumnDescriptor columnDesc; + private final ValuesWriter newValuesWriter; + private final BiConsumer dataWriter; + private final Consumer compressedSizeAccumulator; + + private long valueCount; + + private MutableMergedColumn(ColumnDescriptor column, Consumer compressedSizeAccumulator) { + this.columnDesc = column; + this.compressedSizeAccumulator = compressedSizeAccumulator; + this.newValuesWriter = parquetProperties.newValuesWriter(columnDesc); + this.dataWriter = createWritingBridge(columnDesc.getPrimitiveType().getPrimitiveTypeName()); + } + + @Override + public long getValueCount() { + return valueCount; + } + + @Override 
+ public CompressionCodecName getCompression() { + return compressor.getCodecName(); + } + + @Override + public ColumnDescriptor getColumnDesc() { + return columnDesc; + } + + @Override + public void writeDataPagesTo(ParquetFileWriter writer) { + pages.forEach(page -> writePageTo(page, writer)); + } + + @Override + public void writeDictionaryPageTo(ParquetFileWriter writer) throws IOException { + DictionaryPage page = newValuesWriter.toDictPageAndClose(); + if (page != null) { + writer.writeDictionaryPage(getCompressedDictionary(page, compressor)); + newValuesWriter.resetDictionary(); + } + } + + void addPage(DataPage data, Dictionary pageDictionary) { + DataPage recodePage = recodePage(data, pageDictionary); + compressedSizeAccumulator.accept((long) recodePage.getCompressedSize()); + valueCount += recodePage.getValueCount(); + pages.add(recodePage); + } + + DataPage recodePage(DataPage data, Dictionary pageDictionary) { + return data.accept(new DataPage.Visitor() { + + @Override + public DataPage visit(DataPageV1 pageV1) { + + try { + verifyDataEncoding(pageV1.getValueEncoding(), pageDictionary); + + final byte[] originalBytes = pageV1.getBytes().toByteArray(); + + if (pageV1.getValueEncoding().usesDictionary()) { + + ValuesReader rlReader = pageV1.getRlEncoding().getValuesReader(columnDesc, REPETITION_LEVEL); + ValuesReader dlReader = pageV1.getDlEncoding().getValuesReader(columnDesc, DEFINITION_LEVEL); + ValuesReader dataReader = pageV1.getValueEncoding() + .getDictionaryBasedValuesReader(columnDesc, VALUES, pageDictionary); + + ByteBufferInputStream input = ByteBufferInputStream.wrap(ByteBuffer.wrap(originalBytes)); + + int startPos = Math.toIntExact(input.position()); + rlReader.initFromPage(pageV1.getValueCount(), input); + dlReader.initFromPage(pageV1.getValueCount(), input); + int dlEndPos = Math.toIntExact(input.position()); + + dataReader.initFromPage(pageV1.getValueCount(), input); + + int rowCount = 0; + for (int i = 0; i < pageV1.getValueCount(); i++) { + if (dlReader.readInteger() == columnDesc.getMaxDefinitionLevel()) + dataWriter.accept(dataReader, newValuesWriter); + + rowCount = rlReader.readInteger() == 0 ? 
rowCount + 1 : rowCount; + } + + BytesInput recodedBytes = BytesInput.concat( + BytesInput.from(originalBytes, startPos, dlEndPos - startPos), newValuesWriter.getBytes() + ); + + Encoding valuesEncoding = newValuesWriter.getEncoding(); + int uncompressedSize = Math.toIntExact(recodedBytes.size()); + BytesInput compressedBytes = compress(recodedBytes); + + newValuesWriter.reset(); + + long firstRowIndex = pageV1.getFirstRowIndex().orElse(-1L); + + return new DataPageV1(compressedBytes, pageV1.getValueCount(), uncompressedSize, + firstRowIndex, rowCount, pageV1.getStatistics(), pageV1.getRlEncoding(), + pageV1.getDlEncoding(), valuesEncoding + ); + + } else { + //not dictionary encoded + int rowCount = pageV1.getIndexRowCount() + .orElseGet(() -> calculateRowCount(pageV1, ByteBufferInputStream.wrap(ByteBuffer.wrap(originalBytes)))); + + return new DataPageV1(compress(BytesInput.from(originalBytes)), pageV1.getValueCount(), + pageV1.getUncompressedSize(), pageV1.getFirstRowIndex().orElse(-1L), + rowCount, pageV1.getStatistics(), pageV1.getRlEncoding(), + pageV1.getDlEncoding(), pageV1.getValueEncoding() + ); + } + + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public DataPage visit(DataPageV2 page) { + verifyDataEncoding(page.getDataEncoding(), pageDictionary); + + if (page.getDataEncoding().usesDictionary()) { + + ValuesReader reader = page.getDataEncoding() + .getDictionaryBasedValuesReader(columnDesc, VALUES, pageDictionary); + + try { + reader.initFromPage(page.getValueCount(), page.getData().toInputStream()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + for (int i = 0; i < page.getValueCount() - page.getNullCount(); i++) { + dataWriter.accept(reader, newValuesWriter); + } + + BytesInput dataBytes = newValuesWriter.getBytes(); + Encoding dataEncoding = newValuesWriter.getEncoding(); + int uncompressedSize = Math.toIntExact(dataBytes.size()); + BytesInput compressedBytes = compress(dataBytes); + + newValuesWriter.reset(); + + return DataPageV2.compressed( + page.getRowCount(), page.getNullCount(), page.getValueCount(), + page.getRepetitionLevels(), page.getDefinitionLevels(), dataEncoding, + compressedBytes, uncompressedSize, page.getStatistics() + ); + + } else { //not dictionary encoded + + return DataPageV2.compressed( + page.getRowCount(), page.getNullCount(), page.getValueCount(), + page.getRepetitionLevels(), page.getDefinitionLevels(), page.getDataEncoding(), + compress(page.getData()), page.getUncompressedSize(), page.getStatistics()); + } + } + }); + } + + private void verifyDataEncoding(Encoding encoding, Dictionary pageDictionary) { + if (encoding.usesDictionary() && pageDictionary == null) { + throw new ParquetDecodingException( + format("could not read page in column %s as the dictionary was missing for encoding %s", columnDesc, encoding)); + } + } + + //expensive should be used as a last resort + private int calculateRowCount(DataPageV1 pageV1, ByteBufferInputStream in) { + try { + ValuesReader rlReader = pageV1.getRlEncoding().getValuesReader(columnDesc, REPETITION_LEVEL); + rlReader.initFromPage(pageV1.getValueCount(), in); + int rowCount = 0; + for (int i = 0; i < pageV1.getValueCount(); i++) { + rowCount = rlReader.readInteger() == 0 ? 
rowCount + 1 : rowCount; + } + return rowCount; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + private static class SizeEstimator { + //is destination block compressed + private final boolean targetCompressed; + private double factor; + + private SizeEstimator(boolean targetCompressed) { + this.targetCompressed = targetCompressed; + } + + private long estimate(BlockMetaData blockMeta) { + if (factor <= 0) { + //very naive estimator + return targetCompressed ? blockMeta.getCompressedSize() : blockMeta.getTotalByteSize(); + } + return (long) (factor * blockMeta.getTotalByteSize()); + } + + private void update(long actualBytes, BlockMetaData blockMeta) { + // basically remembers last ratio + factor = (double) actualBytes / blockMeta.getTotalByteSize(); + } + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index 043eb24235..d436efb504 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -27,6 +27,7 @@ import org.apache.parquet.column.ColumnReader; import org.apache.parquet.column.ColumnWriteStore; import org.apache.parquet.column.ColumnWriter; +import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.impl.ColumnReadStoreImpl; import org.apache.parquet.column.page.DictionaryPage; @@ -45,7 +46,9 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.CodecFactory; import org.apache.parquet.hadoop.ColumnChunkPageWriteStore; +import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.RowGroupMerger; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; @@ -57,6 +60,7 @@ import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.internal.column.columnindex.ColumnIndex; import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.InputFile; import org.apache.parquet.io.ParquetEncodingException; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; @@ -71,6 +75,7 @@ import java.io.Closeable; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -117,6 +122,10 @@ public class ParquetRewriter implements Closeable { private String originalCreatedBy = ""; // Unique created_by information from all input files private Set allOriginalCreatedBys = new HashSet<>(); + // Indicates if rowgroups from different needs to be merged. 
+ private boolean mergeRowGroups; + // Max size of the merged rowgroup + private long maxRowGroupSize; public ParquetRewriter(RewriteOptions options) throws IOException { Configuration conf = options.getConf(); @@ -130,6 +139,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException { newCodecName = options.getNewCodecName(); pruneColumns = options.getPruneColumns(); + mergeRowGroups = options.isMergeRowGroups(); + maxRowGroupSize = options.getMaxRowGroupSize(); // Prune columns if specified if (pruneColumns != null && !pruneColumns.isEmpty()) { @@ -246,6 +257,11 @@ public void close() throws IOException { } public void processBlocks() throws IOException { + if (mergeRowGroups) { + mergeRowGroups(); + return; + } + while (reader != null) { processBlocksFromReader(); initNextReader(); @@ -752,6 +768,27 @@ public GroupConverter asGroupConverter() { } } + private void mergeRowGroups() throws IOException { + if (null == reader) { + return; + } + + boolean v2EncodingHint = meta.getBlocks().stream() + .flatMap(b -> b.getColumns().stream()) + .anyMatch(chunk -> { + EncodingStats stats = chunk.getEncodingStats(); + return stats != null && stats.usesV2Pages(); + }); + + List readers = new ArrayList<>(); + do { + readers.add(reader); + initNextReader(); + } + while(reader != null); + new RowGroupMerger(schema, newCodecName, v2EncodingHint).merge(readers, maxRowGroupSize, writer); + } + private static class ColumnChunkEncryptorRunTime { private final InternalColumnEncryptionSetup colEncrSetup; private final BlockCipher.Encryptor dataEncryptor; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java index cc1280921b..c1364f5af4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.Preconditions; import org.apache.parquet.crypto.FileEncryptionProperties; +import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import java.util.Arrays; @@ -41,6 +42,8 @@ public class RewriteOptions { final Map maskColumns; final List encryptColumns; final FileEncryptionProperties fileEncryptionProperties; + final boolean mergeRowGroups; + final long maxRowGroupSize; private RewriteOptions(Configuration conf, List inputFiles, @@ -49,7 +52,9 @@ private RewriteOptions(Configuration conf, CompressionCodecName newCodecName, Map maskColumns, List encryptColumns, - FileEncryptionProperties fileEncryptionProperties) { + FileEncryptionProperties fileEncryptionProperties, + boolean mergeRowGroups, + long maxRowGroupSize) { this.conf = conf; this.inputFiles = inputFiles; this.outputFile = outputFile; @@ -58,6 +63,8 @@ private RewriteOptions(Configuration conf, this.maskColumns = maskColumns; this.encryptColumns = encryptColumns; this.fileEncryptionProperties = fileEncryptionProperties; + this.mergeRowGroups = mergeRowGroups; + this.maxRowGroupSize = maxRowGroupSize; } public Configuration getConf() { @@ -92,6 +99,14 @@ public FileEncryptionProperties getFileEncryptionProperties() { return fileEncryptionProperties; } + public boolean isMergeRowGroups() { + return mergeRowGroups; + } + + public long getMaxRowGroupSize() { + return maxRowGroupSize; + } + // Builder to create a RewriterOptions. 
public static class Builder { private Configuration conf; @@ -102,6 +117,8 @@ public static class Builder { private Map maskColumns; private List encryptColumns; private FileEncryptionProperties fileEncryptionProperties; + private boolean mergeRowGroups; + private long maxRowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE; /** * Create a builder to create a RewriterOptions. @@ -163,6 +180,27 @@ public Builder transform(CompressionCodecName newCodecName) { return this; } + /** + * Enable merging of rowgroups + * + * @return self + */ + public Builder enableRowGroupMerge() { + this.mergeRowGroups = true; + return this; + } + + /** + * Sets the max size of the rowgroup + * + * @param maxRowGroupSize Max row group size + * @return self + */ + public Builder maxRowGroupSize(long maxRowGroupSize) { + this.maxRowGroupSize = maxRowGroupSize; + return this; + } + /** * Set the columns to mask. *

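A minimal sketch of how these two builder calls might be combined by a caller, as the API stands in this patch (conf, inputPaths and outputPath are assumed to already exist; the 128 MB figure is only an example):

    // Hypothetical caller code, not part of this patch.
    RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, outputPath)
        .transform(CompressionCodecName.SNAPPY) // the CLI also requires a target codec when merging is enabled
        .enableRowGroupMerge()                  // turn on row group merging
        .maxRowGroupSize(128L * 1024 * 1024)    // cap merged row groups at roughly 128 MB
        .build();
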
@@ -255,7 +293,9 @@ public RewriteOptions build() { newCodecName, maskColumns, encryptColumns, - fileEncryptionProperties); + fileEncryptionProperties, + mergeRowGroups, + maxRowGroupSize); } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestRowGroupMerging.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestRowGroupMerging.java new file mode 100644 index 0000000000..479fd27c0c --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestRowGroupMerging.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.Preconditions; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.CompressionConverter; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.function.ToLongFunction; +import java.util.stream.Collectors; + +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestRowGroupMerging { + public static final int FILE_SIZE = 10000; + public static final Configuration CONF = new Configuration(); + public static final MessageType FILE_SCHEMA = Types.buildMessage() + .required(INT32).named("id") + .required(BINARY).as(LogicalTypeAnnotation.stringType()).named("string1") + .required(BINARY).as(LogicalTypeAnnotation.stringType()).named("string2") + .named("AppendTest"); + public static final SimpleGroupFactory GROUP_FACTORY = + new SimpleGroupFactory(FILE_SCHEMA); + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + public Path file1; + public List file1content = new ArrayList<>(); + public Path file2; + public List file2content = new 
ArrayList<>(); + public Path file3; + public List file3content = new ArrayList<>(); + + ParquetFileReader reader1, reader2, reader3; + + @Before + public void createSourceData() throws IOException { + this.file1 = newTemp(); + this.file2 = newTemp(); + this.file3 = newTemp(); + + ParquetWriter writer1 = newWriter(file1, true); + ParquetWriter writer2 = newWriter(file2, false); + ParquetWriter writer3 = newWriter(file3, false); + + for (int i = 0; i < FILE_SIZE; i += 1) { + Group group1 = getGroup(writer1, i); + file1content.add(group1); + + Group group2 = getGroup(writer2, FILE_SIZE + i); + file2content.add(group2); + + Group group3 = getGroup(writer3, 2 * FILE_SIZE + i); + file3content.add(group3); + } + + writer1.close(); + writer2.close(); + writer3.close(); + + this.reader1 = newReader(file1); + this.reader2 = newReader(file2); + this.reader3 = newReader(file3); + } + + @Test + public void testBasicBehavior() throws IOException { + Path combinedFile = newTemp(); + ParquetFileWriter writer = createFileWriter(combinedFile); + + merge(writer, Integer.MAX_VALUE, true, SNAPPY, Collections.emptyMap(), reader1, reader2, reader3); + + + LinkedList expected = new LinkedList<>(); + expected.addAll(file1content); + expected.addAll(file2content); + expected.addAll(file3content); + + ParquetReader reader = ParquetReader + .builder(new GroupReadSupport(), combinedFile) + .build(); + + Group next; + while ((next = reader.read()) != null) { + Group expectedNext = expected.removeFirst(); + // check each value; equals is not supported for simple records + assertEquals("Each id should match", + expectedNext.getInteger("id", 0), next.getInteger("id", 0)); + assertEquals("Each string should match", + expectedNext.getString("string1", 0), next.getString("string1", 0)); + assertEquals("Each string should match", + expectedNext.getString("string2", 0), next.getString("string2", 0)); + } + assertEquals("All records should be present", 0, expected.size()); + + } + + @Test + public void testFileStructure() throws IOException { + Path combinedFile = newTemp(); + ParquetFileWriter writer = createFileWriter(combinedFile); + + merge(writer, Integer.MAX_VALUE, false, GZIP, Collections.singletonMap("test-key", "value1"), + reader1, reader2, reader3); + + ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(combinedFile, new Configuration())); + assertEquals("Should be combined into 1 row group", reader.getRowGroups().size(), 1); + assertEquals("Schema should match the original", reader.getFileMetaData().getSchema(), FILE_SCHEMA); + assertEquals("Row count should be sum of the original counts", + reader.getRowGroups().get(0).getRowCount(), FILE_SIZE * 3); + + assertTrue("all columns are expected to use Gzip compression", + reader.getRowGroups().stream() + .flatMap(g -> g.getColumns().stream()) + .allMatch(c -> c.getCodec().equals(GZIP))); + + assertTrue("Column string1(at pos 1) is expected to be dictionary encoded", + reader.getRowGroups().stream().map(g -> g.getColumns().get(1)) + .allMatch(c -> c.getEncodingStats().hasDictionaryPages() && c.getEncodingStats().hasDictionaryEncodedPages())); + } + + @Test + public void testNewUncompressedBlocksSizing() throws IOException { + Path combinedFile = newTemp(); + ParquetFileWriter writer = createFileWriter(combinedFile); + + Configuration conf = new Configuration(); + + long file1Rows = sumTotalFromGroups(conf, file1, BlockMetaData::getRowCount); + long file2Rows = sumTotalFromGroups(conf, file2, BlockMetaData::getRowCount); + long file3Rows = 
sumTotalFromGroups(conf, file3, BlockMetaData::getRowCount); + + long file1Size = sumTotalFromGroups(conf, file1, BlockMetaData::getTotalByteSize); + long file2Size = sumTotalFromGroups(conf, file2, BlockMetaData::getTotalByteSize); + long maxRowGroupSize = file1Size + file2Size; + + merge(writer, maxRowGroupSize, false, UNCOMPRESSED, Collections.emptyMap(), reader1, reader2, reader3); + + ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(combinedFile, conf)); + assertEquals("Should be combined into 2 row groups ", 2, reader.getRowGroups().size()); + assertEquals("first row-group count should equal the sum of the first 2 groups combined", file1Rows + file2Rows, + reader.getRowGroups().get(0).getRowCount()); + assertEquals("the second row-group count should be equal to the to total of the third file", file3Rows, + reader.getRowGroups().get(1).getRowCount()); + } + + @Test + public void testNewCompressedBlocksSizing() throws IOException { + Path combinedFile = newTemp(); + ParquetFileWriter writer = createFileWriter(combinedFile); + + Configuration conf = new Configuration(); + + long file1Rows = sumTotalFromGroups(conf, file1, BlockMetaData::getRowCount); + long file2Rows = sumTotalFromGroups(conf, file2, BlockMetaData::getRowCount); + long file3Rows = sumTotalFromGroups(conf, file3, BlockMetaData::getRowCount); + + long maxRowGroupSize = sumTotalFromGroups(conf, file1, BlockMetaData::getCompressedSize) + + sumTotalFromGroups(conf, file2, BlockMetaData::getCompressedSize); + + merge(writer, maxRowGroupSize, true, SNAPPY, Collections.emptyMap(), reader1, reader2, reader3); + + ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(combinedFile, conf)); + assertEquals("Should be combined into 2 row groups ", 2, reader.getRowGroups().size()); + assertEquals("first row-group count should equal the sum of the first 2 groups combined", + file1Rows + file2Rows, reader.getRowGroups().get(0).getRowCount()); + + assertEquals("the second row-group count should be equal to the to total of the third file", + file3Rows, reader.getRowGroups().get(1).getRowCount()); + } + + private void merge(ParquetFileWriter writer, long maxRowGroupSize, boolean useV2, + CompressionCodecName codec, Map extraMeta, ParquetFileReader... 
files) throws IOException { + writer.start(); + List readers = new ArrayList<>(); + readers.add(reader1); + readers.add(reader2); + readers.add(reader3); + new RowGroupMerger(FILE_SCHEMA, codec, useV2).merge(readers, maxRowGroupSize, writer); + + writer.end(extraMeta); + } + + private long sumTotalFromGroups(Configuration conf, Path file1, ToLongFunction getRowCount) throws IOException { + return ParquetFileReader.open(HadoopInputFile.fromPath(file1, conf)).getRowGroups().stream() + .mapToLong(getRowCount) + .sum(); + } + + private ParquetFileWriter createFileWriter(Path combinedFile) throws IOException { + return new ParquetFileWriter( + CONF, FILE_SCHEMA, combinedFile); + } + + private ParquetFileReader newReader(Path path) throws IOException { + Configuration conf = new Configuration(); + return new CompressionConverter.TransParquetFileReader( + HadoopInputFile.fromPath(path, conf), HadoopReadOptions.builder(conf).build()); + } + + private Path newTemp() throws IOException { + File file = temp.newFile(); + Preconditions.checkArgument(file.delete(), "Could not remove temp file"); + return new Path(file.toString()); + } + + private Group getGroup(ParquetWriter writer1, int i) throws IOException { + Group group1 = GROUP_FACTORY.newGroup(); + group1.add("id", i); + group1.add("string1", "string1-125ahda-2090-410a-b249-59eb61ca17c6-" + i % 100); //force dictionary + group1.add("string2", "string2- 125ahda-2090-410a-b249-59eb61ca17c6-" + i); + writer1.write(group1); + return group1; + } + + private ParquetWriter newWriter(Path file1, boolean v2) throws IOException { + ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(file1) + .withType(FILE_SCHEMA) + .withCompressionCodec(SNAPPY); + if (v2) + builder.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0); + return builder.build(); + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java index a08633d15a..5f5d096dd4 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java @@ -492,6 +492,48 @@ public void testMergeTwoFilesOnly() throws Exception { validateRowGroupRowCount(); } + @Test + public void testMergeRowGroupFromTwoFiles() throws Exception { + testMultipleInputFilesSetup(); + + // Only merge two files but do not change anything. 
+ List inputPaths = new ArrayList<>(); + for (EncryptionTestFile inputFile : inputFiles) { + inputPaths.add(new Path(inputFile.getFileName())); + } + Path outputPath = new Path(outputFile); + RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath); + RewriteOptions options = builder.enableRowGroupMerge().transform(CompressionCodecName.SNAPPY) + .maxRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE).build(); + + rewriter = new ParquetRewriter(options); + rewriter.processBlocks(); + rewriter.close(); + + // Verify the schema are not changed + ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER); + MessageType schema = pmd.getFileMetaData().getSchema(); + MessageType expectSchema = createSchema(); + assertEquals(expectSchema, schema); + + ParquetFileReader outReader = new ParquetFileReader( + HadoopInputFile.fromPath(new Path(outputFile), conf), HadoopReadOptions.builder(conf).build()); + + // Verify that only one RowGroup is created + assertEquals(1, outReader.getRowGroups().size()); + + // Verify codec has been translated + verifyCodec(outputFile, new HashSet() {{ + add(CompressionCodecName.SNAPPY); + }}, null); + + // Verify the merged data are not changed + validateColumnData(Collections.emptySet(), Collections.emptySet(), null); + + // Verify original.created.by is preserved + validateCreatedBy(); + } + @Test(expected = InvalidSchemaException.class) public void testMergeTwoFilesWithDifferentSchema() throws Exception { MessageType schema1 = new MessageType("schema", From c3d5f12e295ad46a65032c337afb41eceb346d00 Mon Sep 17 00:00:00 2001 From: MaheshGPai Date: Wed, 19 Jul 2023 17:48:41 +0530 Subject: [PATCH 2/3] Review comments --- .../parquet/cli/commands/RewriteCommand.java | 7 +++---- .../parquet/hadoop/ParquetFileWriter.java | 1 - .../hadoop/rewrite/ParquetRewriter.java | 3 --- .../hadoop/rewrite/RewriteOptions.java | 15 +++------------ .../hadoop/{ => rewrite}/RowGroupMerger.java | 19 ++++++++++++------- .../hadoop/rewrite/ParquetRewriterTest.java | 4 ++-- .../{ => rewrite}/TestRowGroupMerging.java | 17 +++++++++++++---- 7 files changed, 33 insertions(+), 33 deletions(-) rename parquet-hadoop/src/main/java/org/apache/parquet/hadoop/{ => rewrite}/RowGroupMerger.java (97%) rename parquet-hadoop/src/test/java/org/apache/parquet/hadoop/{ => rewrite}/TestRowGroupMerging.java (95%) diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java index e84235dc56..0c4b4bf08d 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java @@ -84,13 +84,13 @@ public class RewriteCommand extends BaseCommand { @Parameter( names = {"-m", "--merge-rowgroups"}, - description = "", + description = "", required = false) boolean mergeRowGroups; @Parameter( names = {"-s", "--max-rowgroup-size"}, - description = "", + description = "", required = false) long maxRowGroupSize; @@ -135,8 +135,7 @@ private RewriteOptions buildOptionsOrFail() throws IOException { "If merge rowgroup is enabled, max rowgroups size should be specified"); Preconditions.checkArgument(null != codec, "If merge rowgroup is enabled, new compression codec needs to be specified"); - builder.enableRowGroupMerge(); - builder.maxRowGroupSize(maxRowGroupSize); + builder.mergeRowGroups(maxRowGroupSize); } RewriteOptions options = 
builder.build(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 6b7c9219cd..8e087d2a1a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -74,7 +74,6 @@ import org.apache.parquet.hadoop.metadata.GlobalMetaData; import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy; import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.hadoop.util.CompressionConverter; import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.hadoop.util.HadoopStreams; import org.apache.parquet.internal.column.columnindex.ColumnIndex; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index d436efb504..6ab8db686e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -48,7 +48,6 @@ import org.apache.parquet.hadoop.ColumnChunkPageWriteStore; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetFileWriter; -import org.apache.parquet.hadoop.RowGroupMerger; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; @@ -60,7 +59,6 @@ import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.internal.column.columnindex.ColumnIndex; import org.apache.parquet.internal.column.columnindex.OffsetIndex; -import org.apache.parquet.io.InputFile; import org.apache.parquet.io.ParquetEncodingException; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; @@ -75,7 +73,6 @@ import java.io.Closeable; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java index c1364f5af4..89d154b254 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java @@ -181,23 +181,14 @@ public Builder transform(CompressionCodecName newCodecName) { } /** - * Enable merging of rowgroups - * - * @return self - */ - public Builder enableRowGroupMerge() { - this.mergeRowGroups = true; - return this; - } - - /** - * Sets the max size of the rowgroup + * Sets the max size of the rowgroup and enables rowgroup merging * * @param maxRowGroupSize Max row group size * @return self */ - public Builder maxRowGroupSize(long maxRowGroupSize) { + public Builder mergeRowGroups(long maxRowGroupSize) { this.maxRowGroupSize = maxRowGroupSize; + this.mergeRowGroups = true; return this; } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java similarity index 97% rename from parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java rename to 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java index e30a070aae..11f882695f 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.parquet.hadoop; +package org.apache.parquet.hadoop.rewrite; import static java.lang.String.format; import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; @@ -26,7 +26,11 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.List; +import java.util.Collections; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.AbstractMap.SimpleEntry; import java.util.Map.Entry; import java.util.function.BiConsumer; @@ -50,19 +54,20 @@ import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.CompressionCodecName; -import org.apache.parquet.hadoop.util.CompressionConverter; -import org.apache.parquet.io.InputFile; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; -public class RowGroupMerger { +class RowGroupMerger { private final MessageType schema; - private final CodecFactory.BytesInputCompressor compressor; + private final CompressionCodecFactory.BytesInputCompressor compressor; private final ParquetProperties parquetProperties; public RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean useV2ValueWriter) { @@ -88,7 +93,7 @@ public RowGroupMerger(MessageType schema, CompressionCodecName compression, bool * @throws IOException if an IO error occurs */ public void merge(List inputFiles, final long maxRowGroupSize, - ParquetFileWriter writer) throws IOException { + ParquetFileWriter writer) throws IOException { SizeEstimator estimator = new SizeEstimator(compressor.getCodecName() != CompressionCodecName.UNCOMPRESSED); MutableMergedBlock mergedBlock = null; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java index 5f5d096dd4..a75c29d382 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java @@ -503,8 +503,8 @@ public void testMergeRowGroupFromTwoFiles() throws Exception { } Path outputPath = new Path(outputFile); RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath); - RewriteOptions options = builder.enableRowGroupMerge().transform(CompressionCodecName.SNAPPY) - .maxRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE).build(); + RewriteOptions options = builder.mergeRowGroups(ParquetWriter.DEFAULT_BLOCK_SIZE) + .transform(CompressionCodecName.SNAPPY).build(); rewriter = new ParquetRewriter(options); 
rewriter.processBlocks(); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestRowGroupMerging.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/TestRowGroupMerging.java similarity index 95% rename from parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestRowGroupMerging.java rename to parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/TestRowGroupMerging.java index 479fd27c0c..69a27b55f0 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestRowGroupMerging.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/TestRowGroupMerging.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.parquet.hadoop; +package org.apache.parquet.hadoop.rewrite; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -25,6 +25,10 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -41,11 +45,16 @@ import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.function.ToLongFunction; -import java.util.stream.Collectors; -import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.junit.Assert.assertEquals; From 1f585e55a14fca684c6e139e7417cb7bbfca494e Mon Sep 17 00:00:00 2001 From: mpairamanat Date: Sat, 23 Sep 2023 14:11:40 +0530 Subject: [PATCH 3/3] Merge statistics --- .../hadoop/rewrite/RowGroupMerger.java | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java index 11f882695f..3dd1cdb18f 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java @@ -51,6 +51,7 @@ import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.compression.CompressionCodecFactory; @@ -298,6 +299,7 @@ void merge(BlockMetaData blockMeta, PageReadStore group) throws IOException { for (Entry col : getColumnsInOrder(blockMeta, schema)) { MutableMergedColumn column = 
getOrCreateColumn(col.getKey()); + column.statistics.mergeStatistics(col.getValue().getStatistics()); PageReader columnReader = group.getPageReader(col.getKey()); DictionaryPage dictPage = columnReader.readDictionaryPage(); @@ -392,7 +394,7 @@ public DataPage visit(DataPageV2 pageV2) { } ReadOnlyMergedColumn column = new ReadOnlyMergedColumn(pages, dictPage, col.getKey(), - col.getValue().getValueCount(), compressor.getCodecName()); + col.getValue().getValueCount(), compressor.getCodecName(), col.getValue().getStatistics()); columns.add(column); } @@ -406,14 +408,16 @@ private static class ReadOnlyMergedColumn implements MergedColumn { private final ColumnDescriptor columnDesc; private final long valueCount; private final CompressionCodecName codecName; + private final Statistics statistics; private ReadOnlyMergedColumn(List pages, DictionaryPage dictionary, ColumnDescriptor columnDesc, - long valueCount, CompressionCodecName codecName) { + long valueCount, CompressionCodecName codecName, Statistics statistics) { this.pages = pages; this.dictionary = dictionary; this.columnDesc = columnDesc; this.valueCount = valueCount; this.codecName = codecName; + this.statistics = statistics; } @Override @@ -433,6 +437,17 @@ public ColumnDescriptor getColumnDesc() { @Override public void writeDataPagesTo(ParquetFileWriter writer) { + if(pages.isEmpty()) { + return; + } + Statistics destinationStatistics = (pages.get(0) instanceof DataPageV1) ? + ((DataPageV1)pages.get(0)).getStatistics() : + ((DataPageV2)pages.get(0)).getStatistics(); + // Has statistics been read into the pages? + if(destinationStatistics.getNumNulls() < 0) { + destinationStatistics.incrementNumNulls(); // Set this to 0 + destinationStatistics.mergeStatistics(statistics); + } pages.forEach(page -> writePageTo(page, writer)); } @@ -451,6 +466,7 @@ private class MutableMergedColumn implements MergedColumn { private final ValuesWriter newValuesWriter; private final BiConsumer dataWriter; private final Consumer compressedSizeAccumulator; + private final Statistics statistics; private long valueCount; @@ -459,6 +475,7 @@ private MutableMergedColumn(ColumnDescriptor column, Consumer compressedSi this.compressedSizeAccumulator = compressedSizeAccumulator; this.newValuesWriter = parquetProperties.newValuesWriter(columnDesc); this.dataWriter = createWritingBridge(columnDesc.getPrimitiveType().getPrimitiveTypeName()); + this.statistics = Statistics.createStats(column.getPrimitiveType()); } @Override @@ -478,6 +495,17 @@ public ColumnDescriptor getColumnDesc() { @Override public void writeDataPagesTo(ParquetFileWriter writer) { + if(pages.isEmpty()) { + return; + } + Statistics destinationStatistics = (pages.get(0) instanceof DataPageV1) ? + ((DataPageV1)pages.get(0)).getStatistics() : + ((DataPageV2)pages.get(0)).getStatistics(); + // Has statistics been read into the pages? + if(destinationStatistics.getNumNulls() < 0) { + destinationStatistics.incrementNumNulls(); // Set this to 0 + destinationStatistics.mergeStatistics(statistics); + } pages.forEach(page -> writePageTo(page, writer)); }