Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-44338: [Python][Parquet] Read encrypted parquet datasets via _metadata #44339

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
#include "arrow/util/tracing_internal.h"
Expand All @@ -42,6 +43,7 @@
#include "parquet/arrow/writer.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption.h"
#include "parquet/encryption/internal_file_decryptor.h"
#include "parquet/encryption/kms_client.h"
#include "parquet/file_reader.h"
#include "parquet/properties.h"
Expand Down Expand Up @@ -1080,9 +1082,18 @@ Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(

std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids;
std::unordered_map<std::string, int> paths_to_index;
const bool metadata_only_file = metadata_source.path() == "_metadata" &&
metadata->key_value_metadata();

for (int i = 0; i < metadata->num_row_groups(); i++) {
auto row_group = metadata->RowGroup(i);
std::unique_ptr<parquet::RowGroupMetaData> row_group;

if (metadata_only_file) {
PARQUET_ASSIGN_OR_THROW(auto aad,
metadata->key_value_metadata()->Get("row_group_aad_" + std::to_string(i)));
metadata->set_file_decryptor_aad(aad);
}
row_group = metadata->RowGroup(i);
ARROW_ASSIGN_OR_RAISE(auto path,
FileFromRowGroup(filesystem.get(), base_path, *row_group,
options.validate_column_chunk_paths));
Expand Down
234 changes: 233 additions & 1 deletion cpp/src/arrow/dataset/file_parquet_encryption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type.h"
#include "arrow/util/key_value_metadata.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/kms_client.h"
#include "parquet/encryption/test_in_memory_kms.h"
#include "parquet/file_writer.h"

constexpr std::string_view kFooterKeyMasterKey = "0123456789012345";
constexpr std::string_view kFooterKeyMasterKeyId = "footer_key";
Expand All @@ -51,6 +53,62 @@ using arrow::internal::checked_pointer_cast;
namespace arrow {
namespace dataset {

class DatasetTestBase : public ::testing::Test {
public:
void SetUp() override {
// Creates a mock file system using the current time point.
EXPECT_OK_AND_ASSIGN(file_system_, fs::internal::MockFileSystem::Make(
std::chrono::system_clock::now(), {}));
ASSERT_OK(file_system_->CreateDir(std::string(kBaseDir)));

// Init dataset and partitioning.
ASSERT_NO_FATAL_FAILURE(PrepareTableAndPartitioning());

auto file_format = std::make_shared<ParquetFileFormat>();
auto parquet_file_write_options =
checked_pointer_cast<ParquetFileWriteOptions>(file_format->DefaultWriteOptions());

// Write dataset.
auto dataset = std::make_shared<InMemoryDataset>(table_);
EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = parquet_file_write_options;
write_options.filesystem = file_system_;
write_options.base_dir = kBaseDir;
write_options.partitioning = partitioning_;
write_options.basename_template = "part{i}.parquet";
ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
}

void PrepareTableAndPartitioning() {
// Prepare table data.
auto table_schema = schema({field("a", int64()), field("c", int64()),
field("e", int64()), field("part", utf8())});
table_ = TableFromJSON(table_schema, {R"([
[ 0, 9, 1, "a" ],
[ 1, 8, 2, "a" ],
[ 2, 7, 1, "a" ],
[ 3, 6, 2, "a" ],
[ 4, 5, 1, "a" ],
[ 5, 4, 2, "a" ],
[ 6, 3, 1, "b" ],
[ 7, 2, 2, "b" ],
[ 8, 1, 1, "b" ],
[ 9, 0, 2, "b" ]
])"});

// Use a Hive-style partitioning scheme.
partitioning_ = std::make_shared<HivePartitioning>(schema({field("part", utf8())}));
}

protected:
std::shared_ptr<fs::FileSystem> file_system_;
std::shared_ptr<Table> table_;
std::shared_ptr<Partitioning> partitioning_;
};

// Base class to test writing and reading encrypted dataset.
class DatasetEncryptionTestBase : public ::testing::Test {
public:
Expand Down Expand Up @@ -240,6 +298,180 @@ TEST_F(DatasetEncryptionTest, ReadSingleFile) {
ASSERT_EQ(checked_pointer_cast<Int64Array>(table->column(2)->chunk(0))->GetView(0), 1);
}

Result<std::vector<std::shared_ptr<parquet::FileMetaData>>> ReadMetadata(
const std::vector<std::string>& paths, std::shared_ptr<fs::FileSystem>& file_system,
const parquet::ReaderProperties& reader_properties) {
if (paths.empty()) {
return Status::Invalid("No files to read metadata from");
}

std::vector<std::shared_ptr<parquet::FileMetaData>> metadata;

for (const auto& path : paths) {
PARQUET_ASSIGN_OR_THROW(auto input, file_system->OpenInputFile(path));
auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties);
auto file_metadata = parquet_reader->metadata();
file_metadata->set_file_path(path);
metadata.push_back(file_metadata);
}
return metadata;
}

// Write encrypted _metadata file and read the dataset using the metadata.
TEST_F(DatasetEncryptionTest, ReadDatasetFromEncryptedMetadata) {
// Create decryption properties.
auto decryption_config =
std::make_shared<parquet::encryption::DecryptionConfiguration>();
auto parquet_decryption_config = std::make_shared<ParquetDecryptionConfig>();
parquet_decryption_config->crypto_factory = crypto_factory_;
parquet_decryption_config->kms_connection_config = kms_connection_config_;
parquet_decryption_config->decryption_config = std::move(decryption_config);

// Set scan options.
auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config);

auto file_format = std::make_shared<ParquetFileFormat>();
file_format->default_fragment_scan_options = std::move(parquet_scan_options);

auto reader_properties = parquet::default_reader_properties();
decryption_config = std::make_shared<parquet::encryption::DecryptionConfiguration>();
reader_properties.file_decryption_properties(
crypto_factory_->GetFileDecryptionProperties(*kms_connection_config_,
*decryption_config));
auto encryption_config = std::make_shared<parquet::encryption::EncryptionConfiguration>(
std::string(kFooterKeyName));
encryption_config->column_keys = kColumnKeyMapping;

auto parquet_encryption_config = std::make_shared<ParquetEncryptionConfig>();
// Directly assign shared_ptr objects to ParquetEncryptionConfig members
parquet_encryption_config->crypto_factory = crypto_factory_;
parquet_encryption_config->kms_connection_config = kms_connection_config_;
parquet_encryption_config->encryption_config = encryption_config;

auto parquet_file_write_options =
checked_pointer_cast<ParquetFileWriteOptions>(file_format->DefaultWriteOptions());
parquet_file_write_options->parquet_encryption_config =
std::move(parquet_encryption_config);

std::vector<std::string> paths = {"part=a/part0.parquet", "part=c/part0.parquet",
"part=e/part0.parquet", "part=g/part0.parquet",
"part=i/part0.parquet"};

auto file_encryption_properties = crypto_factory_->GetFileEncryptionProperties(
*kms_connection_config_, *encryption_config);

auto table_schema =
schema({field("a", int64()), field("c", int64()), field("e", int64())});
std::shared_ptr<parquet::SchemaDescriptor> schema;
auto writer_props = parquet::WriterProperties::Builder().build();

PARQUET_ASSIGN_OR_THROW(auto metadata_list,
ReadMetadata(paths, file_system_, reader_properties));
PARQUET_ASSIGN_OR_THROW(auto metadata,
parquet::FileMetaData::CoalesceMetadata(metadata_list, writer_props));

// TODO: Make sure plaintext footer mode works
// encryption_config->plaintext_footer = true;
file_encryption_properties = crypto_factory_->GetFileEncryptionProperties(
*kms_connection_config_, *encryption_config);

// Write metadata to _metadata file.
std::string metadata_path = "_metadata";
ASSERT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream(metadata_path));
WriteEncryptedMetadataFile(*metadata, stream, file_encryption_properties);
ARROW_EXPECT_OK(stream->Close());

// Set scan options.
decryption_config = std::make_shared<parquet::encryption::DecryptionConfiguration>();
parquet_decryption_config = std::make_shared<ParquetDecryptionConfig>();
parquet_decryption_config->crypto_factory = crypto_factory_;
parquet_decryption_config->kms_connection_config = kms_connection_config_;
parquet_decryption_config->decryption_config = std::move(decryption_config);

parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config);

file_format = std::make_shared<ParquetFileFormat>();
file_format->default_fragment_scan_options = std::move(parquet_scan_options);

ParquetFactoryOptions parquet_factory_options;
parquet_factory_options.partitioning = partitioning_;
parquet_factory_options.partition_base_dir = kBaseDir;

ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(metadata_path));
auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties);
auto file_metadata = parquet_reader->metadata();

