Skip to content

Commit

Permalink
Sparse serialization suport
Browse files Browse the repository at this point in the history
  • Loading branch information
den818 committed Nov 13, 2022
1 parent 574da81 commit ccb2736
Show file tree
Hide file tree
Showing 34 changed files with 1,310 additions and 229 deletions.
2 changes: 2 additions & 0 deletions clickhouse/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ SET ( clickhouse-cpp-lib-src
columns/lowcardinalityadaptor.h
columns/nullable.cpp
columns/numeric.cpp
columns/serialization.cpp
columns/string.cpp
columns/tuple.cpp
columns/uuid.cpp
Expand Down Expand Up @@ -115,6 +116,7 @@ INSTALL(FILES columns/itemview.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/lowcardinality.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/nullable.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/numeric.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/serialization.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/string.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/tuple.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/utils.h DESTINATION include/clickhouse/columns/)
Expand Down
37 changes: 36 additions & 1 deletion clickhouse/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@
#define DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH 54448
#define DBMS_MIN_REVISION_WITH_INITIAL_QUERY_START_TIME 54449
#define DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS 54451
#define DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS 54453
#define DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION 54454

#define REVISION DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS
#define REVISION DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION

namespace clickhouse {

Expand Down Expand Up @@ -552,7 +554,19 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
return false;
}

uint8_t has_custom_serialization = 0;
if (REVISION >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION) {
if (!WireFormat::ReadFixed(input, &has_custom_serialization)) {
return false;
}
}

if (ColumnRef col = CreateColumnByType(type, create_column_settings)) {

if (has_custom_serialization) {
col->LoadSerializationKind(&input);
}

if (num_rows && !col->Load(&input, num_rows)) {
throw ProtocolError("can't load column '" + name + "' of type " + type);
}
Expand Down Expand Up @@ -708,6 +722,16 @@ void Client::Impl::SendQuery(const Query& query) {
throw UnimplementedError(std::string("Can't send open telemetry tracing context to a server, server version is too old"));
}
}

if (server_info_.revision >= DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS)
{
// collaborate_with_initiator
WireFormat::WriteUInt64 (*output_, 0u);
// count_participating_replicas
WireFormat::WriteUInt64 (*output_, 0u);
// number_of_current_replica
WireFormat::WriteUInt64 (*output_, 0u);
}
}

/// Per query settings
Expand Down Expand Up @@ -757,6 +781,17 @@ void Client::Impl::WriteBlock(const Block& block, OutputStream& output) {
WireFormat::WriteString(output, bi.Name());
WireFormat::WriteString(output, bi.Type()->GetName());

bool has_custom = bi.Column()->HasCustomSerialization();
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION) {
WireFormat::WriteFixed(output, static_cast<uint8_t>(has_custom));
if (has_custom) {
bi.Column()->SaveSerializationKind(&output);
}
} else {
// Current implementation works only for server version >= v22.1.2.2-stable
throw UnimplementedError(std::string("Can't send column with custom serialisation to a server, server version is too old"));
}

// Empty columns are not serialized and occupy exactly 0 bytes.
// ref https://github.com/ClickHouse/ClickHouse/blob/39b37a3240f74f4871c8c1679910e065af6bea19/src/Formats/NativeWriter.cpp#L163
const bool containsData = block.GetRowCount() > 0;
Expand Down
29 changes: 20 additions & 9 deletions clickhouse/columns/array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ ColumnArray::ColumnArray(ColumnRef data)
}

ColumnArray::ColumnArray(ColumnRef data, std::shared_ptr<ColumnUInt64> offsets)
: Column(Type::CreateArray(data->Type()))
: Column(Type::CreateArray(data->Type()), Serialization::MakeDefault(this))
, data_(data)
, offsets_(offsets)
{
}

