PARQUET-1381: Support merging of rowgroups during file rewrite #1121

Open · wants to merge 3 commits into master
RewriteCommand.java
@@ -82,6 +82,18 @@ public class RewriteCommand extends BaseCommand {
required = false)
String codec;

@Parameter(
names = {"-m", "--merge-rowgroups"},
description = "<merge multiple rowgroups into one>",
required = false)
boolean mergeRowGroups;

@Parameter(
names = {"-s", "--max-rowgroup-size"},
description = "<max size of the merged rowgroups; must be used along with the -m/--merge-rowgroups option>",
required = false)
long maxRowGroupSize;

public RewriteCommand(Logger console) {
super(console);
}
@@ -118,6 +130,14 @@ private RewriteOptions buildOptionsOrFail() throws IOException {
builder.transform(codecName);
}

if (mergeRowGroups) {
Preconditions.checkArgument(maxRowGroupSize > 0,
"If row group merging is enabled, a max row group size must be specified");
Preconditions.checkArgument(null != codec,
"If row group merging is enabled, a new compression codec must be specified");
builder.mergeRowGroups(maxRowGroupSize);
}

RewriteOptions options = builder.build();

// If RewriteOptions are successfully built and the overwrite option is specified, remove the output path
ParquetFileWriter.java
@@ -1033,32 +1033,7 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
boolean dropColumns) throws IOException {
startBlock(rowGroup.getRowCount());

Map<String, ColumnChunkMetaData> columnsToCopy =
new HashMap<String, ColumnChunkMetaData>();
for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
columnsToCopy.put(chunk.getPath().toDotString(), chunk);
}

List<ColumnChunkMetaData> columnsInOrder =
new ArrayList<ColumnChunkMetaData>();

for (ColumnDescriptor descriptor : schema.getColumns()) {
String path = ColumnPath.get(descriptor.getPath()).toDotString();
ColumnChunkMetaData chunk = columnsToCopy.remove(path);
if (chunk != null) {
columnsInOrder.add(chunk);
} else {
throw new IllegalArgumentException(String.format(
"Missing column '%s', cannot copy row group: %s", path, rowGroup));
}
}

// complain if some columns would be dropped and that's not okay
if (!dropColumns && !columnsToCopy.isEmpty()) {
throw new IllegalArgumentException(String.format(
"Columns cannot be copied (missing from target schema): %s",
String.join(", ", columnsToCopy.keySet())));
}
List<ColumnChunkMetaData> columnsInOrder = getColumnsInOrder(rowGroup, dropColumns);

// copy the data for all chunks
long start = -1;
@@ -1157,6 +1132,44 @@ public void appendColumnChunk(ColumnDescriptor descriptor, SeekableInputStream f
currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
}

private List<ColumnChunkMetaData> getColumnsInOrder(BlockMetaData rowGroup, boolean dropColumns) {
return getColumnsInOrder(rowGroup, schema, dropColumns);
}

/**
* @param rowGroup row group containing the column chunks to copy
* @param schema the schema to use for column ordering
* @param dropColumns whether to drop columns that are not defined in the provided schema
* @return the column chunks of the row group, ordered to match the schema's columns
*/
public static List<ColumnChunkMetaData> getColumnsInOrder(BlockMetaData rowGroup,
MessageType schema, boolean dropColumns) {
Map<String, ColumnChunkMetaData> columnsToCopy = new HashMap<>();
for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
columnsToCopy.put(chunk.getPath().toDotString(), chunk);
}

List<ColumnChunkMetaData> columnsInOrder = new ArrayList<>();

for (ColumnDescriptor descriptor : schema.getColumns()) {
String path = ColumnPath.get(descriptor.getPath()).toDotString();
ColumnChunkMetaData chunk = columnsToCopy.remove(path);
if (chunk != null) {
columnsInOrder.add(chunk);
} else {
throw new IllegalArgumentException(String.format(
"Missing column '%s', cannot copy row group: %s", path, rowGroup));
}
}

// complain if some columns would be dropped and that's not okay
if (!dropColumns && !columnsToCopy.isEmpty()) {
throw new IllegalArgumentException(String.format(
"Columns cannot be copied (missing from target schema): %s",
String.join(", ", columnsToCopy.keySet())));
}
return columnsInOrder;
}
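
Since the ordering logic is now a public static helper, it can be reused outside ParquetFileWriter (for example by a row-group merger). Below is a minimal, hedged sketch of such a caller; the input path is purely illustrative, and the footer/HadoopInputFile calls are the standard parquet-hadoop reader APIs.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class ColumnsInOrderExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative input; any footer read through ParquetFileReader works the same way.
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(new Path("input.parquet"), conf))) {
      ParquetMetadata footer = reader.getFooter();
      MessageType schema = footer.getFileMetaData().getSchema();
      for (BlockMetaData block : footer.getBlocks()) {
        // Chunks come back in schema order; dropColumns=false fails fast on a schema mismatch.
        List<ColumnChunkMetaData> ordered =
            ParquetFileWriter.getColumnsInOrder(block, schema, false);
        System.out.println("row group with " + ordered.size() + " column chunks");
      }
    }
  }
}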

// Buffers for the copy function.
private static final ThreadLocal<byte[]> COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[8192]);

ParquetRewriter.java
@@ -27,6 +27,7 @@
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.page.DictionaryPage;
@@ -45,6 +46,7 @@
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -117,6 +119,10 @@ public class ParquetRewriter implements Closeable {
private String originalCreatedBy = "";
// Unique created_by information from all input files
private Set<String> allOriginalCreatedBys = new HashSet<>();
// Indicates if row groups from different files need to be merged.
private boolean mergeRowGroups;
// Max size of the merged rowgroup
private long maxRowGroupSize;

public ParquetRewriter(RewriteOptions options) throws IOException {
Configuration conf = options.getConf();
@@ -130,6 +136,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException {

newCodecName = options.getNewCodecName();
pruneColumns = options.getPruneColumns();
mergeRowGroups = options.isMergeRowGroups();
maxRowGroupSize = options.getMaxRowGroupSize();

// Prune columns if specified
if (pruneColumns != null && !pruneColumns.isEmpty()) {
@@ -246,6 +254,11 @@ public void close() throws IOException {
}

public void processBlocks() throws IOException {
if (mergeRowGroups) {
mergeRowGroups();
return;
}

while (reader != null) {
processBlocksFromReader();
initNextReader();
@@ -752,6 +765,27 @@ public GroupConverter asGroupConverter() {
}
}

private void mergeRowGroups() throws IOException {
if (null == reader) {
return;
}

boolean v2EncodingHint = meta.getBlocks().stream()
.flatMap(b -> b.getColumns().stream())
.anyMatch(chunk -> {
EncodingStats stats = chunk.getEncodingStats();
return stats != null && stats.usesV2Pages();
});

List<ParquetFileReader> readers = new ArrayList<>();
do {
readers.add(reader);
initNextReader();
} while (reader != null);

new RowGroupMerger(schema, newCodecName, v2EncodingHint).merge(readers, maxRowGroupSize, writer);
}

Review comment: Looks like v2EncodingHint only checks the first parquet file. Should all the files be checked?

Member: I didn't review it in depth. Does it handle encryption or masking properties internally?

Author: Yes. Underneath, it uses the same instance of ParquetFileWriter, which handles these operations.
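
Regarding the first comment above, a hedged sketch of deriving the hint from every input footer rather than the first one is shown below; the helper name and the footers list are hypothetical and not part of this PR.

import java.util.List;

import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

final class V2PageHint {
  // Hypothetical helper: true if any column chunk in any of the given footers wrote V2 data pages.
  static boolean anyFileUsesV2Pages(List<ParquetMetadata> footers) {
    return footers.stream()
        .flatMap(footer -> footer.getBlocks().stream())
        .flatMap(block -> block.getColumns().stream())
        .anyMatch(chunk -> {
          EncodingStats stats = chunk.getEncodingStats();
          return stats != null && stats.usesV2Pages();
        });
  }
}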

private static class ColumnChunkEncryptorRunTime {
private final InternalColumnEncryptionSetup colEncrSetup;
private final BlockCipher.Encryptor dataEncryptor;
RewriteOptions.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.parquet.Preconditions;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.util.Arrays;
Expand All @@ -41,6 +42,8 @@ public class RewriteOptions {
final Map<String, MaskMode> maskColumns;
final List<String> encryptColumns;
final FileEncryptionProperties fileEncryptionProperties;
final boolean mergeRowGroups;
final long maxRowGroupSize;

private RewriteOptions(Configuration conf,
List<Path> inputFiles,
@@ -49,7 +52,9 @@ private RewriteOptions(Configuration conf,
CompressionCodecName newCodecName,
Map<String, MaskMode> maskColumns,
List<String> encryptColumns,
FileEncryptionProperties fileEncryptionProperties) {
FileEncryptionProperties fileEncryptionProperties,
boolean mergeRowGroups,
long maxRowGroupSize) {
this.conf = conf;
this.inputFiles = inputFiles;
this.outputFile = outputFile;
@@ -58,6 +63,8 @@ private RewriteOptions(Configuration conf,
this.maskColumns = maskColumns;
this.encryptColumns = encryptColumns;
this.fileEncryptionProperties = fileEncryptionProperties;
this.mergeRowGroups = mergeRowGroups;
this.maxRowGroupSize = maxRowGroupSize;
}

public Configuration getConf() {
@@ -92,6 +99,14 @@ public FileEncryptionProperties getFileEncryptionProperties() {
return fileEncryptionProperties;
}

public boolean isMergeRowGroups() {
return mergeRowGroups;
}

public long getMaxRowGroupSize() {
return maxRowGroupSize;
}

// Builder to create a RewriteOptions.
public static class Builder {
private Configuration conf;
@@ -102,6 +117,8 @@ public static class Builder {
private Map<String, MaskMode> maskColumns;
private List<String> encryptColumns;
private FileEncryptionProperties fileEncryptionProperties;
private boolean mergeRowGroups;
private long maxRowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE;

/**
* Create a builder to create a RewriteOptions.
@@ -163,6 +180,18 @@ public Builder transform(CompressionCodecName newCodecName) {
return this;
}

/**
* Sets the max size of the merged row groups and enables row group merging.
*
* @param maxRowGroupSize Max row group size
* @return self
*/
public Builder mergeRowGroups(long maxRowGroupSize) {
this.maxRowGroupSize = maxRowGroupSize;
this.mergeRowGroups = true;
return this;
}

/**
* Set the columns to mask.
* <p>
@@ -255,7 +284,9 @@ public RewriteOptions build() {
newCodecName,
maskColumns,
encryptColumns,
fileEncryptionProperties);
fileEncryptionProperties,
mergeRowGroups,
maxRowGroupSize);
}
}
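
For reference, enabling the new merge path through the public API would look roughly like the sketch below. It is only a sketch: the input/output paths and the 128 MB limit are illustrative, the size is assumed to be in bytes (matching ParquetWriter.DEFAULT_BLOCK_SIZE), and the Builder constructor taking a Configuration, a list of input paths, and an output path is assumed from the inputFiles field shown in this diff.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class MergeRowGroupsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    RewriteOptions options = new RewriteOptions.Builder(
            conf,
            Arrays.asList(new Path("part-00000.parquet"), new Path("part-00001.parquet")),
            new Path("merged.parquet"))
        // Merging requires an explicit target codec (see the RewriteCommand check above).
        .transform(CompressionCodecName.ZSTD)
        // Enable merging; merged row groups are capped at 128 MB here.
        .mergeRowGroups(128L * 1024 * 1024)
        .build();

    // ParquetRewriter drives the merge and writes the single output file.
    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
      rewriter.processBlocks();
    }
  }
}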
