Skip to content

Commit

Permalink
Merge statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
mpairamanat committed Sep 23, 2023
1 parent c3d5f12 commit 1f585e5
Showing 1 changed file with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.compression.CompressionCodecFactory;
Expand Down Expand Up @@ -298,6 +299,7 @@ void merge(BlockMetaData blockMeta, PageReadStore group) throws IOException {
for (Entry<ColumnDescriptor, ColumnChunkMetaData> col : getColumnsInOrder(blockMeta, schema)) {

MutableMergedColumn column = getOrCreateColumn(col.getKey());
column.statistics.mergeStatistics(col.getValue().getStatistics());
PageReader columnReader = group.getPageReader(col.getKey());

DictionaryPage dictPage = columnReader.readDictionaryPage();
Expand Down Expand Up @@ -392,7 +394,7 @@ public DataPage visit(DataPageV2 pageV2) {
}

ReadOnlyMergedColumn column = new ReadOnlyMergedColumn(pages, dictPage, col.getKey(),
col.getValue().getValueCount(), compressor.getCodecName());
col.getValue().getValueCount(), compressor.getCodecName(), col.getValue().getStatistics());

columns.add(column);
}
Expand All @@ -406,14 +408,16 @@ private static class ReadOnlyMergedColumn implements MergedColumn {
private final ColumnDescriptor columnDesc;
private final long valueCount;
private final CompressionCodecName codecName;
private final Statistics statistics;

private ReadOnlyMergedColumn(List<DataPage> pages, DictionaryPage dictionary, ColumnDescriptor columnDesc,
long valueCount, CompressionCodecName codecName) {
long valueCount, CompressionCodecName codecName, Statistics statistics) {
this.pages = pages;
this.dictionary = dictionary;
this.columnDesc = columnDesc;
this.valueCount = valueCount;
this.codecName = codecName;
this.statistics = statistics;
}

@Override
Expand All @@ -433,6 +437,17 @@ public ColumnDescriptor getColumnDesc() {

@Override
public void writeDataPagesTo(ParquetFileWriter writer) {
if(pages.isEmpty()) {
return;
}
Statistics destinationStatistics = (pages.get(0) instanceof DataPageV1) ?
((DataPageV1)pages.get(0)).getStatistics() :
((DataPageV2)pages.get(0)).getStatistics();
// Has statistics been read into the pages?
if(destinationStatistics.getNumNulls() < 0) {
destinationStatistics.incrementNumNulls(); // Set this to 0
destinationStatistics.mergeStatistics(statistics);
}
pages.forEach(page -> writePageTo(page, writer));
}

Expand All @@ -451,6 +466,7 @@ private class MutableMergedColumn implements MergedColumn {
private final ValuesWriter newValuesWriter;
private final BiConsumer<ValuesReader, ValuesWriter> dataWriter;
private final Consumer<Long> compressedSizeAccumulator;
private final Statistics statistics;

private long valueCount;

Expand All @@ -459,6 +475,7 @@ private MutableMergedColumn(ColumnDescriptor column, Consumer<Long> compressedSi
this.compressedSizeAccumulator = compressedSizeAccumulator;
this.newValuesWriter = parquetProperties.newValuesWriter(columnDesc);
this.dataWriter = createWritingBridge(columnDesc.getPrimitiveType().getPrimitiveTypeName());
this.statistics = Statistics.createStats(column.getPrimitiveType());
}

@Override
Expand All @@ -478,6 +495,17 @@ public ColumnDescriptor getColumnDesc() {

@Override
public void writeDataPagesTo(ParquetFileWriter writer) {
if(pages.isEmpty()) {
return;
}
Statistics destinationStatistics = (pages.get(0) instanceof DataPageV1) ?
((DataPageV1)pages.get(0)).getStatistics() :
((DataPageV2)pages.get(0)).getStatistics();
// Has statistics been read into the pages?
if(destinationStatistics.getNumNulls() < 0) {
destinationStatistics.incrementNumNulls(); // Set this to 0
destinationStatistics.mergeStatistics(statistics);
}
pages.forEach(page -> writePageTo(page, writer));
}

Expand Down

0 comments on commit 1f585e5

Please sign in to comment.