Skip to content

Commit

Permalink
ARROW-6180: [C++][Parquet] Add RandomAccessFile::GetStream that retur…
Browse files Browse the repository at this point in the history
…ns InputStream that reads a file segment independent of the file's state, fix concurrent buffered Parquet column reads

This enables different functions to read portions of a `RandomAccessFile` as an InputStream without interfering with each other.

This also addresses PARQUET-1636 and adds a unit test for buffered column chunk reads. In the refactor to use the Arrow IO interfaces, I broke this by allowing the raw RandomAccessFile to be passed into multiple `BufferedInputStream` at once, so the file position was being manipulated by different column readers. We didn't catch the problem because we didn't have any unit tests, so this patch addresses that deficiency.

Closes #5085 from wesm/ARROW-6180 and squashes the following commits:

e4ad370 <Wes McKinney> Code review comments
2645bec <Wes McKinney> Add unit test that exhibits PARQUET-1636
76dc71c <Wes McKinney> stub
3eb0136 <Wes McKinney> Finish basic unit tests
4fd3d61 <Wes McKinney> Start implementation

Authored-by: Wes McKinney <[email protected]>
Signed-off-by: Wes McKinney <[email protected]>
  • Loading branch information
wesm committed Aug 15, 2019
1 parent 4e51f98 commit 2c808a2
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 19 deletions.
66 changes: 66 additions & 0 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

#include "arrow/io/interfaces.h"

#include <algorithm>
#include <cstdint>
#include <memory>
#include <mutex>
#include <utility>

#include "arrow/buffer.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
#include "arrow/util/string_view.h"

namespace arrow {
Expand Down Expand Up @@ -70,5 +74,67 @@ Status Writable::Write(const std::string& data) {

Status Writable::Flush() { return Status::OK(); }

class FileSegmentReader : public InputStream {
public:
FileSegmentReader(std::shared_ptr<RandomAccessFile> file, int64_t file_offset,
int64_t nbytes)
: file_(std::move(file)),
closed_(false),
position_(0),
file_offset_(file_offset),
nbytes_(nbytes) {
FileInterface::set_mode(FileMode::READ);
}

Status CheckOpen() const {
if (closed_) {
return Status::IOError("Stream is closed");
}
return Status::OK();
}

Status Close() override {
closed_ = true;
return Status::OK();
}

Status Tell(int64_t* position) const override {
RETURN_NOT_OK(CheckOpen());
*position = position_;
return Status::OK();
}

bool closed() const override { return closed_; }

Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override {
RETURN_NOT_OK(CheckOpen());
int64_t bytes_to_read = std::min(nbytes, nbytes_ - position_);
RETURN_NOT_OK(
file_->ReadAt(file_offset_ + position_, bytes_to_read, bytes_read, out));
position_ += *bytes_read;
return Status::OK();
}

Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override {
RETURN_NOT_OK(CheckOpen());
int64_t bytes_to_read = std::min(nbytes, nbytes_ - position_);
RETURN_NOT_OK(file_->ReadAt(file_offset_ + position_, bytes_to_read, out));
position_ += (*out)->size();
return Status::OK();
}

private:
std::shared_ptr<RandomAccessFile> file_;
bool closed_;
int64_t position_;
int64_t file_offset_;
int64_t nbytes_;
};

std::shared_ptr<InputStream> RandomAccessFile::GetStream(
std::shared_ptr<RandomAccessFile> file, int64_t file_offset, int64_t nbytes) {
return std::make_shared<FileSegmentReader>(std::move(file), file_offset, nbytes);
}

} // namespace io
} // namespace arrow
10 changes: 10 additions & 0 deletions cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
/// Necessary because we hold a std::unique_ptr
~RandomAccessFile() override;

/// \brief Create an isolated InputStream that reads a segment of a
/// RandomAccessFile. Multiple such stream can be created and used
/// independently without interference
/// \param[in] file a file instance
/// \param[in] file_offset the starting position in the file
/// \param[in] nbytes the extent of bytes to read. The file should have
/// sufficient bytes available
static std::shared_ptr<InputStream> GetStream(std::shared_ptr<RandomAccessFile> file,
int64_t file_offset, int64_t nbytes);

