diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java index da0b3eff4e..0c4b4bf08d 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java @@ -82,6 +82,18 @@ public class RewriteCommand extends BaseCommand { required = false) String codec; + @Parameter( + names = {"-m", "--merge-rowgroups"}, + description = "Merge the row groups of the input files into larger row groups", + required = false) + boolean mergeRowGroups; + + @Parameter( + names = {"-s", "--max-rowgroup-size"}, + description = "Maximum size in bytes of the merged row groups", + required = false) + long maxRowGroupSize; + public RewriteCommand(Logger console) { super(console); } @@ -118,6 +130,14 @@ private RewriteOptions buildOptionsOrFail() throws IOException { builder.transform(codecName); } + if (mergeRowGroups) { + Preconditions.checkArgument(maxRowGroupSize > 0, + "If row group merging is enabled, a max row group size must be specified"); + Preconditions.checkArgument(null != codec, + "If row group merging is enabled, a new compression codec must be specified"); + builder.mergeRowGroups(maxRowGroupSize); + } + RewriteOptions options = builder.build(); // If RewriteOptions are successfully built and the overwrite option is specified, remove the output path diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 80b9907a2d..8e087d2a1a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -1033,32 +1033,7 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, boolean dropColumns) throws IOException { startBlock(rowGroup.getRowCount()); - Map<String, ColumnChunkMetaData> columnsToCopy = - new HashMap<String, ColumnChunkMetaData>(); - for (ColumnChunkMetaData chunk : rowGroup.getColumns()) { - columnsToCopy.put(chunk.getPath().toDotString(), chunk); - } - - List<ColumnChunkMetaData> columnsInOrder = - new ArrayList<ColumnChunkMetaData>(); - - for (ColumnDescriptor descriptor : schema.getColumns()) { - String path = ColumnPath.get(descriptor.getPath()).toDotString(); - ColumnChunkMetaData chunk = columnsToCopy.remove(path); - if (chunk != null) { - columnsInOrder.add(chunk); - } else { - throw new IllegalArgumentException(String.format( - "Missing column '%s', cannot copy row group: %s", path, rowGroup)); - } - } - - // complain if some columns would be dropped and that's not okay - if (!dropColumns && !columnsToCopy.isEmpty()) { - throw new IllegalArgumentException(String.format( - "Columns cannot be copied (missing from target schema): %s", - String.join(", ", columnsToCopy.keySet()))); - } + List<ColumnChunkMetaData> columnsInOrder = getColumnsInOrder(rowGroup, dropColumns); // copy the data for all chunks long start = -1; @@ -1157,6 +1132,44 @@ public void appendColumnChunk(ColumnDescriptor descriptor, SeekableInputStream f currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize()); } + private List<ColumnChunkMetaData> getColumnsInOrder(BlockMetaData rowGroup, boolean dropColumns) { + return getColumnsInOrder(rowGroup, schema, dropColumns); + } + + /** + * @param rowGroup row group containing columns + * @param schema the schema to use for column ordering + * @param dropColumns whether we should drop columns that are not defined in the provided schema + */ + public static List<ColumnChunkMetaData> getColumnsInOrder(BlockMetaData rowGroup, + MessageType
schema, boolean dropColumns) { + Map<String, ColumnChunkMetaData> columnsToCopy = new HashMap<>(); + for (ColumnChunkMetaData chunk : rowGroup.getColumns()) { + columnsToCopy.put(chunk.getPath().toDotString(), chunk); + } + + List<ColumnChunkMetaData> columnsInOrder = new ArrayList<>(); + + for (ColumnDescriptor descriptor : schema.getColumns()) { + String path = ColumnPath.get(descriptor.getPath()).toDotString(); + ColumnChunkMetaData chunk = columnsToCopy.remove(path); + if (chunk != null) { + columnsInOrder.add(chunk); + } else { + throw new IllegalArgumentException(String.format( + "Missing column '%s', cannot copy row group: %s", path, rowGroup)); + } + } + + // complain if some columns would be dropped and that's not okay + if (!dropColumns && !columnsToCopy.isEmpty()) { + throw new IllegalArgumentException(String.format( + "Columns cannot be copied (missing from target schema): %s", + String.join(", ", columnsToCopy.keySet()))); + } + return columnsInOrder; + } + // Buffers for the copy function. private static final ThreadLocal<byte[]> COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[8192]); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index 043eb24235..6ab8db686e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -27,6 +27,7 @@ import org.apache.parquet.column.ColumnReader; import org.apache.parquet.column.ColumnWriteStore; import org.apache.parquet.column.ColumnWriter; +import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.impl.ColumnReadStoreImpl; import org.apache.parquet.column.page.DictionaryPage; @@ -45,6 +46,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.CodecFactory; import org.apache.parquet.hadoop.ColumnChunkPageWriteStore; +import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -117,6 +119,10 @@ public class ParquetRewriter implements Closeable { private String originalCreatedBy = ""; // Unique created_by information from all input files private Set<String> allOriginalCreatedBys = new HashSet<>(); + // Indicates whether row groups from different input files need to be merged.
+ private boolean mergeRowGroups; + // Max size of the merged row groups + private long maxRowGroupSize; public ParquetRewriter(RewriteOptions options) throws IOException { Configuration conf = options.getConf(); @@ -130,6 +136,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException { newCodecName = options.getNewCodecName(); pruneColumns = options.getPruneColumns(); + mergeRowGroups = options.isMergeRowGroups(); + maxRowGroupSize = options.getMaxRowGroupSize(); // Prune columns if specified if (pruneColumns != null && !pruneColumns.isEmpty()) { @@ -246,6 +254,11 @@ public void close() throws IOException { } public void processBlocks() throws IOException { + if (mergeRowGroups) { + mergeRowGroups(); + return; + } + while (reader != null) { processBlocksFromReader(); initNextReader(); @@ -752,6 +765,27 @@ public GroupConverter asGroupConverter() { } } + private void mergeRowGroups() throws IOException { + if (null == reader) { + return; + } + + boolean v2EncodingHint = meta.getBlocks().stream() + .flatMap(b -> b.getColumns().stream()) + .anyMatch(chunk -> { + EncodingStats stats = chunk.getEncodingStats(); + return stats != null && stats.usesV2Pages(); + }); + + List<ParquetFileReader> readers = new ArrayList<>(); + do { + readers.add(reader); + initNextReader(); + } + while (reader != null); + new RowGroupMerger(schema, newCodecName, v2EncodingHint).merge(readers, maxRowGroupSize, writer); + } + private static class ColumnChunkEncryptorRunTime { + private final InternalColumnEncryptionSetup colEncrSetup; + private final BlockCipher.Encryptor dataEncryptor; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java index cc1280921b..89d154b254 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.Preconditions; import org.apache.parquet.crypto.FileEncryptionProperties; +import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import java.util.Arrays; @@ -41,6 +42,8 @@ public class RewriteOptions { final Map<String, MaskMode> maskColumns; final List<String> encryptColumns; final FileEncryptionProperties fileEncryptionProperties; + final boolean mergeRowGroups; + final long maxRowGroupSize; private RewriteOptions(Configuration conf, List<Path> inputFiles, @@ -49,7 +52,9 @@ private RewriteOptions(Configuration conf, CompressionCodecName newCodecName, Map<String, MaskMode> maskColumns, List<String> encryptColumns, - FileEncryptionProperties fileEncryptionProperties) { + FileEncryptionProperties fileEncryptionProperties, + boolean mergeRowGroups, + long maxRowGroupSize) { this.conf = conf; this.inputFiles = inputFiles; this.outputFile = outputFile; @@ -58,6 +63,8 @@ private RewriteOptions(Configuration conf, this.maskColumns = maskColumns; this.encryptColumns = encryptColumns; this.fileEncryptionProperties = fileEncryptionProperties; + this.mergeRowGroups = mergeRowGroups; + this.maxRowGroupSize = maxRowGroupSize; } public Configuration getConf() { return conf; } @@ -92,6 +99,14 @@ public FileEncryptionProperties getFileEncryptionProperties() { return fileEncryptionProperties; } + public boolean isMergeRowGroups() { + return mergeRowGroups; + } + + public long getMaxRowGroupSize() { + return maxRowGroupSize; + } + // Builder to create a RewriterOptions.
public static class Builder { private Configuration conf; @@ -102,6 +117,8 @@ public static class Builder { private Map<String, MaskMode> maskColumns; private List<String> encryptColumns; private FileEncryptionProperties fileEncryptionProperties; + private boolean mergeRowGroups; + private long maxRowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE; /** * Create a builder to create a RewriterOptions. @@ -163,6 +180,18 @@ public Builder transform(CompressionCodecName newCodecName) { return this; } + /** + * Sets the max size of the merged row groups and enables row group merging + * + * @param maxRowGroupSize max size in bytes of the merged row groups + * @return self + */ + public Builder mergeRowGroups(long maxRowGroupSize) { + this.maxRowGroupSize = maxRowGroupSize; + this.mergeRowGroups = true; + return this; + } + /** * Set the columns to mask. *

@@ -255,7 +284,9 @@ public RewriteOptions build() { newCodecName, maskColumns, encryptColumns, - fileEncryptionProperties); + fileEncryptionProperties, + mergeRowGroups, + maxRowGroupSize); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java new file mode 100644 index 0000000000..3dd1cdb18f --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java @@ -0,0 +1,685 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop.rewrite; + +import static java.lang.String.format; +import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; +import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; +import static org.apache.parquet.column.ValuesType.VALUES; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Collections; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.AbstractMap.SimpleEntry; +import java.util.Map.Entry; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.Preconditions; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; + +class RowGroupMerger { + + private final MessageType schema; 
+ private final CompressionCodecFactory.BytesInputCompressor compressor; + private final ParquetProperties parquetProperties; + + public RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean useV2ValueWriter) { + this(schema, new Configuration(), compression, useV2ValueWriter); + } + + RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName compression, boolean useV2ValueWriter) { + this(schema, conf, compression, createParquetProperties(useV2ValueWriter)); + } + + RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName compression, ParquetProperties parquetProperties) { + this.schema = schema; + this.parquetProperties = parquetProperties; + this.compressor = new CodecFactory(conf, this.parquetProperties.getPageSizeThreshold()).getCompressor(compression); + } + + /** + * Merges the row groups, making sure that new row groups do not exceed the supplied maxRowGroupSize + * + * @param inputFiles readers of the input files to merge + * @param maxRowGroupSize the max limit for new blocks + * @param writer writer to write the new blocks to + * @throws IOException if an IO error occurs + */ + public void merge(List<ParquetFileReader> inputFiles, final long maxRowGroupSize, + ParquetFileWriter writer) throws IOException { + + SizeEstimator estimator = new SizeEstimator(compressor.getCodecName() != CompressionCodecName.UNCOMPRESSED); + MutableMergedBlock mergedBlock = null; + for (ParquetFileReader reader : inputFiles) { + for (BlockMetaData blockMeta : reader.getRowGroups()) { + PageReadStore group = reader.readNextRowGroup(); + Preconditions.checkState(group != null, + "number of groups returned by FileReader does not match metadata"); + + if (mergedBlock != null && mergedBlock.getCompressedSize() + estimator.estimate(blockMeta) > maxRowGroupSize) { + saveBlockTo(mergedBlock, writer); + mergedBlock = null; + } + + if (mergedBlock == null && estimator.estimate(blockMeta) > maxRowGroupSize) { + //save it directly without re-encoding it + saveBlockTo(ReadOnlyMergedBlock.of(blockMeta, group, schema, compressor), writer); + continue; + } + + if (mergedBlock == null) { + mergedBlock = new MutableMergedBlock(schema); + } + + long sizeBeforeMerge = mergedBlock.getCompressedSize(); + mergedBlock.merge(blockMeta, group); + //update our estimator + long currentBlockEffect = mergedBlock.getCompressedSize() - sizeBeforeMerge; + estimator.update(currentBlockEffect, blockMeta); + } + } + if (mergedBlock != null) { + saveBlockTo(mergedBlock, writer); + } + mergedBlock = null; + } + + + private void saveBlockTo(MergedBlock block, ParquetFileWriter writer) { + try { + writer.startBlock(block.rowCount()); + + for (MergedColumn col : block.columnsInOrder()) { + writer.startColumn(col.getColumnDesc(), col.getValueCount(), col.getCompression()); + + col.writeDictionaryPageTo(writer); + col.writeDataPagesTo(writer); + + writer.endColumn(); + } + + writer.endBlock(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static ParquetProperties createParquetProperties(boolean useV2Writer) { + ParquetProperties.Builder builder = ParquetProperties.builder(); + if (useV2Writer) { + builder.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0); + } + return builder.build(); + } + + private BytesInput compress(BytesInput bytes) { + return compress(bytes, compressor); + } + + private static BytesInput compress(BytesInput bytes, CompressionCodecFactory.BytesInputCompressor compressor) { + try { + //we copy as some compressors use shared memory + return
BytesInput.copy(compressor.compress(bytes)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private BiConsumer createWritingBridge(PrimitiveType.PrimitiveTypeName typeName) { + switch (typeName) { + case FIXED_LEN_BYTE_ARRAY: + case INT96: + case BINARY: + return (src, dest) -> dest.writeBytes(src.readBytes()); + case BOOLEAN: + return (src, dest) -> dest.writeBoolean(src.readBoolean()); + case DOUBLE: + return (src, dest) -> dest.writeDouble(src.readDouble()); + case FLOAT: + return (src, dest) -> dest.writeFloat(src.readFloat()); + case INT32: + return (src, dest) -> dest.writeInteger(src.readInteger()); + case INT64: + return (src, dest) -> dest.writeLong(src.readLong()); + default: + throw new RuntimeException("Unsupported column primitive type: " + typeName.name()); + } + } + + private static void writePageTo(DataPage dataPage, ParquetFileWriter writer) { + dataPage.accept(new DataPage.Visitor() { + @Override + public Void visit(DataPageV1 page) { + try { + if (page.getIndexRowCount().isPresent()) { + writer.writeDataPage(page.getValueCount(), page.getUncompressedSize(), + page.getBytes(), page.getStatistics(), page.getIndexRowCount().get(), page.getRlEncoding(), + page.getDlEncoding(), page.getValueEncoding()); + + } else { + writer.writeDataPage(page.getValueCount(), page.getUncompressedSize(), + page.getBytes(), page.getStatistics(), page.getRlEncoding(), + page.getDlEncoding(), page.getValueEncoding()); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return null; + } + + @Override + public Void visit(DataPageV2 page) { + try { + writer.writeDataPageV2(page.getRowCount(), page.getNullCount(), page.getValueCount(), + page.getRepetitionLevels(), page.getDefinitionLevels(), page.getDataEncoding(), + page.getData(), page.getUncompressedSize(), page.getStatistics()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return null; + } + }); + } + + private static DictionaryPage getCompressedDictionary(DictionaryPage dictionary, CompressionCodecFactory.BytesInputCompressor compressor) { + return new DictionaryPage( + compress(dictionary.getBytes(), compressor), + dictionary.getUncompressedSize(), + dictionary.getDictionarySize(), + dictionary.getEncoding()); + } + + private interface MergedBlock { + long rowCount(); + + List columnsInOrder(); + } + + private interface MergedColumn { + long getValueCount(); + + CompressionCodecName getCompression(); + + ColumnDescriptor getColumnDesc(); + + void writeDataPagesTo(ParquetFileWriter writer); + + void writeDictionaryPageTo(ParquetFileWriter writer) throws IOException; + } + + private class MutableMergedBlock implements MergedBlock { + + private final Map columns = new HashMap<>(); + private final MessageType schema; + private long recordCount; + private long compressedSize; + + private MutableMergedBlock(MessageType schema) { + this.schema = schema; + } + + @Override + public long rowCount() { + return recordCount; + } + + private long getCompressedSize() { + return compressedSize; + } + + @Override + public List columnsInOrder() { + return schema.getColumns() + .stream() + .map(columns::get) + .collect(Collectors.toList()); + } + + MutableMergedColumn getOrCreateColumn(ColumnDescriptor column) { + return columns.computeIfAbsent(column, desc -> new MutableMergedColumn(desc, this::addCompressedBytes)); + } + + void addRowCount(long rowCount) { + recordCount += rowCount; + } + + void addCompressedBytes(long size) { + compressedSize += size; + } + + void 
merge(BlockMetaData blockMeta, PageReadStore group) throws IOException { + for (Entry<ColumnDescriptor, ColumnChunkMetaData> col : getColumnsInOrder(blockMeta, schema)) { + + MutableMergedColumn column = getOrCreateColumn(col.getKey()); + column.statistics.mergeStatistics(col.getValue().getStatistics()); + PageReader columnReader = group.getPageReader(col.getKey()); + + DictionaryPage dictPage = columnReader.readDictionaryPage(); + Dictionary decodedDictionary = null; + if (dictPage != null) { + decodedDictionary = dictPage.getEncoding().initDictionary(column.getColumnDesc(), dictPage); + } + + //read all pages in this column chunk + DataPage data; + while ((data = columnReader.readPage()) != null) { + column.addPage(data, decodedDictionary); + } + } + addRowCount(blockMeta.getRowCount()); + } + } + + private static List<Entry<ColumnDescriptor, ColumnChunkMetaData>> getColumnsInOrder(BlockMetaData blockMeta, + MessageType schema) { + + return ParquetFileWriter.getColumnsInOrder(blockMeta, schema, false).stream() + .map(c -> toEntry(schema, c)) + .collect(Collectors.toList()); + } + + private static SimpleEntry<ColumnDescriptor, ColumnChunkMetaData> toEntry(MessageType schema, + ColumnChunkMetaData column) { + + return new SimpleEntry<>( + schema.getColumnDescription(column.getPath().toArray()), + column); + } + + private static class ReadOnlyMergedBlock implements MergedBlock { + private final List<MergedColumn> columns; + private final long recordCount; + + private ReadOnlyMergedBlock(List<MergedColumn> columns, long recordCount) { + this.columns = Collections.unmodifiableList(columns); + this.recordCount = recordCount; + } + + @Override + public long rowCount() { + return recordCount; + } + + @Override + public List<MergedColumn> columnsInOrder() { + return columns; + } + + static ReadOnlyMergedBlock of(BlockMetaData blockMeta, PageReadStore group, MessageType schema, + CompressionCodecFactory.BytesInputCompressor compressor) { + List<MergedColumn> columns = new ArrayList<>(); + for (Entry<ColumnDescriptor, ColumnChunkMetaData> col : getColumnsInOrder(blockMeta, schema)) { + + List<DataPage> pages = new ArrayList<>(); + PageReader columnReader = group.getPageReader(col.getKey()); + + DictionaryPage dictPage = columnReader.readDictionaryPage(); + if (dictPage != null) { + dictPage = getCompressedDictionary(dictPage, compressor); + } + + //read all pages in this column chunk + DataPage data; + while ((data = columnReader.readPage()) != null) { + + data = data.accept(new DataPage.Visitor<DataPage>() { + @Override + public DataPage visit(DataPageV1 pageV1) { + + return new DataPageV1(compress(pageV1.getBytes(), compressor), pageV1.getValueCount(), + pageV1.getUncompressedSize(), pageV1.getFirstRowIndex().orElse(-1L), + pageV1.getIndexRowCount().orElse(-1), pageV1.getStatistics(), pageV1.getRlEncoding(), + pageV1.getDlEncoding(), pageV1.getValueEncoding()); + } + + @Override + public DataPage visit(DataPageV2 pageV2) { + + return DataPageV2.compressed( + pageV2.getRowCount(), pageV2.getNullCount(), pageV2.getValueCount(), + pageV2.getRepetitionLevels(), pageV2.getDefinitionLevels(), pageV2.getDataEncoding(), + compress(pageV2.getData(), compressor), pageV2.getUncompressedSize(), pageV2.getStatistics()); + } + }); + + pages.add(data); + } + + ReadOnlyMergedColumn column = new ReadOnlyMergedColumn(pages, dictPage, col.getKey(), + col.getValue().getValueCount(), compressor.getCodecName(), col.getValue().getStatistics()); + + columns.add(column); + } + return new ReadOnlyMergedBlock(columns, blockMeta.getRowCount()); + } + } + + private static class ReadOnlyMergedColumn implements MergedColumn { + private final List<DataPage> pages; + private final DictionaryPage dictionary; + private final ColumnDescriptor columnDesc; + private final long
valueCount; + private final CompressionCodecName codecName; + private final Statistics statistics; + + private ReadOnlyMergedColumn(List pages, DictionaryPage dictionary, ColumnDescriptor columnDesc, + long valueCount, CompressionCodecName codecName, Statistics statistics) { + this.pages = pages; + this.dictionary = dictionary; + this.columnDesc = columnDesc; + this.valueCount = valueCount; + this.codecName = codecName; + this.statistics = statistics; + } + + @Override + public long getValueCount() { + return valueCount; + } + + @Override + public CompressionCodecName getCompression() { + return codecName; + } + + @Override + public ColumnDescriptor getColumnDesc() { + return columnDesc; + } + + @Override + public void writeDataPagesTo(ParquetFileWriter writer) { + if(pages.isEmpty()) { + return; + } + Statistics destinationStatistics = (pages.get(0) instanceof DataPageV1) ? + ((DataPageV1)pages.get(0)).getStatistics() : + ((DataPageV2)pages.get(0)).getStatistics(); + // Has statistics been read into the pages? + if(destinationStatistics.getNumNulls() < 0) { + destinationStatistics.incrementNumNulls(); // Set this to 0 + destinationStatistics.mergeStatistics(statistics); + } + pages.forEach(page -> writePageTo(page, writer)); + } + + @Override + public void writeDictionaryPageTo(ParquetFileWriter writer) throws IOException { + if (dictionary != null) { + writer.writeDictionaryPage(dictionary); + } + } + } + + private class MutableMergedColumn implements MergedColumn { + + private final List pages = new ArrayList<>(); + private final ColumnDescriptor columnDesc; + private final ValuesWriter newValuesWriter; + private final BiConsumer dataWriter; + private final Consumer compressedSizeAccumulator; + private final Statistics statistics; + + private long valueCount; + + private MutableMergedColumn(ColumnDescriptor column, Consumer compressedSizeAccumulator) { + this.columnDesc = column; + this.compressedSizeAccumulator = compressedSizeAccumulator; + this.newValuesWriter = parquetProperties.newValuesWriter(columnDesc); + this.dataWriter = createWritingBridge(columnDesc.getPrimitiveType().getPrimitiveTypeName()); + this.statistics = Statistics.createStats(column.getPrimitiveType()); + } + + @Override + public long getValueCount() { + return valueCount; + } + + @Override + public CompressionCodecName getCompression() { + return compressor.getCodecName(); + } + + @Override + public ColumnDescriptor getColumnDesc() { + return columnDesc; + } + + @Override + public void writeDataPagesTo(ParquetFileWriter writer) { + if(pages.isEmpty()) { + return; + } + Statistics destinationStatistics = (pages.get(0) instanceof DataPageV1) ? + ((DataPageV1)pages.get(0)).getStatistics() : + ((DataPageV2)pages.get(0)).getStatistics(); + // Has statistics been read into the pages? 
+ if(destinationStatistics.getNumNulls() < 0) { + destinationStatistics.incrementNumNulls(); // Set this to 0 + destinationStatistics.mergeStatistics(statistics); + } + pages.forEach(page -> writePageTo(page, writer)); + } + + @Override + public void writeDictionaryPageTo(ParquetFileWriter writer) throws IOException { + DictionaryPage page = newValuesWriter.toDictPageAndClose(); + if (page != null) { + writer.writeDictionaryPage(getCompressedDictionary(page, compressor)); + newValuesWriter.resetDictionary(); + } + } + + void addPage(DataPage data, Dictionary pageDictionary) { + DataPage recodePage = recodePage(data, pageDictionary); + compressedSizeAccumulator.accept((long) recodePage.getCompressedSize()); + valueCount += recodePage.getValueCount(); + pages.add(recodePage); + } + + DataPage recodePage(DataPage data, Dictionary pageDictionary) { + return data.accept(new DataPage.Visitor() { + + @Override + public DataPage visit(DataPageV1 pageV1) { + + try { + verifyDataEncoding(pageV1.getValueEncoding(), pageDictionary); + + final byte[] originalBytes = pageV1.getBytes().toByteArray(); + + if (pageV1.getValueEncoding().usesDictionary()) { + + ValuesReader rlReader = pageV1.getRlEncoding().getValuesReader(columnDesc, REPETITION_LEVEL); + ValuesReader dlReader = pageV1.getDlEncoding().getValuesReader(columnDesc, DEFINITION_LEVEL); + ValuesReader dataReader = pageV1.getValueEncoding() + .getDictionaryBasedValuesReader(columnDesc, VALUES, pageDictionary); + + ByteBufferInputStream input = ByteBufferInputStream.wrap(ByteBuffer.wrap(originalBytes)); + + int startPos = Math.toIntExact(input.position()); + rlReader.initFromPage(pageV1.getValueCount(), input); + dlReader.initFromPage(pageV1.getValueCount(), input); + int dlEndPos = Math.toIntExact(input.position()); + + dataReader.initFromPage(pageV1.getValueCount(), input); + + int rowCount = 0; + for (int i = 0; i < pageV1.getValueCount(); i++) { + if (dlReader.readInteger() == columnDesc.getMaxDefinitionLevel()) + dataWriter.accept(dataReader, newValuesWriter); + + rowCount = rlReader.readInteger() == 0 ? 
rowCount + 1 : rowCount; + } + + BytesInput recodedBytes = BytesInput.concat( + BytesInput.from(originalBytes, startPos, dlEndPos - startPos), newValuesWriter.getBytes() + ); + + Encoding valuesEncoding = newValuesWriter.getEncoding(); + int uncompressedSize = Math.toIntExact(recodedBytes.size()); + BytesInput compressedBytes = compress(recodedBytes); + + newValuesWriter.reset(); + + long firstRowIndex = pageV1.getFirstRowIndex().orElse(-1L); + + return new DataPageV1(compressedBytes, pageV1.getValueCount(), uncompressedSize, + firstRowIndex, rowCount, pageV1.getStatistics(), pageV1.getRlEncoding(), + pageV1.getDlEncoding(), valuesEncoding + ); + + } else { + //not dictionary encoded + int rowCount = pageV1.getIndexRowCount() + .orElseGet(() -> calculateRowCount(pageV1, ByteBufferInputStream.wrap(ByteBuffer.wrap(originalBytes)))); + + return new DataPageV1(compress(BytesInput.from(originalBytes)), pageV1.getValueCount(), + pageV1.getUncompressedSize(), pageV1.getFirstRowIndex().orElse(-1L), + rowCount, pageV1.getStatistics(), pageV1.getRlEncoding(), + pageV1.getDlEncoding(), pageV1.getValueEncoding() + ); + } + + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public DataPage visit(DataPageV2 page) { + verifyDataEncoding(page.getDataEncoding(), pageDictionary); + + if (page.getDataEncoding().usesDictionary()) { + + ValuesReader reader = page.getDataEncoding() + .getDictionaryBasedValuesReader(columnDesc, VALUES, pageDictionary); + + try { + reader.initFromPage(page.getValueCount(), page.getData().toInputStream()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + for (int i = 0; i < page.getValueCount() - page.getNullCount(); i++) { + dataWriter.accept(reader, newValuesWriter); + } + + BytesInput dataBytes = newValuesWriter.getBytes(); + Encoding dataEncoding = newValuesWriter.getEncoding(); + int uncompressedSize = Math.toIntExact(dataBytes.size()); + BytesInput compressedBytes = compress(dataBytes); + + newValuesWriter.reset(); + + return DataPageV2.compressed( + page.getRowCount(), page.getNullCount(), page.getValueCount(), + page.getRepetitionLevels(), page.getDefinitionLevels(), dataEncoding, + compressedBytes, uncompressedSize, page.getStatistics() + ); + + } else { //not dictionary encoded + + return DataPageV2.compressed( + page.getRowCount(), page.getNullCount(), page.getValueCount(), + page.getRepetitionLevels(), page.getDefinitionLevels(), page.getDataEncoding(), + compress(page.getData()), page.getUncompressedSize(), page.getStatistics()); + } + } + }); + } + + private void verifyDataEncoding(Encoding encoding, Dictionary pageDictionary) { + if (encoding.usesDictionary() && pageDictionary == null) { + throw new ParquetDecodingException( + format("could not read page in column %s as the dictionary was missing for encoding %s", columnDesc, encoding)); + } + } + + //expensive should be used as a last resort + private int calculateRowCount(DataPageV1 pageV1, ByteBufferInputStream in) { + try { + ValuesReader rlReader = pageV1.getRlEncoding().getValuesReader(columnDesc, REPETITION_LEVEL); + rlReader.initFromPage(pageV1.getValueCount(), in); + int rowCount = 0; + for (int i = 0; i < pageV1.getValueCount(); i++) { + rowCount = rlReader.readInteger() == 0 ? 
rowCount + 1 : rowCount; + } + return rowCount; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + private static class SizeEstimator { + //is destination block compressed + private final boolean targetCompressed; + private double factor; + + private SizeEstimator(boolean targetCompressed) { + this.targetCompressed = targetCompressed; + } + + private long estimate(BlockMetaData blockMeta) { + if (factor <= 0) { + //very naive estimator + return targetCompressed ? blockMeta.getCompressedSize() : blockMeta.getTotalByteSize(); + } + return (long) (factor * blockMeta.getTotalByteSize()); + } + + private void update(long actualBytes, BlockMetaData blockMeta) { + // basically remembers last ratio + factor = (double) actualBytes / blockMeta.getTotalByteSize(); + } + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java index a08633d15a..a75c29d382 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java @@ -492,6 +492,48 @@ public void testMergeTwoFilesOnly() throws Exception { validateRowGroupRowCount(); } + @Test + public void testMergeRowGroupFromTwoFiles() throws Exception { + testMultipleInputFilesSetup(); + + // Merge the row groups of the two input files into a single row group. + List<Path> inputPaths = new ArrayList<>(); + for (EncryptionTestFile inputFile : inputFiles) { + inputPaths.add(new Path(inputFile.getFileName())); + } + Path outputPath = new Path(outputFile); + RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath); + RewriteOptions options = builder.mergeRowGroups(ParquetWriter.DEFAULT_BLOCK_SIZE) + .transform(CompressionCodecName.SNAPPY).build(); + + rewriter = new ParquetRewriter(options); + rewriter.processBlocks(); + rewriter.close(); + + // Verify the schema is not changed + ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER); + MessageType schema = pmd.getFileMetaData().getSchema(); + MessageType expectSchema = createSchema(); + assertEquals(expectSchema, schema); + + ParquetFileReader outReader = new ParquetFileReader( + HadoopInputFile.fromPath(new Path(outputFile), conf), HadoopReadOptions.builder(conf).build()); + + // Verify that only one RowGroup is created + assertEquals(1, outReader.getRowGroups().size()); + + // Verify codec has been translated + verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{ + add(CompressionCodecName.SNAPPY); + }}, null); + + // Verify the merged data are not changed + validateColumnData(Collections.emptySet(), Collections.emptySet(), null); + + // Verify original.created.by is preserved + validateCreatedBy(); + } + @Test(expected = InvalidSchemaException.class) public void testMergeTwoFilesWithDifferentSchema() throws Exception { MessageType schema1 = new MessageType("schema", diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/TestRowGroupMerging.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/TestRowGroupMerging.java new file mode 100644 index 0000000000..69a27b55f0 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/TestRowGroupMerging.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop.rewrite; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.Preconditions; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.CompressionConverter; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.ToLongFunction; + +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestRowGroupMerging { + public static final int FILE_SIZE = 10000; + public static final Configuration CONF = new Configuration(); + public static final MessageType FILE_SCHEMA = Types.buildMessage() + .required(INT32).named("id") + .required(BINARY).as(LogicalTypeAnnotation.stringType()).named("string1") + .required(BINARY).as(LogicalTypeAnnotation.stringType()).named("string2") + .named("AppendTest"); + public static final SimpleGroupFactory GROUP_FACTORY = + new SimpleGroupFactory(FILE_SCHEMA); + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + public Path file1; + public List file1content = new ArrayList<>(); + public Path file2; + public List file2content = new ArrayList<>(); + public Path file3; + public List file3content = new ArrayList<>(); + + ParquetFileReader reader1, reader2, reader3; + + @Before + public void createSourceData() throws IOException { + 
this.file1 = newTemp(); + this.file2 = newTemp(); + this.file3 = newTemp(); + + ParquetWriter writer1 = newWriter(file1, true); + ParquetWriter writer2 = newWriter(file2, false); + ParquetWriter writer3 = newWriter(file3, false); + + for (int i = 0; i < FILE_SIZE; i += 1) { + Group group1 = getGroup(writer1, i); + file1content.add(group1); + + Group group2 = getGroup(writer2, FILE_SIZE + i); + file2content.add(group2); + + Group group3 = getGroup(writer3, 2 * FILE_SIZE + i); + file3content.add(group3); + } + + writer1.close(); + writer2.close(); + writer3.close(); + + this.reader1 = newReader(file1); + this.reader2 = newReader(file2); + this.reader3 = newReader(file3); + } + + @Test + public void testBasicBehavior() throws IOException { + Path combinedFile = newTemp(); + ParquetFileWriter writer = createFileWriter(combinedFile); + + merge(writer, Integer.MAX_VALUE, true, SNAPPY, Collections.emptyMap(), reader1, reader2, reader3); + + + LinkedList expected = new LinkedList<>(); + expected.addAll(file1content); + expected.addAll(file2content); + expected.addAll(file3content); + + ParquetReader reader = ParquetReader + .builder(new GroupReadSupport(), combinedFile) + .build(); + + Group next; + while ((next = reader.read()) != null) { + Group expectedNext = expected.removeFirst(); + // check each value; equals is not supported for simple records + assertEquals("Each id should match", + expectedNext.getInteger("id", 0), next.getInteger("id", 0)); + assertEquals("Each string should match", + expectedNext.getString("string1", 0), next.getString("string1", 0)); + assertEquals("Each string should match", + expectedNext.getString("string2", 0), next.getString("string2", 0)); + } + assertEquals("All records should be present", 0, expected.size()); + + } + + @Test + public void testFileStructure() throws IOException { + Path combinedFile = newTemp(); + ParquetFileWriter writer = createFileWriter(combinedFile); + + merge(writer, Integer.MAX_VALUE, false, GZIP, Collections.singletonMap("test-key", "value1"), + reader1, reader2, reader3); + + ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(combinedFile, new Configuration())); + assertEquals("Should be combined into 1 row group", reader.getRowGroups().size(), 1); + assertEquals("Schema should match the original", reader.getFileMetaData().getSchema(), FILE_SCHEMA); + assertEquals("Row count should be sum of the original counts", + reader.getRowGroups().get(0).getRowCount(), FILE_SIZE * 3); + + assertTrue("all columns are expected to use Gzip compression", + reader.getRowGroups().stream() + .flatMap(g -> g.getColumns().stream()) + .allMatch(c -> c.getCodec().equals(GZIP))); + + assertTrue("Column string1(at pos 1) is expected to be dictionary encoded", + reader.getRowGroups().stream().map(g -> g.getColumns().get(1)) + .allMatch(c -> c.getEncodingStats().hasDictionaryPages() && c.getEncodingStats().hasDictionaryEncodedPages())); + } + + @Test + public void testNewUncompressedBlocksSizing() throws IOException { + Path combinedFile = newTemp(); + ParquetFileWriter writer = createFileWriter(combinedFile); + + Configuration conf = new Configuration(); + + long file1Rows = sumTotalFromGroups(conf, file1, BlockMetaData::getRowCount); + long file2Rows = sumTotalFromGroups(conf, file2, BlockMetaData::getRowCount); + long file3Rows = sumTotalFromGroups(conf, file3, BlockMetaData::getRowCount); + + long file1Size = sumTotalFromGroups(conf, file1, BlockMetaData::getTotalByteSize); + long file2Size = sumTotalFromGroups(conf, file2, 
BlockMetaData::getTotalByteSize); + long maxRowGroupSize = file1Size + file2Size; + + merge(writer, maxRowGroupSize, false, UNCOMPRESSED, Collections.emptyMap(), reader1, reader2, reader3); + + ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(combinedFile, conf)); + assertEquals("Should be combined into 2 row groups ", 2, reader.getRowGroups().size()); + assertEquals("first row-group count should equal the sum of the first 2 groups combined", file1Rows + file2Rows, + reader.getRowGroups().get(0).getRowCount()); + assertEquals("the second row-group count should be equal to the to total of the third file", file3Rows, + reader.getRowGroups().get(1).getRowCount()); + } + + @Test + public void testNewCompressedBlocksSizing() throws IOException { + Path combinedFile = newTemp(); + ParquetFileWriter writer = createFileWriter(combinedFile); + + Configuration conf = new Configuration(); + + long file1Rows = sumTotalFromGroups(conf, file1, BlockMetaData::getRowCount); + long file2Rows = sumTotalFromGroups(conf, file2, BlockMetaData::getRowCount); + long file3Rows = sumTotalFromGroups(conf, file3, BlockMetaData::getRowCount); + + long maxRowGroupSize = sumTotalFromGroups(conf, file1, BlockMetaData::getCompressedSize) + + sumTotalFromGroups(conf, file2, BlockMetaData::getCompressedSize); + + merge(writer, maxRowGroupSize, true, SNAPPY, Collections.emptyMap(), reader1, reader2, reader3); + + ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(combinedFile, conf)); + assertEquals("Should be combined into 2 row groups ", 2, reader.getRowGroups().size()); + assertEquals("first row-group count should equal the sum of the first 2 groups combined", + file1Rows + file2Rows, reader.getRowGroups().get(0).getRowCount()); + + assertEquals("the second row-group count should be equal to the to total of the third file", + file3Rows, reader.getRowGroups().get(1).getRowCount()); + } + + private void merge(ParquetFileWriter writer, long maxRowGroupSize, boolean useV2, + CompressionCodecName codec, Map extraMeta, ParquetFileReader... 
files) throws IOException { + writer.start(); + List<ParquetFileReader> readers = new ArrayList<>(); + for (ParquetFileReader reader : files) { + readers.add(reader); + } + new RowGroupMerger(FILE_SCHEMA, codec, useV2).merge(readers, maxRowGroupSize, writer); + + writer.end(extraMeta); + } + + private long sumTotalFromGroups(Configuration conf, Path file, ToLongFunction<BlockMetaData> getRowCount) throws IOException { + return ParquetFileReader.open(HadoopInputFile.fromPath(file, conf)).getRowGroups().stream() + .mapToLong(getRowCount) + .sum(); + } + + private ParquetFileWriter createFileWriter(Path combinedFile) throws IOException { + return new ParquetFileWriter( + CONF, FILE_SCHEMA, combinedFile); + } + + private ParquetFileReader newReader(Path path) throws IOException { + Configuration conf = new Configuration(); + return new CompressionConverter.TransParquetFileReader( + HadoopInputFile.fromPath(path, conf), HadoopReadOptions.builder(conf).build()); + } + + private Path newTemp() throws IOException { + File file = temp.newFile(); + Preconditions.checkArgument(file.delete(), "Could not remove temp file"); + return new Path(file.toString()); + } + + private Group getGroup(ParquetWriter<Group> writer, int i) throws IOException { + Group group1 = GROUP_FACTORY.newGroup(); + group1.add("id", i); + group1.add("string1", "string1-125ahda-2090-410a-b249-59eb61ca17c6-" + i % 100); //force dictionary + group1.add("string2", "string2- 125ahda-2090-410a-b249-59eb61ca17c6-" + i); + writer.write(group1); + return group1; + } + + private ParquetWriter<Group> newWriter(Path file, boolean v2) throws IOException { + ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(file) + .withType(FILE_SCHEMA) + .withCompressionCodec(SNAPPY); + if (v2) + builder.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0); + return builder.build(); + }
+}
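
A minimal usage sketch of the merge path introduced by this patch, modeled on testMergeRowGroupFromTwoFiles above. The class name and file paths are placeholders, and SNAPPY plus ParquetWriter.DEFAULT_BLOCK_SIZE are example choices only; a target codec is supplied because the merged pages are re-encoded (the RewriteCommand precondition above requires one together with -m):

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class MergeRowGroupsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder inputs/output; replace with real Parquet files.
    List<Path> inputs = Arrays.asList(
        new Path("/tmp/part-0.parquet"),
        new Path("/tmp/part-1.parquet"));
    Path output = new Path("/tmp/merged.parquet");

    RewriteOptions options = new RewriteOptions.Builder(conf, inputs, output)
        // Enable merging; row groups are combined until they reach this size limit (bytes).
        .mergeRowGroups(ParquetWriter.DEFAULT_BLOCK_SIZE)
        // Example target codec for the re-encoded pages.
        .transform(CompressionCodecName.SNAPPY)
        .build();

    ParquetRewriter rewriter = new ParquetRewriter(options);
    rewriter.processBlocks(); // merges row groups across the input files
    rewriter.close();
  }
}

On the CLI side, the same path is reached through the rewrite command's new -m/--merge-rowgroups and -s/--max-rowgroup-size flags combined with a target codec.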