ASSERT_TRUE(file_metadata->Equals(*metadata));

// Create parquet dataset factory
ASSERT_OK_AND_ASSIGN(auto parquet_dataset_factory,
ParquetDatasetFactory::Make(metadata_path, file_system_,
file_format, parquet_factory_options));

// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, parquet_dataset_factory->Finish());

// Read dataset into table
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());

// Verify the data was read correctly
ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());

// Validate the table
ASSERT_OK(combined_table->ValidateFull());
AssertTablesEqual(*table_, *combined_table);
}

// Write _metadata file and read the dataset using the metadata.
TEST_F(DatasetTestBase, ReadDatasetFromMetadata) {
auto reader_properties = parquet::default_reader_properties();

std::vector<std::string> paths = {"part=a/part0.parquet", "part=b/part0.parquet"};
std::vector<std::shared_ptr<parquet::FileMetaData>> metadata;

for (const auto& path : paths) {
ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(path));
auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties);
auto file_metadata = parquet_reader->metadata();
// Make sure file_paths are stored in metadata
file_metadata->set_file_path(path);
metadata.push_back(file_metadata);
}
metadata[0]->AppendRowGroups(*metadata[1]);

std::string metadata_path = "_metadata";
ASSERT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream(metadata_path));
WriteMetaDataFile(*metadata[0], stream.get());
ARROW_EXPECT_OK(stream->Close());

