Merge branch 'branch-24.06' into update-fmt-and-spdlog
jameslamb authored Apr 29, 2024
2 parents 9255aee + 064dd7b commit c06d26b
Showing 17 changed files with 172 additions and 69 deletions.
18 changes: 17 additions & 1 deletion cpp/src/io/parquet/compact_protocol_reader.cpp
@@ -17,6 +17,7 @@
#include "compact_protocol_reader.hpp"

#include "parquet.hpp"
#include "parquet_common.hpp"

#include <cudf/utilities/error.hpp>

@@ -652,6 +653,9 @@ void CompactProtocolReader::read(ColumnChunkMetaData* c)
{
using optional_size_statistics =
parquet_field_optional<SizeStatistics, parquet_field_struct<SizeStatistics>>;
using optional_list_enc_stats =
parquet_field_optional<std::vector<PageEncodingStats>,
parquet_field_struct_list<PageEncodingStats>>;
auto op = std::make_tuple(parquet_field_enum<Type>(1, c->type),
parquet_field_enum_list(2, c->encodings),
parquet_field_string_list(3, c->path_in_schema),
@@ -663,6 +667,7 @@ void CompactProtocolReader::read(ColumnChunkMetaData* c)
parquet_field_int64(10, c->index_page_offset),
parquet_field_int64(11, c->dictionary_page_offset),
parquet_field_struct(12, c->statistics),
optional_list_enc_stats(13, c->encoding_stats),
optional_size_statistics(16, c->size_statistics));
function_builder(this, op);
}
@@ -758,13 +763,16 @@ void CompactProtocolReader::read(Statistics* s)
{
using optional_binary = parquet_field_optional<std::vector<uint8_t>, parquet_field_binary>;
using optional_int64 = parquet_field_optional<int64_t, parquet_field_int64>;
using optional_bool = parquet_field_optional<bool, parquet_field_bool>;

auto op = std::make_tuple(optional_binary(1, s->max),
optional_binary(2, s->min),
optional_int64(3, s->null_count),
optional_int64(4, s->distinct_count),
optional_binary(5, s->max_value),
optional_binary(6, s->min_value));
optional_binary(6, s->min_value),
optional_bool(7, s->is_max_value_exact),
optional_bool(8, s->is_min_value_exact));
function_builder(this, op);
}

@@ -774,6 +782,14 @@ void CompactProtocolReader::read(ColumnOrder* c)
function_builder(this, op);
}

void CompactProtocolReader::read(PageEncodingStats* s)
{
auto op = std::make_tuple(parquet_field_enum<PageType>(1, s->page_type),
parquet_field_enum<Encoding>(2, s->encoding),
parquet_field_int32(3, s->count));
function_builder(this, op);
}

void CompactProtocolReader::read(SortingColumn* s)
{
auto op = std::make_tuple(parquet_field_int32(1, s->column_idx),
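
A note on the pattern introduced in this file: parquet_field_optional<T, FieldFunctor> wraps an existing field functor so the target member is an optional that is engaged only when the field is actually present in the serialized struct (the real code stores into thrust::optional, as parquet.hpp below shows). The following standalone sketch illustrates the idea; it is a simplification using std::optional and a fake decode step, not cudf's actual implementation.

#include <cstdint>
#include <optional>

// A field functor knows its thrift field id and how to store one decoded value.
struct field_int64 {
  int field_id;
  int64_t& val;
  field_int64(int id, int64_t& v) : field_id(id), val(v) {}
  void operator()(int64_t decoded) const { val = decoded; }  // real decoding elided
};

// Wrapper: the target becomes std::optional<T>, engaged only if the field is seen.
template <typename T, typename FieldFunctor>
struct field_optional {
  int field_id;
  std::optional<T>& val;
  field_optional(int id, std::optional<T>& v) : field_id(id), val(v) {}
  void operator()(T decoded) const
  {
    val = T{};                              // engage the optional...
    FieldFunctor{field_id, *val}(decoded);  // ...then decode into it
  }
};

int main()
{
  std::optional<int64_t> null_count;  // stays disengaged if field 3 never appears
  field_optional<int64_t, field_int64> opt_field(3, null_count);
  opt_field(42);  // simulate the reader encountering field 3
  return null_count.value() == 42 ? 0 : 1;
}
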
1 change: 1 addition & 0 deletions cpp/src/io/parquet/compact_protocol_reader.hpp
@@ -120,6 +120,7 @@ class CompactProtocolReader {
void read(ColumnIndex* c);
void read(Statistics* s);
void read(ColumnOrder* c);
void read(PageEncodingStats* s);
void read(SortingColumn* s);

public:
12 changes: 12 additions & 0 deletions cpp/src/io/parquet/compact_protocol_writer.cpp
@@ -188,6 +188,7 @@ size_t CompactProtocolWriter::write(ColumnChunkMetaData const& s)
if (s.index_page_offset != 0) { c.field_int(10, s.index_page_offset); }
if (s.dictionary_page_offset != 0) { c.field_int(11, s.dictionary_page_offset); }
c.field_struct(12, s.statistics);
if (s.encoding_stats.has_value()) { c.field_struct_list(13, s.encoding_stats.value()); }
if (s.size_statistics.has_value()) { c.field_struct(16, s.size_statistics.value()); }
return c.value();
}
@@ -201,6 +202,8 @@ size_t CompactProtocolWriter::write(Statistics const& s)
if (s.distinct_count.has_value()) { c.field_int(4, s.distinct_count.value()); }
if (s.max_value.has_value()) { c.field_binary(5, s.max_value.value()); }
if (s.min_value.has_value()) { c.field_binary(6, s.min_value.value()); }
if (s.is_max_value_exact.has_value()) { c.field_bool(7, s.is_max_value_exact.value()); }
if (s.is_min_value_exact.has_value()) { c.field_bool(8, s.is_min_value_exact.value()); }
return c.value();
}

@@ -248,6 +251,15 @@ size_t CompactProtocolWriter::write(ColumnOrder const& co)
return c.value();
}

size_t CompactProtocolWriter::write(PageEncodingStats const& enc)
{
CompactProtocolFieldWriter c(*this);
c.field_int(1, static_cast<int32_t>(enc.page_type));
c.field_int(2, static_cast<int32_t>(enc.encoding));
c.field_int(3, enc.count);
return c.value();
}

size_t CompactProtocolWriter::write(SortingColumn const& sc)
{
CompactProtocolFieldWriter c(*this);
1 change: 1 addition & 0 deletions cpp/src/io/parquet/compact_protocol_writer.hpp
@@ -53,6 +53,7 @@ class CompactProtocolWriter {
size_t write(OffsetIndex const&);
size_t write(SizeStatistics const&);
size_t write(ColumnOrder const&);
size_t write(PageEncodingStats const&);
size_t write(SortingColumn const&);

protected:
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/page_enc.cu
@@ -2944,6 +2944,9 @@ __device__ uint8_t* EncodeStatistics(uint8_t* start,
auto const [min_ptr, min_size] =
get_extremum(&s->min_value, dtype, scratch, true, NO_TRUNC_STATS);
encoder.field_binary(6, min_ptr, min_size);
// cudf min/max statistics are always exact (i.e. not truncated)
encoder.field_bool(7, true);
encoder.field_bool(8, true);
}
encoder.end(&end);
return end;
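
For context on the two field_bool calls above: in the Thrift compact protocol, a boolean field carries its value in the field header itself (type nibble 1 = BOOLEAN_TRUE, 2 = BOOLEAN_FALSE), so fields 7 and 8 add only one byte each to the encoded Statistics. The sketch below illustrates that wire format; it is not cudf's actual CompactProtocolFieldWriter.

#include <cstdint>
#include <vector>

// Writes a compact-protocol bool field header. The bool's value lives in the
// type nibble (1 = BOOLEAN_TRUE, 2 = BOOLEAN_FALSE); no payload byte follows.
void write_bool_field(std::vector<uint8_t>& out, int field_id, int last_field_id, bool value)
{
  uint8_t const type = value ? 1 : 2;
  int const delta    = field_id - last_field_id;
  if (delta > 0 && delta <= 15) {
    // short form: field-id delta packed into the high nibble
    out.push_back(static_cast<uint8_t>((delta << 4) | type));
  } else {
    // long form: type byte, then the field id as a zigzag varint (one byte for small ids)
    out.push_back(type);
    out.push_back(static_cast<uint8_t>((field_id << 1) ^ (field_id >> 31)));
  }
}

int main()
{
  std::vector<uint8_t> buf;
  write_bool_field(buf, 7, 6, true);  // is_max_value_exact -> single byte 0x11
  write_bool_field(buf, 8, 7, true);  // is_min_value_exact -> single byte 0x11
  return buf.size() == 2 ? 0 : 1;
}
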
50 changes: 39 additions & 11 deletions cpp/src/io/parquet/parquet.hpp
@@ -259,6 +259,10 @@ struct Statistics {
thrust::optional<std::vector<uint8_t>> max_value;
// min value for column determined by ColumnOrder
thrust::optional<std::vector<uint8_t>> min_value;
// If true, max_value is the actual maximum value for a column
thrust::optional<bool> is_max_value_exact;
// If true, min_value is the actual minimum value for a column
thrust::optional<bool> is_min_value_exact;
};

/**
@@ -322,6 +326,15 @@ struct ColumnIndex {
thrust::optional<std::vector<int64_t>> definition_level_histogram;
};

/**
* @brief Thrift-derived struct describing page encoding statistics
*/
struct PageEncodingStats {
PageType page_type; // The page type (data/dict/...)
Encoding encoding; // Encoding of the page
int32_t count; // Number of pages of this type with this encoding
};

/**
* @brief Thrift-derived struct describing column sort order
*/
@@ -335,21 +348,36 @@
* @brief Thrift-derived struct describing a column chunk
*/
struct ColumnChunkMetaData {
// Type of this column
Type type = BOOLEAN;
// Set of all encodings used for this column. The purpose is to validate
// whether we can decode those pages.
std::vector<Encoding> encodings;
// Path in schema
std::vector<std::string> path_in_schema;
Compression codec = UNCOMPRESSED;
// Compression codec
Compression codec = UNCOMPRESSED;
// Number of values in this column
int64_t num_values = 0;
int64_t total_uncompressed_size =
0; // total byte size of all uncompressed pages in this column chunk (including the headers)
int64_t total_compressed_size =
0; // total byte size of all compressed pages in this column chunk (including the headers)
int64_t data_page_offset = 0; // Byte offset from beginning of file to first data page
int64_t index_page_offset = 0; // Byte offset from beginning of file to root index page
int64_t dictionary_page_offset =
0; // Byte offset from the beginning of file to first (only) dictionary page
Statistics statistics; // Encoded chunk-level statistics
thrust::optional<SizeStatistics> size_statistics; // Size statistics for the chunk
// Total byte size of all uncompressed pages in this column chunk (including the headers)
int64_t total_uncompressed_size = 0;
// Total byte size of all compressed pages in this column chunk (including the headers)
int64_t total_compressed_size = 0;
// Byte offset from beginning of file to first data page
int64_t data_page_offset = 0;
// Byte offset from beginning of file to root index page
int64_t index_page_offset = 0;
// Byte offset from the beginning of file to first (only) dictionary page
int64_t dictionary_page_offset = 0;
// Optional statistics for this column chunk
Statistics statistics;
// Set of all encodings used for pages in this column chunk. This information can be used to
// determine if all data pages are dictionary encoded for example.
thrust::optional<std::vector<PageEncodingStats>> encoding_stats;
// Optional statistics to help estimate total memory when converted to in-memory representations.
// The histograms contained in these statistics can also be useful in some cases for more
// fine-grained nullability/list length filter pushdown.
thrust::optional<SizeStatistics> size_statistics;
};

/**
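
As the new encoding_stats comment notes, a reader can decide from chunk metadata alone whether, for example, every data page is dictionary encoded. The hypothetical helper below sketches that check; the enum and struct definitions are simplified stand-ins for the real ones in this header (which use thrust::optional), and the function name is illustrative.

#include <cstdint>
#include <optional>
#include <vector>

// Simplified stand-ins for the enums/struct in this header (illustration only).
enum class PageType { DATA_PAGE, INDEX_PAGE, DICTIONARY_PAGE, DATA_PAGE_V2 };
enum class Encoding { PLAIN, PLAIN_DICTIONARY, RLE, RLE_DICTIONARY };

struct PageEncodingStats {
  PageType page_type;
  Encoding encoding;
  int32_t count;
};

// True when the chunk-level stats prove every data page is dictionary encoded;
// false when stats are absent, since then nothing can be proven from metadata.
bool all_data_pages_dict_encoded(
  std::optional<std::vector<PageEncodingStats>> const& encoding_stats)
{
  if (not encoding_stats.has_value()) { return false; }
  for (auto const& s : *encoding_stats) {
    bool const is_data_page =
      s.page_type == PageType::DATA_PAGE or s.page_type == PageType::DATA_PAGE_V2;
    bool const is_dict_enc =
      s.encoding == Encoding::PLAIN_DICTIONARY or s.encoding == Encoding::RLE_DICTIONARY;
    if (is_data_page and s.count > 0 and not is_dict_enc) { return false; }
  }
  return true;
}
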
50 changes: 50 additions & 0 deletions cpp/src/io/parquet/writer_impl.cu
@@ -22,6 +22,8 @@
#include "compact_protocol_reader.hpp"
#include "compact_protocol_writer.hpp"
#include "io/comp/nvcomp_adapter.hpp"
#include "io/parquet/parquet.hpp"
#include "io/parquet/parquet_gpu.hpp"
#include "io/statistics/column_statistics.cuh"
#include "io/utilities/column_utils.cuh"
#include "io/utilities/config_utils.hpp"
@@ -214,6 +216,53 @@ void update_chunk_encodings(std::vector<Encoding>& encodings, uint32_t enc_mask)
}
}

/**
* @brief Update the encoding_stats field in the column chunk metadata.
*
* @param chunk_meta The `ColumnChunkMetaData` struct for the column chunk
* @param ck The column chunk to summarize stats for
* @param is_v2 True if V2 page headers are used
*/
void update_chunk_encoding_stats(ColumnChunkMetaData& chunk_meta,
EncColumnChunk const& ck,
bool is_v2)
{
// don't set encoding stats if there are no pages
if (ck.num_pages == 0) { return; }

// NOTE: since cudf doesn't use mixed encodings for a chunk, we really only need to account
// for the dictionary page (if there is one), and the encoding used for the data pages. We can
// examine the chunk's encodings field to figure out the encodings without having to examine
// the page data.
auto const num_data_pages = static_cast<int32_t>(ck.num_data_pages());
auto const data_page_type = is_v2 ? PageType::DATA_PAGE_V2 : PageType::DATA_PAGE;

std::vector<PageEncodingStats> result;
if (ck.use_dictionary) {
// For dictionary encoding, if V1 then both data and dictionary use PLAIN_DICTIONARY. For V2
// the dictionary uses PLAIN and the data RLE_DICTIONARY.
auto const dict_enc = is_v2 ? Encoding::PLAIN : Encoding::PLAIN_DICTIONARY;
auto const data_enc = is_v2 ? Encoding::RLE_DICTIONARY : Encoding::PLAIN_DICTIONARY;
result.push_back({PageType::DICTIONARY_PAGE, dict_enc, 1});
if (num_data_pages > 0) { result.push_back({data_page_type, data_enc, num_data_pages}); }
} else {
// No dictionary page, the pages are encoded with something other than RLE (unless it's a
// boolean column).
for (auto const enc : chunk_meta.encodings) {
if (enc != Encoding::RLE) {
result.push_back({data_page_type, enc, num_data_pages});
break;
}
}
// if result is empty and we're using V2 headers, then assume the data is RLE as well
if (result.empty() and is_v2 and (ck.encodings & encoding_to_mask(Encoding::RLE)) != 0) {
result.push_back({data_page_type, Encoding::RLE, num_data_pages});
}
}

if (not result.empty()) { chunk_meta.encoding_stats = std::move(result); }
}

/**
* @brief Compute size (in bytes) of the data stored in the given column.
*
@@ -2144,6 +2193,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
max_write_size = std::max(max_write_size, ck.compressed_size);

update_chunk_encodings(column_chunk_meta.encodings, ck.encodings);
update_chunk_encoding_stats(column_chunk_meta, ck, write_v2_headers);

if (ck.ck_stat_size != 0) {
std::vector<uint8_t> const stats_blob = cudf::detail::make_std_vector_sync(
19 changes: 18 additions & 1 deletion cpp/tests/io/parquet_writer_test.cpp
@@ -903,6 +903,12 @@ TEST_F(ParquetWriterTest, CheckColumnIndexTruncation)
ASSERT_TRUE(stats.min_value.has_value());
ASSERT_TRUE(stats.max_value.has_value());

// check that min and max for the column chunk are exact (i.e. not truncated)
ASSERT_TRUE(stats.is_max_value_exact.has_value());
EXPECT_TRUE(stats.is_max_value_exact.value());
ASSERT_TRUE(stats.is_min_value_exact.has_value());
EXPECT_TRUE(stats.is_min_value_exact.value());

// check trunc(page.min) <= stats.min && trunc(page.max) >= stats.max
auto const ptype = fmd.schema[c + 1].type;
auto const ctype = fmd.schema[c + 1].converted_type;
@@ -1674,7 +1680,18 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings)
// no nulls and no repetition, so the only encoding used should be for the data.
// since we're writing v1, both dict and data pages should use PLAIN_DICTIONARY.
auto const expect_enc = [&fmd](int idx, cudf::io::parquet::detail::Encoding enc) {
EXPECT_EQ(fmd.row_groups[0].columns[idx].meta_data.encodings[0], enc);
auto const& col_meta = fmd.row_groups[0].columns[idx].meta_data;
EXPECT_EQ(col_meta.encodings[0], enc);

// also check encoding stats are written properly
ASSERT_TRUE(col_meta.encoding_stats.has_value());
auto const& enc_stats = col_meta.encoding_stats.value();
for (auto const& ec : enc_stats) {
if (ec.page_type == cudf::io::parquet::detail::PageType::DATA_PAGE) {
EXPECT_EQ(ec.encoding, enc);
EXPECT_EQ(ec.count, 1);
}
}
};

// requested plain
2 changes: 1 addition & 1 deletion python/cudf/cudf/core/column/string.py
@@ -692,7 +692,7 @@ def contains(
Returning an Index of booleans using only a literal pattern.
>>> data = ['Mouse', 'dog', 'house and parrot', '23.0', np.NaN]
>>> data = ['Mouse', 'dog', 'house and parrot', '23.0', np.nan]
>>> idx = cudf.Index(data)
>>> idx
Index(['Mouse', 'dog', 'house and parrot', '23.0', None], dtype='object')
12 changes: 6 additions & 6 deletions python/cudf/cudf/core/frame.py
@@ -1077,7 +1077,7 @@ def isna(self):
>>> import cudf
>>> import numpy as np
>>> import pandas as pd
>>> df = cudf.DataFrame({'age': [5, 6, np.NaN],
>>> df = cudf.DataFrame({'age': [5, 6, np.nan],
... 'born': [pd.NaT, pd.Timestamp('1939-05-27'),
... pd.Timestamp('1940-04-25')],
... 'name': ['Alfred', 'Batman', ''],
@@ -1095,7 +1095,7 @@ def isna(self):
Show which entries in a Series are NA.
>>> ser = cudf.Series([5, 6, np.NaN, np.inf, -np.inf])
>>> ser = cudf.Series([5, 6, np.nan, np.inf, -np.inf])
>>> ser
0 5.0
1 6.0
@@ -1113,7 +1113,7 @@ def isna(self):
Show which entries in an Index are NA.
>>> idx = cudf.Index([1, 2, None, np.NaN, 0.32, np.inf])
>>> idx = cudf.Index([1, 2, None, np.nan, 0.32, np.inf])
>>> idx
Index([1.0, 2.0, <NA>, <NA>, 0.32, Inf], dtype='float64')
>>> idx.isna()
@@ -1156,7 +1156,7 @@ def notna(self):
>>> import cudf
>>> import numpy as np
>>> import pandas as pd
>>> df = cudf.DataFrame({'age': [5, 6, np.NaN],
>>> df = cudf.DataFrame({'age': [5, 6, np.nan],
... 'born': [pd.NaT, pd.Timestamp('1939-05-27'),
... pd.Timestamp('1940-04-25')],
... 'name': ['Alfred', 'Batman', ''],
@@ -1174,7 +1174,7 @@ def notna(self):
Show which entries in a Series are NA.
>>> ser = cudf.Series([5, 6, np.NaN, np.inf, -np.inf])
>>> ser = cudf.Series([5, 6, np.nan, np.inf, -np.inf])
>>> ser
0 5.0
1 6.0
@@ -1192,7 +1192,7 @@ def notna(self):
Show which entries in an Index are NA.
>>> idx = cudf.Index([1, 2, None, np.NaN, 0.32, np.inf])
>>> idx = cudf.Index([1, 2, None, np.nan, 0.32, np.inf])
>>> idx
Index([1.0, 2.0, <NA>, <NA>, 0.32, Inf], dtype='float64')
>>> idx.notna()
6 changes: 5 additions & 1 deletion python/cudf/cudf/core/index.py
@@ -60,6 +60,7 @@
from cudf.core.single_column_frame import SingleColumnFrame
from cudf.utils.docutils import copy_docstring
from cudf.utils.dtypes import (
_NUMPY_SCTYPES,
_maybe_convert_to_default_type,
find_common_type,
is_mixed_with_object_dtype,
@@ -344,7 +345,10 @@ def _data(self):
@_cudf_nvtx_annotate
def __contains__(self, item):
if isinstance(item, bool) or not isinstance(
item, tuple(np.sctypes["int"] + np.sctypes["float"] + [int, float])
item,
tuple(
_NUMPY_SCTYPES["int"] + _NUMPY_SCTYPES["float"] + [int, float]
),
):
return False
try:
(remaining changed files not shown)