Skip to content

Commit

Permalink
GH-44010: [C++] Add RecordBatch::MakeStatisticsArray()
Browse files Browse the repository at this point in the history
It's a convenient function that converts `arrow::ArrayStatistics` in a
`arrow::RecordBatch` to `arrow::Array` for the Arrow C data interface.
  • Loading branch information
kou committed Sep 30, 2024
1 parent 6f64af5 commit b194430
Show file tree
Hide file tree
Showing 6 changed files with 651 additions and 1 deletion.
2 changes: 1 addition & 1 deletion cpp/src/arrow/array/array_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class ARROW_EXPORT Array {
/// This just delegates to calling statistics on the underlying ArrayData
/// object which backs this Array.
///
/// \return const ArrayStatistics&
/// \return std::shared_ptr<ArrayStatistics>
std::shared_ptr<ArrayStatistics> statistics() const { return data_->statistics; }

protected:
Expand Down
21 changes: 21 additions & 0 deletions cpp/src/arrow/array/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>
#include <variant>

#include "arrow/type_fwd.h"
#include "arrow/util/visibility.h"

namespace arrow {
Expand All @@ -34,6 +35,22 @@ namespace arrow {
struct ARROW_EXPORT ArrayStatistics {
using ValueType = std::variant<bool, int64_t, uint64_t, double, std::string>;

static const std::shared_ptr<DataType>& ValueToArrowType(
const std::optional<ValueType>& value) {
if (!value.has_value()) {
return null();
}

struct Visitor {
const std::shared_ptr<DataType>& operator()(const bool&) { return boolean(); }
const std::shared_ptr<DataType>& operator()(const int64_t&) { return int64(); }
const std::shared_ptr<DataType>& operator()(const uint64_t&) { return uint64(); }
const std::shared_ptr<DataType>& operator()(const double&) { return float64(); }
const std::shared_ptr<DataType>& operator()(const std::string&) { return utf8(); }
} visitor;
return std::visit(visitor, value.value());
}

/// \brief The number of null values, may not be set
std::optional<int64_t> null_count = std::nullopt;

Expand All @@ -43,12 +60,16 @@ struct ARROW_EXPORT ArrayStatistics {
/// \brief The minimum value, may not be set
std::optional<ValueType> min = std::nullopt;

const std::shared_ptr<DataType>& MinArrowType() { return ValueToArrowType(min); }

/// \brief Whether the minimum value is exact or not
bool is_min_exact = false;

/// \brief The maximum value, may not be set
std::optional<ValueType> max = std::nullopt;

const std::shared_ptr<DataType>& MaxArrowType() { return ValueToArrowType(max); }

/// \brief Whether the maximum value is exact or not
bool is_max_exact = false;

Expand Down
18 changes: 18 additions & 0 deletions cpp/src/arrow/c/abi.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,24 @@ struct ArrowArray {
void* private_data;
};

# define ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_EXACT "ARROW:average_byte_width:exact"
# define ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_APPROXIMATE \
"ARROW:average_byte_width:approximate"
# define ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT "ARROW:distinct_count:exact"
# define ARROW_STATISTICS_KEY_DISTINCT_COUNT_APPROXIMATE \
"ARROW:distinct_count:approximate"
# define ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_EXACT "ARROW:max_byte_width:exact"
# define ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_APPROXIMATE \
"ARROW:max_byte_width:approximate"
# define ARROW_STATISTICS_KEY_MAX_VALUE_EXACT "ARROW:max_value:exact"
# define ARROW_STATISTICS_KEY_MAX_VALUE_APPROXIMATE "ARROW:max_value:approximate"
# define ARROW_STATISTICS_KEY_MIN_VALUE_EXACT "ARROW:min_value:exact"
# define ARROW_STATISTICS_KEY_MIN_VALUE_APPROXIMATE "ARROW:min_value:approximate"
# define ARROW_STATISTICS_KEY_NULL_COUNT_EXACT "ARROW:null_count:exact"
# define ARROW_STATISTICS_KEY_NULL_COUNT_APPROXIMATE "ARROW:null_count:approximate"
# define ARROW_STATISTICS_KEY_ROW_COUNT_EXACT "ARROW:row_count:exact"
# define ARROW_STATISTICS_KEY_ROW_COUNT_APPROXIMATE "ARROW:row_count:approximate"

#endif // ARROW_C_DATA_INTERFACE

#ifndef ARROW_C_DEVICE_DATA_INTERFACE
Expand Down
163 changes: 163 additions & 0 deletions cpp/src/arrow/record_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@
#include <utility>

#include "arrow/array.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_dict.h"
#include "arrow/array/builder_nested.h"
#include "arrow/array/builder_union.h"
#include "arrow/array/concatenate.h"
#include "arrow/array/validate.h"
#include "arrow/c/abi.h"
#include "arrow/pretty_print.h"
#include "arrow/status.h"
#include "arrow/table.h"
Expand Down Expand Up @@ -465,6 +470,164 @@ Result<std::shared_ptr<RecordBatch>> RecordBatch::ViewOrCopyTo(
return Make(schema_, num_rows(), std::move(copied_columns));
}

Result<std::shared_ptr<Array>> RecordBatch::MakeStatisticsArray(
MemoryPool* memory_pool) const {
auto enumerate_statistics =
[&](std::function<Status(int nth_statistics, bool start_new_column,
std::optional<int32_t> nth_column, const char* key,
const std::shared_ptr<DataType>& type,
const ArrayStatistics::ValueType& value)>
yield) {
int nth_statistics = 0;
RETURN_NOT_OK(yield(nth_statistics++, true, std::nullopt,
ARROW_STATISTICS_KEY_ROW_COUNT_EXACT, int64(),
ArrayStatistics::ValueType{num_rows_}));

int num_fields = schema_->num_fields();
for (int nth_column = 0; nth_column < num_fields; ++nth_column) {
auto statistics = column(nth_column)->statistics();
if (!statistics) {
continue;
}

bool start_new_column = true;
if (statistics->null_count.has_value()) {
RETURN_NOT_OK(yield(
nth_statistics++, start_new_column, std::optional<int32_t>(nth_column),
ARROW_STATISTICS_KEY_NULL_COUNT_EXACT, int64(),
ArrayStatistics::ValueType{statistics->null_count.value()}));
start_new_column = false;
}

if (statistics->distinct_count.has_value()) {
RETURN_NOT_OK(yield(
nth_statistics++, start_new_column, std::optional<int32_t>(nth_column),
ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT, int64(),
ArrayStatistics::ValueType{statistics->distinct_count.value()}));
start_new_column = false;
}

if (statistics->min.has_value()) {
if (statistics->is_min_exact) {
RETURN_NOT_OK(yield(nth_statistics++, start_new_column,
std::optional<int32_t>(nth_column),
ARROW_STATISTICS_KEY_MIN_VALUE_EXACT,
statistics->MinArrowType(), statistics->min.value()));
} else {
RETURN_NOT_OK(yield(nth_statistics++, start_new_column,
std::optional<int32_t>(nth_column),
ARROW_STATISTICS_KEY_MIN_VALUE_APPROXIMATE,
statistics->MinArrowType(), statistics->min.value()));
}
start_new_column = false;
}

if (statistics->max.has_value()) {
if (statistics->is_max_exact) {
RETURN_NOT_OK(yield(nth_statistics++, start_new_column,
std::optional<int32_t>(nth_column),
ARROW_STATISTICS_KEY_MAX_VALUE_EXACT,
statistics->MaxArrowType(), statistics->max.value()));
} else {
RETURN_NOT_OK(yield(nth_statistics++, start_new_column,
std::optional<int32_t>(nth_column),
ARROW_STATISTICS_KEY_MAX_VALUE_APPROXIMATE,
statistics->MaxArrowType(), statistics->max.value()));
}
start_new_column = false;
}
}
return Status::OK();
};

std::vector<std::shared_ptr<Field>> values_types;
std::vector<int8_t> values_type_indexes;
RETURN_NOT_OK(enumerate_statistics(
[&](int nth_statistics, bool start_new_column, std::optional<int32_t> nth_column,
const char* key, const std::shared_ptr<DataType>& type,
const ArrayStatistics::ValueType& value) {
int8_t i = 0;
for (const auto& field : values_types) {
if (field->type()->id() == type->id()) {
break;
}
i++;
}
if (i == static_cast<int8_t>(values_types.size())) {
values_types.push_back(field(type->name(), type));
}
values_type_indexes.push_back(i);
return Status::OK();
}));

auto keys_type = dictionary(int32(), utf8(), false);
auto values_type = dense_union(values_types);
auto statistics_type =
struct_({field("column", int32()),
field("statistics", map(keys_type, values_type, false))});

std::vector<std::shared_ptr<ArrayBuilder>> field_builders;
auto columns_builder = std::make_shared<Int32Builder>(memory_pool);
field_builders.push_back(std::static_pointer_cast<ArrayBuilder>(columns_builder));
auto keys_builder = std::make_shared<StringDictionary32Builder>();
std::vector<std::shared_ptr<ArrayBuilder>> values_builders;
for (const auto& values_type : values_types) {
std::unique_ptr<ArrayBuilder> values_builder;
RETURN_NOT_OK(MakeBuilder(memory_pool, values_type->type(), &values_builder));
values_builders.push_back(std::shared_ptr<ArrayBuilder>(std::move(values_builder)));
}
auto items_builder = std::make_shared<DenseUnionBuilder>(
memory_pool, std::move(values_builders), values_type);
auto values_builder = std::make_shared<MapBuilder>(
memory_pool, std::static_pointer_cast<ArrayBuilder>(keys_builder),
std::static_pointer_cast<ArrayBuilder>(items_builder));
field_builders.push_back(std::static_pointer_cast<ArrayBuilder>(values_builder));
StructBuilder builder(statistics_type, memory_pool, std::move(field_builders));

RETURN_NOT_OK(enumerate_statistics(
[&](int nth_statistics, bool start_new_column, std::optional<int32_t> nth_column,
const char* key, const std::shared_ptr<DataType>& type,
const ArrayStatistics::ValueType& value) {
if (start_new_column) {
RETURN_NOT_OK(builder.Append());
if (nth_column.has_value()) {
RETURN_NOT_OK(columns_builder->Append(nth_column.value()));
} else {
RETURN_NOT_OK(columns_builder->AppendNull());
}
RETURN_NOT_OK(values_builder->Append());
}
RETURN_NOT_OK(keys_builder->Append(key, strlen(key)));
const auto values_type_index = values_type_indexes[nth_statistics];
RETURN_NOT_OK(items_builder->Append(values_type_index));
struct Visitor {
ArrayBuilder* builder;

Status operator()(const bool& value) {
return static_cast<BooleanBuilder*>(builder)->Append(value);
}
Status operator()(const int64_t& value) {
return static_cast<Int64Builder*>(builder)->Append(value);
}
Status operator()(const uint64_t& value) {
return static_cast<UInt64Builder*>(builder)->Append(value);
}
Status operator()(const double& value) {
return static_cast<DoubleBuilder*>(builder)->Append(value);
}
Status operator()(const std::string& value) {
return static_cast<StringBuilder*>(builder)->Append(value.data(),
value.size());
}
} visitor;
visitor.builder = values_builders[values_type_index].get();
RETURN_NOT_OK(std::visit(visitor, value));
return Status::OK();
}));

return builder.Finish();
}

Status RecordBatch::Validate() const {
return ValidateBatch(*this, /*full_validation=*/false);
}
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/record_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,18 @@ class ARROW_EXPORT RecordBatch {

virtual DeviceAllocationType device_type() const = 0;

/// \brief Create a statistics array of this record batch
///
/// The created array follows the C data interface statistics
/// specification. See
/// https://arrow.apache.org/docs/format/CDataInterfaceStatistics.html
/// for details.
///
/// \param[in] pool the memory pool to allocate memory from
/// \return the statistics array of this record batch
Result<std::shared_ptr<Array>> MakeStatisticsArray(
MemoryPool* pool = default_memory_pool()) const;

protected:
RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows);

Expand Down
Loading

0 comments on commit b194430

Please sign in to comment.