auto file_format = std::make_shared<ParquetFileFormat>();
ParquetFactoryOptions factory_options;
factory_options.partitioning = partitioning_;
factory_options.partition_base_dir = kBaseDir;
ASSERT_OK_AND_ASSIGN(auto dataset_factory,
ParquetDatasetFactory::Make(metadata_path, file_system_,
file_format, factory_options));

// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());

// Read dataset into table
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());

// Verify the data was read correctly
ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());

// Validate the table
ASSERT_OK(combined_table->ValidateFull());
AssertTablesEqual(*combined_table, *table_);
}

// GH-39444: This test covers the case where parquet dataset scanner crashes when
// processing encrypted datasets over 2^15 rows in multi-threaded mode.
class LargeRowEncryptionTest : public DatasetEncryptionTestBase {
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,14 @@ Status WriteMetaDataFile(const FileMetaData& file_metadata,
return Status::OK();
}

Status WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
PARQUET_CATCH_NOT_OK(::parquet::WriteEncryptedMetadataFile(file_metadata, sink,
file_encryption_properties));
return Status::OK();
}

Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
std::shared_ptr<::arrow::io::OutputStream> sink, int64_t chunk_size,
std::shared_ptr<WriterProperties> properties,
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/parquet/arrow/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ PARQUET_EXPORT
::arrow::Status WriteMetaDataFile(const FileMetaData& file_metadata,
::arrow::io::OutputStream* sink);

/// \brief Write encrypted metadata-only Parquet file to indicated Arrow OutputStream
PARQUET_EXPORT
::arrow::Status WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties);

/// \brief Write a Table to Parquet.
///
/// This writes one table in a single shot. To write a Parquet file with
Expand Down
40 changes: 40 additions & 0 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,46 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
}
}

void WriteEncryptedMetadataFile(
const FileMetaData& metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
auto file_encryptor = std::make_unique<InternalFileEncryptor>(
file_encryption_properties.get(), ::arrow::default_memory_pool());

if (file_encryption_properties->encrypted_footer()) {
PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));

PARQUET_ASSIGN_OR_THROW(int64_t position, sink->Tell());
auto metadata_start = static_cast<uint64_t>(position);

auto writer_props = parquet::WriterProperties::Builder()
.encryption(file_encryption_properties)
->build();
auto builder = FileMetaDataBuilder::Make(metadata.schema(), writer_props);

auto footer_metadata = builder->Finish(metadata.key_value_metadata());
auto crypto_metadata = builder->GetCryptoMetaData();

WriteFileCryptoMetaData(*crypto_metadata, sink.get());

auto footer_encryptor = file_encryptor->GetFooterEncryptor();
WriteEncryptedFileMetadata(metadata, sink.get(), footer_encryptor, true);

PARQUET_ASSIGN_OR_THROW(position, sink->Tell());
auto footer_and_crypto_len = static_cast<uint32_t>(position - metadata_start);
PARQUET_THROW_NOT_OK(
sink->Write(reinterpret_cast<uint8_t*>(&footer_and_crypto_len), 4));
PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));
} else {
// Encrypted file with plaintext footer mode.
PARQUET_THROW_NOT_OK(sink->Write(kParquetMagic, 4));
auto footer_signing_encryptor = file_encryptor->GetFooterSigningEncryptor();
WriteEncryptedFileMetadata(metadata, sink.get(), footer_signing_encryptor, false);
}

file_encryptor->WipeOutEncryptionKeys();
}

void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata,
ArrowOutputStream* sink) {
crypto_metadata.WriteTo(sink);
Expand Down
8 changes: 7 additions & 1 deletion cpp/src/parquet/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ PARQUET_EXPORT
void WriteMetaDataFile(const FileMetaData& file_metadata,
::arrow::io::OutputStream* sink);

PARQUET_EXPORT
void WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties);

PARQUET_EXPORT
void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
ArrowOutputStream* sink,
Expand All @@ -125,9 +130,10 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,

PARQUET_EXPORT
void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
::arrow::io::OutputStream* sink,
std::shared_ptr<::arrow::io::OutputStream> sink,
const std::shared_ptr<Encryptor>& encryptor = NULLPTR,
bool encrypt_footer = false);

PARQUET_EXPORT
void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata,
::arrow::io::OutputStream* sink);
Expand Down
Loading
Loading