Skip to content

Commit

Permalink
columnar data record structure
Browse files Browse the repository at this point in the history
  • Loading branch information
Myrrolinz committed Sep 28, 2024
1 parent 2e1f857 commit 359dc0a
Showing 1 changed file with 335 additions and 0 deletions.
335 changes: 335 additions & 0 deletions src/cypher/resultset/record.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
#pragma once

#include <utility>
#include <unordered_map>
#include "core/data_type.h" // lgraph::FieldData
#include "cypher/cypher_types.h"
#include "parser/data_typedef.h"
#include "graph/node.h"
#include "graph/relationship.h"
#include "cypher/resultset/column_vector.h"

namespace cypher {

Expand Down Expand Up @@ -321,4 +323,337 @@ struct Record {
return *this;
}
};

struct DataChunk {
std::unordered_map<std::string, std::unique_ptr<ColumnVector>> columnar_data_;
std::unordered_map<std::string, std::unique_ptr<ColumnVector>> string_columns_;
std::unordered_map<std::string, uint32_t> property_positions_;
std::unordered_map<std::string, std::vector<uint32_t>> property_vids_;

DataChunk() = default;

void CopyColumn(const std::string& column_name, const DataChunk& source_record,
bool overwrite_existing = true) {
if (source_record.columnar_data_.find(column_name) != source_record.columnar_data_.end()) {
const auto& src_vector = source_record.columnar_data_.at(column_name);
if (overwrite_existing || columnar_data_.find(column_name) == columnar_data_.end()) {
auto dst_vector = std::make_unique<ColumnVector>(*src_vector);
columnar_data_[column_name] = std::move(dst_vector);
property_positions_[column_name] =
source_record.property_positions_.at(column_name);
property_vids_[column_name] =
source_record.property_vids_.at(column_name);
}
} else if (source_record.string_columns_.find(column_name) !=
source_record.string_columns_.end()) {
const auto& src_vector = source_record.string_columns_.at(column_name);
if (overwrite_existing || string_columns_.find(column_name) ==
string_columns_.end()) {
auto dst_vector = std::make_unique<ColumnVector>(*src_vector);
string_columns_[column_name] = std::move(dst_vector);
property_positions_[column_name] =
source_record.property_positions_.at(column_name);
property_vids_[column_name] = source_record.property_vids_.at(column_name);
}
}
}

void MergeColumn(const DataChunk& source_record, bool overwrite_existing = true) {
for (const auto& pair : source_record.columnar_data_) {
CopyColumn(pair.first, source_record, overwrite_existing);
}
for (const auto& pair : source_record.string_columns_) {
CopyColumn(pair.first, source_record, overwrite_existing);
}
}

void TruncateData(int usable_r) {
std::set<uint32_t> sorted_vids;
for (const auto& pair : property_vids_) {
const auto& vids = pair.second;
sorted_vids.insert(vids.begin(), vids.end());
}

std::vector<uint32_t> selected_vids;
for (auto it = sorted_vids.begin(); it != sorted_vids.end() &&
selected_vids.size() < static_cast<size_t>(usable_r); ++it) {
selected_vids.push_back(*it);
}
for (auto& pair : columnar_data_) {
const std::string& column_name = pair.first;
auto& column_vector = pair.second;
const auto& vids = property_vids_.at(column_name);
auto new_vector = std::make_unique<ColumnVector>(
column_vector->GetElementSize(), usable_r);
std::vector<uint32_t> new_vids;
uint32_t new_pos = 0;
for (uint32_t selected_vid : selected_vids) {
auto it = std::find(vids.begin(), vids.end(), selected_vid);
if (it != vids.end()) {
uint32_t original_pos = std::distance(vids.begin(), it);
if (!column_vector->IsNull(original_pos)) {
std::memcpy(new_vector->data() + new_pos * new_vector->GetElementSize(),
column_vector->data() + original_pos *
column_vector->GetElementSize(),
column_vector->GetElementSize());
new_vids.push_back(selected_vid);
}
new_pos++;
}
}
column_vector = std::move(new_vector);
property_vids_[column_name] = new_vids;
property_positions_[column_name] = new_pos;
}

for (auto& pair : string_columns_) {
const std::string& column_name = pair.first;
auto& column_vector = pair.second;
const auto& vids = property_vids_.at(column_name);
auto new_vector = std::make_unique<ColumnVector>(sizeof(cypher_string_t), usable_r);
std::vector<uint32_t> new_vids;
uint32_t new_pos = 0;
for (uint32_t selected_vid : selected_vids) {
auto it = std::find(vids.begin(), vids.end(), selected_vid);
if (it != vids.end()) {
uint32_t original_pos = std::distance(vids.begin(), it);
if (!column_vector->IsNull(original_pos)) {
StringColumn::AddString(new_vector.get(), new_pos,
column_vector->GetValue<cypher_string_t>(original_pos).GetAsString());
new_vids.push_back(selected_vid);
}
new_pos++;
}
}
column_vector = std::move(new_vector);
property_vids_[column_name] = new_vids;
property_positions_[column_name] = new_pos;
}
}

void AppendToEnd(const DataChunk& source_record) {
for (const auto& pair : source_record.string_columns_) {
const std::string& column_name = pair.first;
const auto& src_vector = pair.second;
uint32_t size = source_record.property_positions_.at(column_name);
// PrintStringColumnData(column_name, *src_vector);
if (string_columns_.find(column_name) == string_columns_.end()) {
string_columns_[column_name] = std::make_unique<ColumnVector>(*src_vector);
property_positions_[column_name] =
source_record.property_positions_.at(column_name);
property_vids_[column_name] = source_record.property_vids_.at(column_name);
} else {
auto& dst_vector = string_columns_[column_name];
uint32_t old_size = property_vids_[column_name].size();
uint32_t new_size = old_size + size;
auto new_vector = std::make_unique<ColumnVector>(sizeof(cypher_string_t), new_size);
for (uint32_t i = 0; i < old_size; ++i) {
const cypher_string_t& dst_string = dst_vector->GetValue<cypher_string_t>(i);
StringColumn::AddString(new_vector.get(), i, dst_string.GetAsString());
}
for (uint32_t i = 0; i < size; ++i) {
const cypher_string_t& src_string = src_vector->GetValue<cypher_string_t>(i);
StringColumn::AddString(new_vector.get(),
old_size + i, src_string.GetAsString());
}
string_columns_[column_name] = std::move(new_vector);
property_positions_[column_name] +=
source_record.property_positions_.at(column_name);
auto& dst_vids = property_vids_[column_name];
const auto& src_vids = source_record.property_vids_.at(column_name);
dst_vids.insert(dst_vids.end(), src_vids.begin(), src_vids.end());
}
}

for (const auto& pair : source_record.columnar_data_) {
const std::string& column_name = pair.first;
const auto& src_vector = pair.second;
auto size = source_record.property_positions_.at(column_name);
if (columnar_data_.find(column_name) == columnar_data_.end()) {
columnar_data_[column_name] = std::make_unique<ColumnVector>(*src_vector);
property_positions_[column_name] =
source_record.property_positions_.at(column_name);
property_vids_[column_name] = source_record.property_vids_.at(column_name);
} else {
auto& dst_vector = columnar_data_[column_name];
uint32_t old_size = property_vids_[column_name].size();
uint32_t new_size = old_size + size;
auto new_vector = std::make_unique<ColumnVector>(
dst_vector->GetElementSize(), new_size);
std::memcpy(new_vector->data(), dst_vector->data(),
dst_vector->GetElementSize() * old_size);
std::memcpy(new_vector->data() + old_size * new_vector->GetElementSize(),
src_vector->data(), src_vector->GetElementSize() * size);
columnar_data_[column_name] = std::move(new_vector);
property_positions_[column_name] +=
source_record.property_positions_.at(column_name);
auto& dst_vids = property_vids_[column_name];
const auto& src_vids = source_record.property_vids_.at(column_name);
dst_vids.insert(dst_vids.end(), src_vids.begin(), src_vids.end());
}
}
}

void Print() const {
std::cout << "DataChunk contents:\n";
std::cout << "Columnar Data:\n";
for (const auto& pair : columnar_data_) {
std::cout << " Column Name: " << pair.first << "\n";
std::cout << " Element Size: " << pair.second->GetElementSize() << "\n";
std::cout << " Capacity: " << pair.second->GetCapacity() << "\n";
PrintColumnData(pair.first, *pair.second);
}

std::cout << "String Columns:\n";
for (const auto& pair : string_columns_) {
std::cout << " Column Name: " << pair.first << "\n";
std::cout << " Element Size: " << pair.second->GetElementSize() << "\n";
std::cout << " Capacity: " << pair.second->GetCapacity() << "\n";
PrintStringColumnData(pair.first, *pair.second);
}

std::cout << "Property Positions:\n";
for (const auto& pair : property_positions_) {
std::cout << " Property Name: " << pair.first
<< ", End Position: " << pair.second << "\n";
}

std::cout << "Property VIDs:\n";
for (const auto& pair : property_vids_) {
std::cout << " Property Name: " << pair.first << ", VIDs: [";
const auto& vids = pair.second;
for (size_t i = 0; i < vids.size(); ++i) {
std::cout << vids[i];
if (i < vids.size() - 1) {
std::cout << ", ";
}
}
std::cout << "]\n";
}
}

template<typename T>
void PrintColumnData(const std::string& column_name, const ColumnVector& column) const {
for (uint32_t i = 0; i < column.GetCapacity(); ++i) {
if (!column.IsNull(i)) {
std::cout << " Data[" << i << "]: " << column.GetValue<T>(i) << "\n";
} else {
std::cout << " Data[" << i << "]: NULL\n";
}
}
}

void PrintColumnData(const std::string& column_name, const ColumnVector& column) const {
size_t element_size = column.GetElementSize();
if (element_size == sizeof(int8_t)) {
PrintColumnData<int8_t>(column_name, column);
} else if (element_size == sizeof(int16_t)) {
PrintColumnData<int16_t>(column_name, column);
} else if (element_size == sizeof(int32_t)) {
PrintColumnData<int32_t>(column_name, column);
} else if (element_size == sizeof(int64_t)) {
PrintColumnData<int64_t>(column_name, column);
} else if (element_size == sizeof(float)) {
PrintColumnData<float>(column_name, column);
} else if (element_size == sizeof(double)) {
PrintColumnData<double>(column_name, column);
} else {
std::cout << "Unknown element size: " << element_size << "\n";
}
}

void PrintStringColumnData(const std::string& column_name, const ColumnVector& column) const {
for (uint32_t i = 0; i < column.GetCapacity(); ++i) {
if (!column.IsNull(i)) {
const cypher_string_t& value = column.GetValue<cypher_string_t>(i);
std::cout << " Data[" << i << "]: " << value.GetAsString() << "\n";
} else {
std::cout << " Data[" << i << "]: NULL\n";
}
}
}

std::string Dump(bool is_standard) const {
json arr = json::array();
std::vector<std::string> column_names;
for (const auto& pair : columnar_data_) {
column_names.push_back(pair.first);
}
for (const auto& pair : string_columns_) {
if (std::find(column_names.begin(), column_names.end(),
pair.first) == column_names.end()) {
column_names.push_back(pair.first);
}
}

std::set<uint32_t> common_vids;
for (const auto& pair : property_vids_) {
const auto& vids = pair.second;
common_vids.insert(vids.begin(), vids.end());
}
// for (uint32_t vid : common_vids) {
// std::cout << "Common VID: " << vid << "\n";
// }
for (uint32_t vid : common_vids) {
json j;
j["vid"] = vid;
for (const auto& column_name : column_names) {
if (property_vids_.find(column_name) != property_vids_.end()) {
const auto& vids = property_vids_.at(column_name);
auto it = std::find(vids.begin(), vids.end(), vid);
if (it != vids.end()) {
uint32_t column_pos = std::distance(vids.begin(), it);
if (columnar_data_.find(column_name) != columnar_data_.end()) {
const auto& column_vector = columnar_data_.at(column_name);
if (!column_vector->IsNull(column_pos)) {
if (column_vector->GetElementSize() == sizeof(bool)) {
j[column_name] = column_vector->GetValue<bool>(column_pos);
} else if (column_vector->GetElementSize() == sizeof(int8_t)) {
j[column_name] = column_vector->GetValue<int8_t>(column_pos);
} else if (column_vector->GetElementSize() == sizeof(int16_t)) {
j[column_name] = column_vector->GetValue<int16_t>(column_pos);
} else if (column_vector->GetElementSize() == sizeof(int32_t)) {
j[column_name] = column_vector->GetValue<int32_t>(column_pos);
} else if (column_vector->GetElementSize() == sizeof(int64_t)) {
j[column_name] = column_vector->GetValue<int64_t>(column_pos);
} else if (column_vector->GetElementSize() == sizeof(float)) {
j[column_name] = column_vector->GetValue<float>(column_pos);
} else if (column_vector->GetElementSize() == sizeof(double)) {
j[column_name] = column_vector->GetValue<double>(column_pos);
} else {
throw std::runtime_error(
"Unsupported data type in columnar_data_");
}
}
}
if (string_columns_.find(column_name) != string_columns_.end()) {
const auto& column_vector = string_columns_.at(column_name);
if (!column_vector->IsNull(column_pos)) {
j[column_name] =
column_vector->GetValue<cypher_string_t>(column_pos).GetAsString();
}
}
}
}
}
if (j.is_null()) {
throw std::runtime_error(
"DataChunk has a null row! Maybe your new record is not a reference.");
}
arr.emplace_back(j);
}
// std::cout << "Dump Result: " << arr.dump(4) << std::endl;
if (is_standard) {
json output;
output["header"] = column_names;
output["is_standard"] = true;
output["data"] = arr;
return output.dump();
} else {
return arr.dump();
}
}
};


} // namespace cypher

0 comments on commit 359dc0a

Please sign in to comment.