From cd8830b43fa16d6017ff36282ebeb705a83eb709 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Mon, 14 Aug 2023 14:45:11 +0100 Subject: [PATCH] GH-36990: [R] Expose Parquet ReaderProperties (#36992) ### Rationale for this change Expose the ReaderProperties class in R so that the thrift size settings can be altered. ### What changes are included in this PR? Add R6 class, link it up to the C++ class, use it when reading Parquet files. ### Are these changes tested? Yes ### Are there any user-facing changes? Nope * Closes: #36990 Authored-by: Nic Crane Signed-off-by: Nic Crane --- r/NAMESPACE | 1 + r/R/arrowExports.R | 28 ++++++-- r/R/dataset-format.R | 16 ++++- r/R/parquet.R | 48 +++++++++++++- r/_pkgdown.yml | 1 + r/man/FragmentScanOptions.Rd | 7 ++ r/man/ParquetFileReader.Rd | 1 + r/man/ParquetReaderProperties.Rd | 27 ++++++++ r/src/arrowExports.cpp | 106 ++++++++++++++++++++++++++++--- r/src/arrow_types.h | 1 + r/src/dataset.cpp | 7 +- r/src/parquet.cpp | 34 +++++++++- r/tests/testthat/test-dataset.R | 14 ++++ r/tests/testthat/test-parquet.R | 40 ++++++++++++ 14 files changed, 311 insertions(+), 20 deletions(-) create mode 100644 r/man/ParquetReaderProperties.Rd diff --git a/r/NAMESPACE b/r/NAMESPACE index 7eaa51bc5771f..f4799176425d7 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -246,6 +246,7 @@ export(ParquetFileFormat) export(ParquetFileReader) export(ParquetFileWriter) export(ParquetFragmentScanOptions) +export(ParquetReaderProperties) export(ParquetVersionType) export(ParquetWriterProperties) export(Partitioning) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 10732100cd58a..f4ff3ef894532 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -748,8 +748,8 @@ dataset___JsonFragmentScanOptions__Make <- function(parse_options, read_options) .Call(`_arrow_dataset___JsonFragmentScanOptions__Make`, parse_options, read_options) } -dataset___ParquetFragmentScanOptions__Make <- function(use_buffered_stream, buffer_size, pre_buffer) { - 
.Call(`_arrow_dataset___ParquetFragmentScanOptions__Make`, use_buffered_stream, buffer_size, pre_buffer) +dataset___ParquetFragmentScanOptions__Make <- function(use_buffered_stream, buffer_size, pre_buffer, thrift_string_size_limit, thrift_container_size_limit) { + .Call(`_arrow_dataset___ParquetFragmentScanOptions__Make`, use_buffered_stream, buffer_size, pre_buffer, thrift_string_size_limit, thrift_container_size_limit) } dataset___DirectoryPartitioning <- function(schm, segment_encoding) { @@ -1592,6 +1592,26 @@ parquet___arrow___ArrowReaderProperties__Make <- function(use_threads) { .Call(`_arrow_parquet___arrow___ArrowReaderProperties__Make`, use_threads) } +parquet___arrow___ReaderProperties__Make <- function() { + .Call(`_arrow_parquet___arrow___ReaderProperties__Make`) +} + +parquet___arrow___ReaderProperties__get_thrift_string_size_limit <- function(properties) { + .Call(`_arrow_parquet___arrow___ReaderProperties__get_thrift_string_size_limit`, properties) +} + +parquet___arrow___ReaderProperties__set_thrift_string_size_limit <- function(properties, size) { + invisible(.Call(`_arrow_parquet___arrow___ReaderProperties__set_thrift_string_size_limit`, properties, size)) +} + +parquet___arrow___ReaderProperties__get_thrift_container_size_limit <- function(properties) { + .Call(`_arrow_parquet___arrow___ReaderProperties__get_thrift_container_size_limit`, properties) +} + +parquet___arrow___ReaderProperties__set_thrift_container_size_limit <- function(properties, size) { + invisible(.Call(`_arrow_parquet___arrow___ReaderProperties__set_thrift_container_size_limit`, properties, size)) +} + parquet___arrow___ArrowReaderProperties__set_use_threads <- function(properties, use_threads) { invisible(.Call(`_arrow_parquet___arrow___ArrowReaderProperties__set_use_threads`, properties, use_threads)) } @@ -1616,8 +1636,8 @@ parquet___arrow___ArrowReaderProperties__get_coerce_int96_timestamp_unit <- func 
.Call(`_arrow_parquet___arrow___ArrowReaderProperties__get_coerce_int96_timestamp_unit`, properties) } -parquet___arrow___FileReader__OpenFile <- function(file, props) { - .Call(`_arrow_parquet___arrow___FileReader__OpenFile`, file, props) +parquet___arrow___FileReader__OpenFile <- function(file, props, reader_props) { + .Call(`_arrow_parquet___arrow___FileReader__OpenFile`, file, props, reader_props) } parquet___arrow___FileReader__ReadTable1 <- function(reader) { diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R index 8798e0248e0f6..9c7e332f5ea2c 100644 --- a/r/R/dataset-format.R +++ b/r/R/dataset-format.R @@ -518,6 +518,13 @@ csv_file_format_read_opts <- function(schema = NULL, ...) { #' * `buffer_size`: Size of buffered stream, if enabled. Default is 8KB. #' * `pre_buffer`: Pre-buffer the raw Parquet data. This can improve performance #' on high-latency filesystems. Disabled by default. +#' * `thrift_string_size_limit`: Maximum string size allocated for decoding thrift +#' strings. May need to be increased in order to read +#' files with especially large headers. Default value +#' 100000000. +#' * `thrift_container_size_limit`: Maximum size of thrift containers. May need to be +#' increased in order to read files with especially large +#' headers. Default value 1000000. # #' `format = "text"`: see [CsvConvertOptions]. Note that options can only be #' specified with the Arrow C++ library naming. 
Also, "block_size" from @@ -571,8 +578,13 @@ CsvFragmentScanOptions$create <- function(..., ParquetFragmentScanOptions <- R6Class("ParquetFragmentScanOptions", inherit = FragmentScanOptions) ParquetFragmentScanOptions$create <- function(use_buffered_stream = FALSE, buffer_size = 8196, - pre_buffer = TRUE) { - dataset___ParquetFragmentScanOptions__Make(use_buffered_stream, buffer_size, pre_buffer) + pre_buffer = TRUE, + thrift_string_size_limit = 100000000, + thrift_container_size_limit = 1000000) { + dataset___ParquetFragmentScanOptions__Make( + use_buffered_stream, buffer_size, pre_buffer, thrift_string_size_limit, + thrift_container_size_limit + ) } #' @usage NULL diff --git a/r/R/parquet.R b/r/R/parquet.R index db224a41e4019..a58f7810b6ec7 100644 --- a/r/R/parquet.R +++ b/r/R/parquet.R @@ -457,6 +457,7 @@ ParquetFileWriter$create <- function(schema, #' (e.g. `RandomAccessFile`). #' - `props` Optional [ParquetArrowReaderProperties] #' - `mmap` Logical: whether to memory-map the file (default `TRUE`) +#' - `reader_props` Optional [ParquetReaderProperties] #' - `...` Additional arguments, currently ignored #' #' @section Methods: @@ -541,12 +542,13 @@ ParquetFileReader <- R6Class("ParquetFileReader", ParquetFileReader$create <- function(file, props = ParquetArrowReaderProperties$create(), mmap = TRUE, + reader_props = ParquetReaderProperties$create(), ...) 
{ file <- make_readable_file(file, mmap) assert_is(props, "ParquetArrowReaderProperties") assert_is(file, "RandomAccessFile") - parquet___arrow___FileReader__OpenFile(file, props) + parquet___arrow___FileReader__OpenFile(file, props, reader_props) } #' @title ParquetArrowReaderProperties class @@ -625,3 +627,47 @@ calculate_chunk_size <- function(rows, columns, chunk_size } + +#' @title ParquetReaderProperties class +#' @rdname ParquetReaderProperties +#' @name ParquetReaderProperties +#' @docType class +#' @usage NULL +#' @format NULL +#' @description This class holds settings to control how a Parquet file is read +#' by [ParquetFileReader]. +#' +#' @section Factory: +#' +#' The `ParquetReaderProperties$create()` factory method instantiates the object +#' and takes no arguments. +#' +#' @section Methods: +#' +#' - `$thrift_string_size_limit()` +#' - `$set_thrift_string_size_limit()` +#' - `$thrift_container_size_limit()` +#' - `$set_thrift_container_size_limit()` +#' +#' @export +ParquetReaderProperties <- R6Class("ParquetReaderProperties", + inherit = ArrowObject, + public = list( + thrift_string_size_limit = function() { + parquet___arrow___ReaderProperties__get_thrift_string_size_limit(self) + }, + set_thrift_string_size_limit = function(size) { + parquet___arrow___ReaderProperties__set_thrift_string_size_limit(self, size) + }, + thrift_container_size_limit = function() { + parquet___arrow___ReaderProperties__get_thrift_container_size_limit(self) + }, + set_thrift_container_size_limit = function(size) { + parquet___arrow___ReaderProperties__set_thrift_container_size_limit(self, size) + } + ) +) + +ParquetReaderProperties$create <- function() { + parquet___arrow___ReaderProperties__Make() +} diff --git a/r/_pkgdown.yml b/r/_pkgdown.yml index 9facce9d1b28b..10323a4796602 100644 --- a/r/_pkgdown.yml +++ b/r/_pkgdown.yml @@ -278,6 +278,7 @@ reference: - title: File read/writer interface contents: - ParquetFileReader + - ParquetReaderProperties - 
ParquetArrowReaderProperties - ParquetFileWriter - ParquetWriterProperties diff --git a/r/man/FragmentScanOptions.Rd b/r/man/FragmentScanOptions.Rd index 79bb3ea3c3158..7b597c2f017be 100644 --- a/r/man/FragmentScanOptions.Rd +++ b/r/man/FragmentScanOptions.Rd @@ -29,6 +29,13 @@ to reduce memory overhead. Disabled by default. \item \code{buffer_size}: Size of buffered stream, if enabled. Default is 8KB. \item \code{pre_buffer}: Pre-buffer the raw Parquet data. This can improve performance on high-latency filesystems. Disabled by default. +\item \code{thrift_string_size_limit}: Maximum string size allocated for decoding thrift +strings. May need to be increased in order to read +files with especially large headers. Default value +100000000. +\item \code{thrift_container_size_limit}: Maximum size of thrift containers. May need to be +increased in order to read files with especially large +headers. Default value 1000000. \code{format = "text"}: see \link{CsvConvertOptions}. Note that options can only be specified with the Arrow C++ library naming. Also, "block_size" from \link{CsvReadOptions} may be given. diff --git a/r/man/ParquetFileReader.Rd b/r/man/ParquetFileReader.Rd index 30d0725a4984f..59ec0d9a3bc55 100644 --- a/r/man/ParquetFileReader.Rd +++ b/r/man/ParquetFileReader.Rd @@ -17,6 +17,7 @@ takes the following arguments: (e.g. \code{RandomAccessFile}). 
\item \code{props} Optional \link{ParquetArrowReaderProperties} \item \code{mmap} Logical: whether to memory-map the file (default \code{TRUE}) +\item \code{reader_props} Optional \link{ParquetReaderProperties} \item \code{...} Additional arguments, currently ignored } } diff --git a/r/man/ParquetReaderProperties.Rd b/r/man/ParquetReaderProperties.Rd new file mode 100644 index 0000000000000..1779fffb14805 --- /dev/null +++ b/r/man/ParquetReaderProperties.Rd @@ -0,0 +1,27 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/parquet.R +\docType{class} +\name{ParquetReaderProperties} +\alias{ParquetReaderProperties} +\title{ParquetReaderProperties class} +\description{ +This class holds settings to control how a Parquet file is read +by \link{ParquetFileReader}. +} +\section{Factory}{ + + +The \code{ParquetReaderProperties$create()} factory method instantiates the object +and takes no arguments. +} + +\section{Methods}{ + +\itemize{ +\item \verb{$thrift_string_size_limit()} +\item \verb{$set_thrift_string_size_limit()} +\item \verb{$thrift_container_size_limit()} +\item \verb{$set_thrift_container_size_limit()} +} +} + diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 1d617b252e284..790207efce1d2 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -2024,17 +2024,19 @@ extern "C" SEXP _arrow_dataset___JsonFragmentScanOptions__Make(SEXP parse_option // dataset.cpp #if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buffer_size, bool pre_buffer); -extern "C" SEXP _arrow_dataset___ParquetFragmentScanOptions__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP pre_buffer_sexp){ +std::shared_ptr dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buffer_size, bool pre_buffer, int64_t thrift_string_size_limit, int64_t thrift_container_size_limit); +extern "C" SEXP 
_arrow_dataset___ParquetFragmentScanOptions__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP pre_buffer_sexp, SEXP thrift_string_size_limit_sexp, SEXP thrift_container_size_limit_sexp){ BEGIN_CPP11 arrow::r::Input::type use_buffered_stream(use_buffered_stream_sexp); arrow::r::Input::type buffer_size(buffer_size_sexp); arrow::r::Input::type pre_buffer(pre_buffer_sexp); - return cpp11::as_sexp(dataset___ParquetFragmentScanOptions__Make(use_buffered_stream, buffer_size, pre_buffer)); + arrow::r::Input::type thrift_string_size_limit(thrift_string_size_limit_sexp); + arrow::r::Input::type thrift_container_size_limit(thrift_container_size_limit_sexp); + return cpp11::as_sexp(dataset___ParquetFragmentScanOptions__Make(use_buffered_stream, buffer_size, pre_buffer, thrift_string_size_limit, thrift_container_size_limit)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___ParquetFragmentScanOptions__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP pre_buffer_sexp){ +extern "C" SEXP _arrow_dataset___ParquetFragmentScanOptions__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP pre_buffer_sexp, SEXP thrift_string_size_limit_sexp, SEXP thrift_container_size_limit_sexp){ Rf_error("Cannot call dataset___ParquetFragmentScanOptions__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -4062,6 +4064,84 @@ extern "C" SEXP _arrow_parquet___arrow___ArrowReaderProperties__Make(SEXP use_th } #endif +// parquet.cpp +#if defined(ARROW_R_WITH_PARQUET) +std::shared_ptr parquet___arrow___ReaderProperties__Make(); +extern "C" SEXP _arrow_parquet___arrow___ReaderProperties__Make(){ +BEGIN_CPP11 + return cpp11::as_sexp(parquet___arrow___ReaderProperties__Make()); +END_CPP11 +} +#else +extern "C" SEXP _arrow_parquet___arrow___ReaderProperties__Make(){ + Rf_error("Cannot call parquet___arrow___ReaderProperties__Make(). 
See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + +// parquet.cpp +#if defined(ARROW_R_WITH_PARQUET) +int parquet___arrow___ReaderProperties__get_thrift_string_size_limit(const std::shared_ptr& properties); +extern "C" SEXP _arrow_parquet___arrow___ReaderProperties__get_thrift_string_size_limit(SEXP properties_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type properties(properties_sexp); + return cpp11::as_sexp(parquet___arrow___ReaderProperties__get_thrift_string_size_limit(properties)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_parquet___arrow___ReaderProperties__get_thrift_string_size_limit(SEXP properties_sexp){ + Rf_error("Cannot call parquet___arrow___ReaderProperties__get_thrift_string_size_limit(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + +// parquet.cpp +#if defined(ARROW_R_WITH_PARQUET) +void parquet___arrow___ReaderProperties__set_thrift_string_size_limit(const std::shared_ptr& properties, int size); +extern "C" SEXP _arrow_parquet___arrow___ReaderProperties__set_thrift_string_size_limit(SEXP properties_sexp, SEXP size_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type properties(properties_sexp); + arrow::r::Input::type size(size_sexp); + parquet___arrow___ReaderProperties__set_thrift_string_size_limit(properties, size); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_parquet___arrow___ReaderProperties__set_thrift_string_size_limit(SEXP properties_sexp, SEXP size_sexp){ + Rf_error("Cannot call parquet___arrow___ReaderProperties__set_thrift_string_size_limit(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. 
"); +} +#endif + +// parquet.cpp +#if defined(ARROW_R_WITH_PARQUET) +int parquet___arrow___ReaderProperties__get_thrift_container_size_limit(const std::shared_ptr& properties); +extern "C" SEXP _arrow_parquet___arrow___ReaderProperties__get_thrift_container_size_limit(SEXP properties_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type properties(properties_sexp); + return cpp11::as_sexp(parquet___arrow___ReaderProperties__get_thrift_container_size_limit(properties)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_parquet___arrow___ReaderProperties__get_thrift_container_size_limit(SEXP properties_sexp){ + Rf_error("Cannot call parquet___arrow___ReaderProperties__get_thrift_container_size_limit(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + +// parquet.cpp +#if defined(ARROW_R_WITH_PARQUET) +void parquet___arrow___ReaderProperties__set_thrift_container_size_limit(const std::shared_ptr& properties, int size); +extern "C" SEXP _arrow_parquet___arrow___ReaderProperties__set_thrift_container_size_limit(SEXP properties_sexp, SEXP size_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type properties(properties_sexp); + arrow::r::Input::type size(size_sexp); + parquet___arrow___ReaderProperties__set_thrift_container_size_limit(properties, size); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_parquet___arrow___ReaderProperties__set_thrift_container_size_limit(SEXP properties_sexp, SEXP size_sexp){ + Rf_error("Cannot call parquet___arrow___ReaderProperties__set_thrift_container_size_limit(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. 
"); +} +#endif + // parquet.cpp #if defined(ARROW_R_WITH_PARQUET) void parquet___arrow___ArrowReaderProperties__set_use_threads(const std::shared_ptr& properties, bool use_threads); @@ -4163,16 +4243,17 @@ extern "C" SEXP _arrow_parquet___arrow___ArrowReaderProperties__get_coerce_int96 // parquet.cpp #if defined(ARROW_R_WITH_PARQUET) -std::shared_ptr parquet___arrow___FileReader__OpenFile(const std::shared_ptr& file, const std::shared_ptr& props); -extern "C" SEXP _arrow_parquet___arrow___FileReader__OpenFile(SEXP file_sexp, SEXP props_sexp){ +std::shared_ptr parquet___arrow___FileReader__OpenFile(const std::shared_ptr& file, const std::shared_ptr& props, const std::shared_ptr& reader_props); +extern "C" SEXP _arrow_parquet___arrow___FileReader__OpenFile(SEXP file_sexp, SEXP props_sexp, SEXP reader_props_sexp){ BEGIN_CPP11 arrow::r::Input&>::type file(file_sexp); arrow::r::Input&>::type props(props_sexp); - return cpp11::as_sexp(parquet___arrow___FileReader__OpenFile(file, props)); + arrow::r::Input&>::type reader_props(reader_props_sexp); + return cpp11::as_sexp(parquet___arrow___FileReader__OpenFile(file, props, reader_props)); END_CPP11 } #else -extern "C" SEXP _arrow_parquet___arrow___FileReader__OpenFile(SEXP file_sexp, SEXP props_sexp){ +extern "C" SEXP _arrow_parquet___arrow___FileReader__OpenFile(SEXP file_sexp, SEXP props_sexp, SEXP reader_props_sexp){ Rf_error("Cannot call parquet___arrow___FileReader__OpenFile(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. 
"); } #endif @@ -5774,7 +5855,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___FragmentScanOptions__type_name", (DL_FUNC) &_arrow_dataset___FragmentScanOptions__type_name, 1}, { "_arrow_dataset___CsvFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___CsvFragmentScanOptions__Make, 2}, { "_arrow_dataset___JsonFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___JsonFragmentScanOptions__Make, 2}, - { "_arrow_dataset___ParquetFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___ParquetFragmentScanOptions__Make, 3}, + { "_arrow_dataset___ParquetFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___ParquetFragmentScanOptions__Make, 5}, { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 2}, { "_arrow_dataset___DirectoryPartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning__MakeFactory, 2}, { "_arrow_dataset___HivePartitioning", (DL_FUNC) &_arrow_dataset___HivePartitioning, 3}, @@ -5985,13 +6066,18 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ipc___MessageReader__ReadNextMessage", (DL_FUNC) &_arrow_ipc___MessageReader__ReadNextMessage, 1}, { "_arrow_ipc___ReadMessage", (DL_FUNC) &_arrow_ipc___ReadMessage, 1}, { "_arrow_parquet___arrow___ArrowReaderProperties__Make", (DL_FUNC) &_arrow_parquet___arrow___ArrowReaderProperties__Make, 1}, + { "_arrow_parquet___arrow___ReaderProperties__Make", (DL_FUNC) &_arrow_parquet___arrow___ReaderProperties__Make, 0}, + { "_arrow_parquet___arrow___ReaderProperties__get_thrift_string_size_limit", (DL_FUNC) &_arrow_parquet___arrow___ReaderProperties__get_thrift_string_size_limit, 1}, + { "_arrow_parquet___arrow___ReaderProperties__set_thrift_string_size_limit", (DL_FUNC) &_arrow_parquet___arrow___ReaderProperties__set_thrift_string_size_limit, 2}, + { "_arrow_parquet___arrow___ReaderProperties__get_thrift_container_size_limit", (DL_FUNC) &_arrow_parquet___arrow___ReaderProperties__get_thrift_container_size_limit, 1}, + { 
"_arrow_parquet___arrow___ReaderProperties__set_thrift_container_size_limit", (DL_FUNC) &_arrow_parquet___arrow___ReaderProperties__set_thrift_container_size_limit, 2}, { "_arrow_parquet___arrow___ArrowReaderProperties__set_use_threads", (DL_FUNC) &_arrow_parquet___arrow___ArrowReaderProperties__set_use_threads, 2}, { "_arrow_parquet___arrow___ArrowReaderProperties__get_use_threads", (DL_FUNC) &_arrow_parquet___arrow___ArrowReaderProperties__get_use_threads, 2}, { "_arrow_parquet___arrow___ArrowReaderProperties__get_read_dictionary", (DL_FUNC) &_arrow_parquet___arrow___ArrowReaderProperties__get_read_dictionary, 2}, { "_arrow_parquet___arrow___ArrowReaderProperties__set_read_dictionary", (DL_FUNC) &_arrow_parquet___arrow___ArrowReaderProperties__set_read_dictionary, 3}, { "_arrow_parquet___arrow___ArrowReaderProperties__set_coerce_int96_timestamp_unit", (DL_FUNC) &_arrow_parquet___arrow___ArrowReaderProperties__set_coerce_int96_timestamp_unit, 2}, { "_arrow_parquet___arrow___ArrowReaderProperties__get_coerce_int96_timestamp_unit", (DL_FUNC) &_arrow_parquet___arrow___ArrowReaderProperties__get_coerce_int96_timestamp_unit, 1}, - { "_arrow_parquet___arrow___FileReader__OpenFile", (DL_FUNC) &_arrow_parquet___arrow___FileReader__OpenFile, 2}, + { "_arrow_parquet___arrow___FileReader__OpenFile", (DL_FUNC) &_arrow_parquet___arrow___FileReader__OpenFile, 3}, { "_arrow_parquet___arrow___FileReader__ReadTable1", (DL_FUNC) &_arrow_parquet___arrow___FileReader__ReadTable1, 1}, { "_arrow_parquet___arrow___FileReader__ReadTable2", (DL_FUNC) &_arrow_parquet___arrow___FileReader__ReadTable2, 2}, { "_arrow_parquet___arrow___FileReader__ReadRowGroup1", (DL_FUNC) &_arrow_parquet___arrow___FileReader__ReadRowGroup1, 2}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 5f82275fe9c08..fadc39c75fc06 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -270,6 +270,7 @@ R6_CLASS_NAME(arrow::csv::WriteOptions, "CsvWriteOptions"); #if defined(ARROW_R_WITH_PARQUET) 
R6_CLASS_NAME(parquet::ArrowReaderProperties, "ParquetArrowReaderProperties"); +R6_CLASS_NAME(parquet::ReaderProperties, "ParquetReaderProperties"); R6_CLASS_NAME(parquet::ArrowWriterProperties, "ParquetArrowWriterProperties"); R6_CLASS_NAME(parquet::WriterProperties, "ParquetWriterProperties"); R6_CLASS_NAME(parquet::arrow::FileReader, "ParquetFileReader"); diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 3a4958e4cb955..83c430fb634d3 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -342,7 +342,9 @@ std::shared_ptr dataset___JsonFragmentScanOptions__ // [[dataset::export]] std::shared_ptr dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buffer_size, - bool pre_buffer) { + bool pre_buffer, + int64_t thrift_string_size_limit, + int64_t thrift_container_size_limit) { auto options = std::make_shared(); if (use_buffered_stream) { options->reader_properties->enable_buffered_stream(); @@ -355,6 +357,9 @@ dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buf options->arrow_reader_properties->set_cache_options( arrow::io::CacheOptions::LazyDefaults()); } + options->reader_properties->set_thrift_string_size_limit(thrift_string_size_limit); + options->reader_properties->set_thrift_container_size_limit( + thrift_container_size_limit); return options; } diff --git a/r/src/parquet.cpp b/r/src/parquet.cpp index 3a263f6527db8..f9f7b0447c17b 100644 --- a/r/src/parquet.cpp +++ b/r/src/parquet.cpp @@ -44,6 +44,35 @@ parquet___arrow___ArrowReaderProperties__Make(bool use_threads) { return std::make_shared(use_threads); } +// [[parquet::export]] +std::shared_ptr parquet___arrow___ReaderProperties__Make() { + return std::make_shared(); +} + +// [[parquet::export]] +int parquet___arrow___ReaderProperties__get_thrift_string_size_limit( + const std::shared_ptr& properties) { + return properties->thrift_string_size_limit(); +} + +// [[parquet::export]] +void 
parquet___arrow___ReaderProperties__set_thrift_string_size_limit( + const std::shared_ptr& properties, int size) { + properties->set_thrift_string_size_limit(size); +} + +// [[parquet::export]] +int parquet___arrow___ReaderProperties__get_thrift_container_size_limit( + const std::shared_ptr& properties) { + return properties->thrift_container_size_limit(); +} + +// [[parquet::export]] +void parquet___arrow___ReaderProperties__set_thrift_container_size_limit( + const std::shared_ptr& properties, int size) { + properties->set_thrift_container_size_limit(size); +} + // [[parquet::export]] void parquet___arrow___ArrowReaderProperties__set_use_threads( const std::shared_ptr& properties, bool use_threads) { @@ -86,10 +115,11 @@ parquet___arrow___ArrowReaderProperties__get_coerce_int96_timestamp_unit( // [[parquet::export]] std::shared_ptr parquet___arrow___FileReader__OpenFile( const std::shared_ptr& file, - const std::shared_ptr& props) { + const std::shared_ptr& props, + const std::shared_ptr& reader_props) { std::unique_ptr reader; parquet::arrow::FileReaderBuilder builder; - PARQUET_THROW_NOT_OK(builder.Open(file)); + PARQUET_THROW_NOT_OK(builder.Open(file, *reader_props)); PARQUET_THROW_NOT_OK( builder.memory_pool(gc_memory_pool())->properties(*props)->Build(&reader)); return std::move(reader); diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index b9972901a706e..cbeb081d0bae6 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -1516,3 +1516,17 @@ test_that("can add in augmented fields", { list.files(hive_dir, full.names = TRUE, recursive = TRUE) ) }) + +test_that("can set thrift size string and container limits for datasets", { + expect_r6_class(open_dataset(dataset_dir, thrift_string_size_limit = 1000000), "FileSystemDataset") + expect_error( + open_dataset(dataset_dir, thrift_string_size_limit = 1), + "TProtocolException: Exceeded size limit" + ) + + expect_r6_class(open_dataset(dataset_dir, 
thrift_container_size_limit = 1000000), "FileSystemDataset") + expect_error( + open_dataset(dataset_dir, thrift_container_size_limit = 1), + "TProtocolException: Exceeded size limit" + ) +}) diff --git a/r/tests/testthat/test-parquet.R b/r/tests/testthat/test-parquet.R index 7c5d9ebef6593..dbc4a0473561f 100644 --- a/r/tests/testthat/test-parquet.R +++ b/r/tests/testthat/test-parquet.R @@ -484,3 +484,43 @@ test_that("Can read Parquet files from a URL", { expect_true(tibble::is_tibble(pu)) expect_identical(dim(pu), c(10L, 11L)) }) + +test_that("thrift string and container size can be specified when reading Parquet files", { + + tf <- tempfile() + on.exit(unlink(tf), add = TRUE) + table <- arrow_table(example_data) + write_parquet(table, tf) + file <- make_readable_file(tf) + on.exit(file$close(), add = TRUE) + + # thrift string size + reader_props <- ParquetReaderProperties$create() + reader_props$set_thrift_string_size_limit(1) + expect_identical(reader_props$thrift_string_size_limit(), 1L) + + # We get an error if we set the Thrift string size limit too small + expect_error(ParquetFileReader$create(file, reader_props = reader_props), "TProtocolException: Exceeded size limit") + + # Increase the size and we can read successfully + reader_props$set_thrift_string_size_limit(10000) + reader <- ParquetFileReader$create(file, reader_props = reader_props) + data <- reader$ReadTable() + expect_identical(collect.ArrowTabular(data), example_data) + + # thrift container size + reader_props_container <- ParquetReaderProperties$create() + reader_props_container$set_thrift_container_size_limit(1) + expect_identical(reader_props_container$thrift_container_size_limit(), 1L) + + expect_error( + ParquetFileReader$create(file, reader_props = reader_props_container), + "TProtocolException: Exceeded size limit" + ) + + reader_props_container$set_thrift_container_size_limit(100) + + reader_container <- ParquetFileReader$create(file, reader_props = reader_props_container) + data <- reader_container$ReadTable()
+ expect_identical(collect.ArrowTabular(data), example_data) +})