Skip to content

Commit

Permalink
move code into parquet API
Browse files Browse the repository at this point in the history
  • Loading branch information
alkis committed Jul 19, 2024
1 parent a2ba60f commit aadb316
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 143 deletions.
63 changes: 63 additions & 0 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <cinttypes>
#include <memory>
#include <ostream>
#include <random>
#include <sstream>
#include <string>
#include <string_view>
#include <utility>
Expand All @@ -29,6 +31,7 @@
#include "arrow/io/memory.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/pcg_random.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/internal_file_decryptor.h"
#include "parquet/exception.h"
Expand Down Expand Up @@ -599,6 +602,47 @@ std::vector<SortingColumn> RowGroupMetaData::sorting_columns() const {
return impl_->sorting_columns();
}

static void Scrub(std::string* s) {
static ::arrow::random::pcg64 rng;
std::uniform_int_distribution<> caps(65, 90);
for (auto& c : *s) c = caps(rng);
}

static void Scrub(format::FileMetaData* md) {
for (auto& s : md->schema) {
Scrub(&s.name);
}
for (auto& r : md->row_groups) {
for (auto& c : r.columns) {
Scrub(&c.file_path);
if (c.__isset.meta_data) {
auto& m = c.meta_data;
for (auto& p : m.path_in_schema) Scrub(&p);
for (auto& kv : m.key_value_metadata) {
Scrub(&kv.key);
Scrub(&kv.value);
}
Scrub(&m.statistics.max_value);
Scrub(&m.statistics.min_value);
Scrub(&m.statistics.min);
Scrub(&m.statistics.max);
}

if (c.crypto_metadata.__isset.ENCRYPTION_WITH_COLUMN_KEY) {
auto& m = c.crypto_metadata.ENCRYPTION_WITH_COLUMN_KEY;
for (auto& p : m.path_in_schema) Scrub(&p);
Scrub(&m.key_metadata);
}
Scrub(&c.encrypted_column_metadata);
}
}
for (auto& kv : md->key_value_metadata) {
Scrub(&kv.key);
Scrub(&kv.value);
}
Scrub(&md->footer_signing_key_metadata);
}

