Commit a07e39f
MaheshGPai authored and mpairamanat committed Jul 19, 2023
1 parent 1f511a6
Showing 7 changed files with 33 additions and 34 deletions.
File 1 of 7
@@ -74,13 +74,13 @@ public class RewriteCommand extends BaseCommand {
 
   @Parameter(
       names = {"-m", "--merge-rowgroups"},
-      description = "<true/false>",
+      description = "<merge multiple rowgroups into one>",
       required = false)
   boolean mergeRowGroups;
 
   @Parameter(
       names = {"-s", "--max-rowgroup-size"},
-      description = "<max size of the merged rowgroups>",
+      description = "<max size of the merged rowgroups. This should be used along with -m/--merge-rowgroups option>",
       required = false)
   long maxRowGroupSize;
 
@@ -125,8 +125,7 @@ private RewriteOptions buildOptionsOrFail() {
           "If merge rowgroup is enabled, max rowgroups size should be specified");
       Preconditions.checkArgument(null != codec,
           "If merge rowgroup is enabled, new compression codec needs to be specified");
-      builder.enableRowGroupMerge();
-      builder.maxRowGroupSize(maxRowGroupSize);
+      builder.mergeRowGroups(maxRowGroupSize);
     }
     return builder.build();
   }
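At the command line, row-group merging is switched on with -m/--merge-rowgroups and sized with -s/--max-rowgroup-size; as the checks above enforce, a max rowgroup size and a new compression codec must accompany the merge flag. (The flag names and checks are from this diff; the command's other options are unchanged and not shown here.)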
File 2 of 7
@@ -28,7 +28,6 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
@@ -75,7 +74,6 @@
 import org.apache.parquet.hadoop.metadata.GlobalMetaData;
 import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.hadoop.util.CompressionConverter;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
File 3 of 7
@@ -48,7 +48,6 @@
 import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.RowGroupMerger;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -60,7 +59,6 @@
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
@@ -75,7 +73,6 @@
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
File 4 of 7
@@ -181,23 +181,14 @@ public Builder transform(CompressionCodecName newCodecName) {
     }
 
     /**
-     * Enable merging of rowgroups
-     *
-     * @return self
-     */
-    public Builder enableRowGroupMerge() {
-      this.mergeRowGroups = true;
-      return this;
-    }
-
-    /**
-     * Sets the max size of the rowgroup
+     * Sets the max size of the rowgroup and enables rowgroup merging
      *
      * @param maxRowGroupSize Max row group size
      * @return self
      */
-    public Builder maxRowGroupSize(long maxRowGroupSize) {
+    public Builder mergeRowGroups(long maxRowGroupSize) {
       this.maxRowGroupSize = maxRowGroupSize;
+      this.mergeRowGroups = true;
       return this;
     }
 
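With the builder change above, enabling merging and sizing the output row groups is a single call. A minimal usage sketch adapted from the test change below — the input/output paths and the bare Configuration are placeholders, not part of this commit:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class MergeRowGroupsExample {
  public static void main(String[] args) throws Exception {
    // Placeholder inputs: any existing Parquet files.
    List<Path> inputPaths = Arrays.asList(
        new Path("file:/tmp/a.parquet"), new Path("file:/tmp/b.parquet"));
    Path outputPath = new Path("file:/tmp/merged.parquet");

    // mergeRowGroups(maxRowGroupSize) now both enables merging and sets the
    // target size, replacing the old enableRowGroupMerge() + maxRowGroupSize() pair.
    RewriteOptions options = new RewriteOptions.Builder(
            new Configuration(), inputPaths, outputPath)
        .mergeRowGroups(ParquetWriter.DEFAULT_BLOCK_SIZE)
        .transform(CompressionCodecName.SNAPPY)
        .build();

    // Drive the rewrite the same way the updated test does.
    ParquetRewriter rewriter = new ParquetRewriter(options);
    rewriter.processBlocks();
    rewriter.close();
  }
}

Folding the enable flag into mergeRowGroups(long) also makes it impossible to turn merging on without picking a size, which is exactly what the precondition in RewriteCommand guards against.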
File 5 of 7
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.parquet.hadoop;
+package org.apache.parquet.hadoop.rewrite;
 
 import static java.lang.String.format;
 import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
@@ -26,7 +26,11 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.Map.Entry;
 import java.util.function.BiConsumer;
@@ -50,19 +54,20 @@
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.hadoop.util.CompressionConverter;
-import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 
-public class RowGroupMerger {
+class RowGroupMerger {
 
   private final MessageType schema;
-  private final CodecFactory.BytesInputCompressor compressor;
+  private final CompressionCodecFactory.BytesInputCompressor compressor;
   private final ParquetProperties parquetProperties;
 
   public RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean useV2ValueWriter) {
@@ -88,7 +93,7 @@ public RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean useV2ValueWriter) {
    * @throws IOException if an IO error occurs
    */
   public void merge(List<ParquetFileReader> inputFiles, final long maxRowGroupSize,
-                   ParquetFileWriter writer) throws IOException {
+                    ParquetFileWriter writer) throws IOException {
 
     SizeEstimator estimator = new SizeEstimator(compressor.getCodecName() != CompressionCodecName.UNCOMPRESSED);
     MutableMergedBlock mergedBlock = null;
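RowGroupMerger is now package-private, so only classes inside org.apache.parquet.hadoop.rewrite (such as ParquetRewriter) can reach it. A minimal in-package sketch of the call sequence; only the constructor and merge(...) signatures are taken from this diff, and the helper class itself is hypothetical:

package org.apache.parquet.hadoop.rewrite;

import java.io.IOException;
import java.util.List;

import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

final class RowGroupMergerSketch {
  // Hypothetical helper: merge the row groups of several already-open readers
  // into an already-started writer, recompressing with the given codec.
  static void mergeAll(MessageType schema, CompressionCodecName codec,
                       List<ParquetFileReader> readers, ParquetFileWriter writer,
                       long maxRowGroupSize) throws IOException {
    // false = v1 value writers, per the constructor signature in this commit.
    RowGroupMerger merger = new RowGroupMerger(schema, codec, false);
    // Emits merged row groups of at most maxRowGroupSize bytes into writer.
    merger.merge(readers, maxRowGroupSize, writer);
  }
}

Narrowing the visibility keeps RewriteOptions and ParquetRewriter as the supported entry points; direct use of the merger from outside the package no longer compiles.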
File 6 of 7
@@ -498,8 +498,8 @@ public void testMergeRowGroupFromTwoFiles() throws Exception {
     }
     Path outputPath = new Path(outputFile);
     RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
-    RewriteOptions options = builder.enableRowGroupMerge().transform(CompressionCodecName.SNAPPY)
-        .maxRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE).build();
+    RewriteOptions options = builder.mergeRowGroups(ParquetWriter.DEFAULT_BLOCK_SIZE)
+        .transform(CompressionCodecName.SNAPPY).build();
 
     rewriter = new ParquetRewriter(options);
     rewriter.processBlocks();
File 7 of 7
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.parquet.hadoop;
+package org.apache.parquet.hadoop.rewrite;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -25,6 +25,10 @@
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -41,11 +45,16 @@
-
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 
-import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.junit.Assert.assertEquals;