virtual Status GetSize(int64_t* size) = 0;

/// \brief Read nbytes at position, provide default implementations using
Expand Down
67 changes: 67 additions & 0 deletions cpp/src/arrow/io/memory-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,73 @@ TEST(TestBufferReader, RetainParentReference) {
ASSERT_EQ(0, std::memcmp(slice2->data(), data.c_str() + 4, 6));
}

TEST(TestRandomAccessFile, GetStream) {
std::string data = "data1data2data3data4data5";

auto buf = std::make_shared<Buffer>(data);
auto file = std::make_shared<BufferReader>(buf);

std::shared_ptr<InputStream> stream1, stream2;

stream1 = RandomAccessFile::GetStream(file, 0, 10);
stream2 = RandomAccessFile::GetStream(file, 9, 16);

int64_t position = -1;
ASSERT_OK(stream1->Tell(&position));
ASSERT_EQ(0, position);

std::shared_ptr<Buffer> buf2;
uint8_t buf3[20];

int64_t bytes_read = -1;
ASSERT_OK(stream2->Read(4, &bytes_read, buf3));
ASSERT_EQ(4, bytes_read);
ASSERT_EQ(0, std::memcmp(buf3, "2dat", 4));
ASSERT_OK(stream2->Tell(&position));
ASSERT_EQ(4, position);

ASSERT_OK(stream1->Read(6, &bytes_read, buf3));
ASSERT_EQ(6, bytes_read);
ASSERT_EQ(0, std::memcmp(buf3, "data1d", 6));
ASSERT_OK(stream1->Tell(&position));
ASSERT_EQ(6, position);

ASSERT_OK(stream1->Read(2, &buf2));
ASSERT_TRUE(SliceBuffer(buf, 6, 2)->Equals(*buf2));

// Read to end of each stream
ASSERT_OK(stream1->Read(4, &bytes_read, buf3));
ASSERT_EQ(2, bytes_read);
ASSERT_EQ(0, std::memcmp(buf3, "a2", 2));
ASSERT_OK(stream1->Tell(&position));
ASSERT_EQ(10, position);

ASSERT_OK(stream1->Read(1, &bytes_read, buf3));
ASSERT_EQ(0, bytes_read);
ASSERT_OK(stream1->Tell(&position));
ASSERT_EQ(10, position);

// stream2 had its extent limited
ASSERT_OK(stream2->Read(20, &buf2));
ASSERT_TRUE(SliceBuffer(buf, 13, 12)->Equals(*buf2));

ASSERT_OK(stream2->Read(1, &buf2));
ASSERT_EQ(0, buf2->size());
ASSERT_OK(stream2->Tell(&position));
ASSERT_EQ(16, position);

ASSERT_OK(stream1->Close());

// idempotent
ASSERT_OK(stream1->Close());
ASSERT_TRUE(stream1->closed());

// Check whether closed
ASSERT_RAISES(IOError, stream1->Tell(&position));
ASSERT_RAISES(IOError, stream1->Read(1, &buf2));
ASSERT_RAISES(IOError, stream1->Read(1, &bytes_read, buf3));
}

TEST(TestMemcopy, ParallelMemcopy) {
#if defined(ARROW_VALGRIND)
// Compensate for Valgrind's slowness
Expand Down
33 changes: 17 additions & 16 deletions cpp/src/arrow/testing/random.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> Boolean(int64_t size, double probability,
double null_probability);
double null_probability = 0);

/// \brief Generates a random UInt8Array
///
Expand All @@ -61,7 +61,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> UInt8(int64_t size, uint8_t min, uint8_t max,
double null_probability);
double null_probability = 0);

/// \brief Generates a random Int8Array
///
Expand All @@ -72,7 +72,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> Int8(int64_t size, int8_t min, int8_t max,
double null_probability);
double null_probability = 0);

/// \brief Generates a random UInt16Array
///
Expand All @@ -83,7 +83,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> UInt16(int64_t size, uint16_t min, uint16_t max,
double null_probability);
double null_probability = 0);

/// \brief Generates a random Int16Array
///
Expand All @@ -94,7 +94,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> Int16(int64_t size, int16_t min, int16_t max,
double null_probability);
double null_probability = 0);