// file metadata
class FileMetaData::FileMetaDataImpl {
public:
Expand Down Expand Up @@ -822,6 +866,21 @@ class FileMetaData::FileMetaDataImpl {
return out;
}

std::string SerializeUnencrypted(bool scrub, bool json) const {
auto md = *metadata_;
if (scrub) Scrub(&md);
if (json) {
std::ostringstream ss;
md.printTo(ss);
return ss.str();
} else {
ThriftSerializer serializer;
std::string out;
serializer.SerializeToString(&md, &out);
return out;
}
}

void set_file_decryptor(std::shared_ptr<InternalFileDecryptor> file_decryptor) {
file_decryptor_ = std::move(file_decryptor);
}
Expand Down Expand Up @@ -993,6 +1052,10 @@ std::shared_ptr<FileMetaData> FileMetaData::Subset(
return impl_->Subset(row_groups);
}

std::string FileMetaData::SerializeUnencrypted(bool scrub, bool json) const {
return impl_->SerializeUnencrypted(scrub, json);
}

void FileMetaData::WriteTo(::arrow::io::OutputStream* dst,
const std::shared_ptr<Encryptor>& encryptor) const {
return impl_->WriteTo(dst, encryptor);
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,13 @@ class PARQUET_EXPORT FileMetaData {
/// FileMetaData.
std::shared_ptr<FileMetaData> Subset(const std::vector<int>& row_groups) const;

/// \brief Serializes metadata unencrypted to a string.
///
/// \param[in] scrub removes sensitive information from the metadata.
/// \param[in] json indicates if the metadata should be serialized as JSON, otherwise
/// thrift.
std::string SerializeUnencrypted(bool scrub, bool json) const;

private:
friend FileMetaDataBuilder;
friend class SerializedFile;
Expand Down
13 changes: 1 addition & 12 deletions cpp/tools/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

if(PARQUET_BUILD_EXECUTABLES)
set(PARQUET_TOOLS parquet-dump-schema parquet-reader parquet-scan)
set(PARQUET_TOOLS parquet-dump-schema parquet-reader parquet-scan parquet-dump-footer)

foreach(TOOL ${PARQUET_TOOLS})
string(REGEX REPLACE "-" "_" TOOL_SOURCE ${TOOL})
Expand All @@ -32,16 +32,5 @@ if(PARQUET_BUILD_EXECUTABLES)
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
endforeach(TOOL)

# Only build parquet-dump-footer when statically linking.
if(ARROW_BUILD_SHARED)

else()
add_executable(parquet-dump-footer parquet-dump-footer.cc)
target_link_libraries(parquet-dump-footer parquet_static thrift::thrift)
set_target_properties(parquet-dump-footer PROPERTIES INSTALL_RPATH_USE_LINK_PATH TRUE)
install(TARGETS parquet-dump-footer ${INSTALL_IS_OPTIONAL}
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
endif()

add_dependencies(parquet ${PARQUET_TOOLS})
endif()
149 changes: 18 additions & 131 deletions cpp/tools/parquet/parquet_dump_footer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,14 @@

#include <cstdint>
#include <cstring>
#include <exception>
#include <fstream>
#include <iostream>
#include <limits>
#include <optional>
#include <random>

#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/transport/TBufferTransports.h>

#include "arrow/filesystem/filesystem.h"
#include "arrow/util/endian.h"
#include "arrow/util/ubsan.h"
#include "generated/parquet_types.h"
#include "parquet/thrift_internal.h"

using apache::thrift::protocol::TCompactProtocol;
using apache::thrift::transport::TMemoryBuffer;
using apache::thrift::transport::TTransport;
#include "parquet/metadata.h"

namespace parquet {
namespace {
Expand All @@ -49,90 +38,6 @@ void AppendLE32(uint32_t v, std::string* out) {
out->append(reinterpret_cast<const char*>(&v), sizeof(v));
}

template <typename T>
bool Deserialize(const char* data, uint32_t len, T* obj) {
parquet::ThriftDeserializer des(/*string_size_limit=*/1 << 30,
/*container_size_limit=*/1 << 30);
try {
des.DeserializeMessage(reinterpret_cast<const uint8_t*>(data), &len, obj);
return true;
} catch (const std::exception& e) {
std::cerr << "Failed to deserialize: " << e.what() << "\n";
return false;
}
}

template <typename T>
bool Serialize(const T& obj, std::string* out) {
parquet::ThriftSerializer ser(/*initial_buffer_size=*/10 << 20);
try {
ser.SerializeToString(&obj, out);
return true;
} catch (const std::exception& e) {
std::cerr << "Failed to serialize: " << e.what() << "\n";
return false;
}
}

// Replace the contents of s with random data of the same length.
void Scrub(std::string* s) {
static std::mt19937 rng(std::random_device{}());
std::uniform_int_distribution<> caps(65, 90);
for (auto& c : *s) c = caps(rng);
}

void Scrub(parquet::format::FileMetaData* md) {
for (auto& s : md->schema) {
Scrub(&s.name);
}
for (auto& r : md->row_groups) {
for (auto& c : r.columns) {
Scrub(&c.file_path);
if (c.__isset.meta_data) {
auto& m = c.meta_data;
for (auto& p : m.path_in_schema) Scrub(&p);
for (auto& kv : m.key_value_metadata) {
Scrub(&kv.key);
Scrub(&kv.value);
}
Scrub(&m.statistics.max_value);
Scrub(&m.statistics.min_value);
Scrub(&m.statistics.min);
Scrub(&m.statistics.max);
}

if (c.crypto_metadata.__isset.ENCRYPTION_WITH_COLUMN_KEY) {
auto& m = c.crypto_metadata.ENCRYPTION_WITH_COLUMN_KEY;
for (auto& p : m.path_in_schema) Scrub(&p);
Scrub(&m.key_metadata);
}
Scrub(&c.encrypted_column_metadata);
}
}
for (auto& kv : md->key_value_metadata) {
Scrub(&kv.key);
Scrub(&kv.value);
}
Scrub(&md->footer_signing_key_metadata);
}

// Returns:
// - 0 on success
// - -1 on error
// - the size of the footer if tail is too small
int64_t ParseFooter(const std::string& tail, parquet::format::FileMetaData* md) {
if (tail.size() > std::numeric_limits<int32_t>::max()) return -1;

const char* p = tail.data();
const int32_t n = static_cast<int32_t>(tail.size());
int32_t len = ReadLE32(p + n - 8);
if (len > n - 8) return len;

if (!Deserialize(tail.data() + n - 8 - len, len, md)) return -1;
return 0;
}
} // namespace

int DoIt(std::string in, bool scrub, bool json, std::string out) {
std::string path;
auto fs = ::arrow::fs::FileSystemFromUriOrPath(in, &path).ValueOrDie();
Expand All @@ -147,56 +52,38 @@ int DoIt(std::string in, bool scrub, bool json, std::string out) {
tail.resize(tail_len);
char* data = tail.data();
file->ReadAt(file_len - tail_len, tail_len, data).ValueOrDie();
if (ReadLE32(data + tail_len - 4) != ReadLE32("PAR1")) {
if (auto magic = ReadLE32(data + tail_len - 4); magic != ReadLE32("PAR1")) {
std::cerr << "Not a Parquet file: " << in << "\n";
return 4;
}
parquet::format::FileMetaData md;
int64_t res = ParseFooter(tail, &md);
if (res < 0) {
std::cerr << "Failed to parse footer: " << in << "\n";
return 5;
} else if (res > 0) {
if (res > file_len) {
uint32_t metadata_len = ReadLE32(data + tail_len - 8);
if (metadata_len > tail_len - 8) {
if (metadata_len > file_len) {
std::cerr << "File too short: " << in << "\n";
return 6;
return 5;
}
tail_len = res + 8;
tail_len = metadata_len + 8;
tail.resize(tail_len);
data = tail.data();
file->ReadAt(file_len - tail_len, tail_len, data).ValueOrDie();
}
if (ParseFooter(tail, &md) != 0) {
std::cerr << "Failed to parse footer: " << in << "\n";
return 7;
}

if (scrub) Scrub(&md);

std::optional<std::fstream> fout;
if (json) {
if (!out.empty()) fout.emplace(out, std::ios::out);
std::ostream& os = fout ? *fout : std::cout;
md.printTo(os);
} else {
if (!out.empty()) fout.emplace(out, std::ios::out | std::ios::binary);
std::ostream& os = fout ? *fout : std::cout;
if (!os) {
std::cerr << "Failed to open output file: " << out << "\n";
return 8;
}
std::string ser;
if (!Serialize(md, &ser)) return 6;
auto md = FileMetaData::Make(tail.data(), &metadata_len);
std::string ser = md->SerializeUnencrypted(scrub, json);
if (!json) {
AppendLE32(static_cast<uint32_t>(ser.size()), &ser);
ser.append("PAR1", 4);
if (!os.write(ser.data(), ser.size())) {
std::cerr << "Failed to write to output file: " << out << "\n";
return 9;
}
}
std::optional<std::fstream> fout;
if (!out.empty()) fout.emplace(out, std::ios::out);
std::ostream& os = fout ? *fout : std::cout;
if (!os.write(ser.data(), ser.size())) {
std::cerr << "Failed to write to output file: " << out << "\n";
return 6;
}

return 0;
}
} // namespace
} // namespace parquet

static int PrintHelp() {
Expand Down

0 comments on commit aadb316

Please sign in to comment.