ColumnArray::ColumnArray(ColumnArray&& other)
: Column(other.Type())
: Column(other.Type(), Serialization::MakeDefault(this))
, data_(std::move(other.data_))
, offsets_(std::move(other.offsets_))
{
Expand Down Expand Up @@ -73,30 +73,29 @@ bool ColumnArray::LoadPrefix(InputStream* input, size_t rows) {
if (!rows) {
return true;
}

return data_->LoadPrefix(input, rows);
return data_->GetSerialization()->LoadPrefix(data_.get(), input, rows);
}

bool ColumnArray::LoadBody(InputStream* input, size_t rows) {
if (!rows) {
return true;
}
if (!offsets_->LoadBody(input, rows)) {
if (!offsets_->GetSerialization()->LoadBody(offsets_.get(), input, rows)) {
return false;
}
if (!data_->LoadBody(input, (*offsets_)[rows - 1])) {
if (!data_->GetSerialization()->LoadBody(data_.get(), input, (*offsets_)[rows - 1])) {
return false;
}
return true;
}

void ColumnArray::SavePrefix(OutputStream* output) {
data_->SavePrefix(output);
data_->GetSerialization()->SavePrefix(data_.get(), output);
}

void ColumnArray::SaveBody(OutputStream* output) {
offsets_->SaveBody(output);
data_->SaveBody(output);
offsets_->GetSerialization()->SaveBody(offsets_.get(), output);
data_->GetSerialization()->SaveBody(data_.get(), output);
}

void ColumnArray::Clear() {
Expand All @@ -114,6 +113,18 @@ void ColumnArray::Swap(Column& other) {
offsets_.swap(col.offsets_);
}

void ColumnArray::SetSerializationKind(Serialization::Kind kind) {
switch (kind)
{
case Serialization::Kind::DEFAULT:
serialization_ = Serialization::MakeDefault(this);
break;
default:
throw UnimplementedError("Serialization kind:" + std::to_string(static_cast<int>(kind))
+ " is not supported for column of " + type_->GetName());
}
}

void ColumnArray::OffsetsIncrease(size_t n) {
offsets_->Append(n);
}
Expand Down
28 changes: 16 additions & 12 deletions clickhouse/columns/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,6 @@ class ColumnArray : public Column {
/// Appends content of given column to the end of current one.
void Append(ColumnRef column) override;

/// Loads column prefix from input stream.
bool LoadPrefix(InputStream* input, size_t rows) override;

/// Loads column data from input stream.
bool LoadBody(InputStream* input, size_t rows) override;

/// Saves column prefix to output stream.
void SavePrefix(OutputStream* output) override;

/// Saves column data to output stream.
void SaveBody(OutputStream* output) override;

/// Clear column data .
void Clear() override;

Expand All @@ -72,6 +60,8 @@ class ColumnArray : public Column {
ColumnRef CloneEmpty() const override;
void Swap(Column&) override;

void SetSerializationKind(Serialization::Kind kind) override;

void OffsetsIncrease(size_t);

protected:
Expand All @@ -86,6 +76,20 @@ class ColumnArray : public Column {
void Reset();

private:
/// Loads column prefix from input stream.
bool LoadPrefix(InputStream* input, size_t rows);

/// Loads column data from input stream.
bool LoadBody(InputStream* input, size_t rows);

/// Saves column prefix to output stream.
void SavePrefix(OutputStream* output);

/// Saves column data to output stream.
void SaveBody(OutputStream* output);

friend SerializationDefault<ColumnArray>;

ColumnRef data_;
std::shared_ptr<ColumnUInt64> offsets_;
};
Expand Down
40 changes: 30 additions & 10 deletions clickhouse/columns/column.cpp
Original file line number Diff line number Diff line change
@@ -1,24 +1,44 @@
#include "column.h"

#include "../base/wire_format.h"

namespace clickhouse {

bool Column::LoadPrefix(InputStream*, size_t) {
/// does nothing by default
bool Column::Load(InputStream* input, size_t rows) {
assert(serialization_);
return serialization_->LoadPrefix(this, input, rows)
&& serialization_->LoadBody(this, input, rows);
}

/// Saves column data to output stream.
void Column::Save(OutputStream* output) {
assert(serialization_);
serialization_->SavePrefix(this, output);
serialization_->SaveBody(this,output);
}

bool Column::LoadSerializationKind(InputStream* input) {
uint8_t kind;
if (!WireFormat::ReadFixed(*input, &kind)) {
return false;
}
SetSerializationKind(static_cast<Serialization::Kind>(kind));
return true;
}

bool Column::Load(InputStream* input, size_t rows) {
return LoadPrefix(input, rows) && LoadBody(input, rows);
void Column::SaveSerializationKind(OutputStream* output) {
assert(serialization_);
WireFormat::WriteFixed(*output, static_cast<uint8_t>(serialization_->GetKind()));
}

void Column::SavePrefix(OutputStream*) {
/// does nothing by default
SerializationRef Column::GetSerialization() {
assert(serialization_);
return serialization_;
}

/// Saves column data to output stream.
void Column::Save(OutputStream* output) {
SavePrefix(output);
SaveBody(output);
bool Column::HasCustomSerialization() const {
assert(serialization_);
return serialization_->GetKind() != Serialization::Kind::DEFAULT;
}

}
30 changes: 17 additions & 13 deletions clickhouse/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "../types/types.h"
#include "../columns/itemview.h"
#include "../columns/serialization.h"
#include "../exceptions.h"

#include <memory>
Expand All @@ -19,7 +20,11 @@ using ColumnRef = std::shared_ptr<class Column>;
*/
class Column : public std::enable_shared_from_this<Column> {
public:
explicit inline Column(TypeRef type) : type_(type) {}
explicit inline Column(TypeRef type, SerializationRef serialization)
: type_(std::move(type))
, serialization_(std::move(serialization))
{
}

virtual ~Column() {}

Expand Down Expand Up @@ -56,18 +61,6 @@ class Column : public std::enable_shared_from_this<Column> {
/// Should be called only once from the client. Derived classes should not call it.
bool Load(InputStream* input, size_t rows);

/// Loads column prefix from input stream.
virtual bool LoadPrefix(InputStream* input, size_t rows);

/// Loads column data from input stream.
virtual bool LoadBody(InputStream* input, size_t rows) = 0;

/// Saves column prefix to output stream. Column types with prefixes must implement it.
virtual void SavePrefix(OutputStream* output);

/// Saves column body to output stream.
virtual void SaveBody(OutputStream* output) = 0;

/// Template method to save to output stream. It'll call SavePrefix and SaveBody respectively
/// Should be called only once from the client. Derived classes should not call it.
/// Save is split in Prefix and Body because some data types require prefixes and specific serialization order.
Expand All @@ -93,12 +86,23 @@ class Column : public std::enable_shared_from_this<Column> {
throw UnimplementedError("GetItem() is not supported for column of " + type_->GetName());
}

virtual bool LoadSerializationKind(InputStream* input);

virtual void SaveSerializationKind(OutputStream* output);

virtual void SetSerializationKind(Serialization::Kind kind) = 0;

SerializationRef GetSerialization();

virtual bool HasCustomSerialization() const;

friend void swap(Column& left, Column& right) {
left.Swap(right);
}

protected:
TypeRef type_;
SerializationRef serialization_;
};

} // namespace clickhouse
Loading

0 comments on commit ccb2736

Please sign in to comment.