/// \brief Generates a random UInt32Array
///
Expand All @@ -105,7 +105,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> UInt32(int64_t size, uint32_t min, uint32_t max,
double null_probability);
double null_probability = 0);

/// \brief Generates a random Int32Array
///
Expand All @@ -116,7 +116,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> Int32(int64_t size, int32_t min, int32_t max,
double null_probability);
double null_probability = 0);

/// \brief Generates a random UInt64Array
///
Expand All @@ -127,7 +127,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> UInt64(int64_t size, uint64_t min, uint64_t max,
double null_probability);
double null_probability = 0);

/// \brief Generates a random Int64Array
///
Expand All @@ -138,7 +138,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> Int64(int64_t size, int64_t min, int64_t max,
double null_probability);
double null_probability = 0);

/// \brief Generates a random FloatArray
///
Expand All @@ -149,7 +149,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> Float32(int64_t size, float min, float max,
double null_probability);
double null_probability = 0);

/// \brief Generates a random DoubleArray
///
Expand All @@ -160,11 +160,11 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> Float64(int64_t size, double min, double max,
double null_probability);
double null_probability = 0);

template <typename ArrowType, typename CType = typename ArrowType::c_type>
std::shared_ptr<arrow::Array> Numeric(int64_t size, CType min, CType max,
double null_probability) {
double null_probability = 0) {
switch (ArrowType::type_id) {
case Type::UINT8:
return UInt8(size, static_cast<uint8_t>(min), static_cast<uint8_t>(max),
Expand Down Expand Up @@ -212,7 +212,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> String(int64_t size, int32_t min_length,
int32_t max_length, double null_probability);
int32_t max_length, double null_probability = 0);

/// \brief Generates a random LargeStringArray
///
Expand All @@ -225,7 +225,8 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> LargeString(int64_t size, int32_t min_length,
int32_t max_length, double null_probability);
int32_t max_length,
double null_probability = 0);

/// \brief Generates a random StringArray with repeated values
///
Expand All @@ -241,12 +242,12 @@ class ARROW_EXPORT RandomArrayGenerator {
/// \return a generated Array
std::shared_ptr<arrow::Array> StringWithRepeats(int64_t size, int64_t unique,
int32_t min_length, int32_t max_length,
double null_probability);
double null_probability = 0);

/// \brief Like StringWithRepeats but return BinaryArray
std::shared_ptr<arrow::Array> BinaryWithRepeats(int64_t size, int64_t unique,
int32_t min_length, int32_t max_length,
double null_probability);
double null_probability = 0);

SeedType seed() { return seed_distribution_(seed_rng_); }

Expand Down
7 changes: 5 additions & 2 deletions cpp/src/parquet/properties.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ namespace parquet {
std::shared_ptr<ArrowInputStream> ReaderProperties::GetStream(
std::shared_ptr<ArrowInputFile> source, int64_t start, int64_t num_bytes) {
if (buffered_stream_enabled_) {
// ARROW-6180 / PARQUET-1636 Create isolated reader that references segment
// of source
std::shared_ptr<::arrow::io::InputStream> safe_stream =
::arrow::io::RandomAccessFile::GetStream(source, start, num_bytes);
std::shared_ptr<::arrow::io::BufferedInputStream> stream;
PARQUET_THROW_NOT_OK(source->Seek(start));
PARQUET_THROW_NOT_OK(::arrow::io::BufferedInputStream::Create(
buffer_size_, pool_, source, &stream, num_bytes));
buffer_size_, pool_, safe_stream, &stream, num_bytes));
return std::move(stream);
} else {
std::shared_ptr<Buffer> data;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct ParquetVersion {
enum type { PARQUET_1_0, PARQUET_2_0 };
};

static int64_t DEFAULT_BUFFER_SIZE = 0;
static int64_t DEFAULT_BUFFER_SIZE = 1024;
static bool DEFAULT_USE_BUFFERED_STREAM = false;

class PARQUET_EXPORT ReaderProperties {
Expand Down
Loading

0 comments on commit 2c808a2

Please sign in to comment.