Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse connection to Extract, Consume and execute substrait query plans #113

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 52 additions & 15 deletions src/from_substrait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,6 @@

#include "duckdb/common/types/value.hpp"
#include "duckdb/parser/expression/list.hpp"
#include "duckdb/main/relation/join_relation.hpp"
#include "duckdb/main/relation/cross_product_relation.hpp"

#include "duckdb/main/relation/limit_relation.hpp"
#include "duckdb/main/relation/projection_relation.hpp"
#include "duckdb/main/relation/setop_relation.hpp"
#include "duckdb/main/relation/aggregate_relation.hpp"
#include "duckdb/main/relation/filter_relation.hpp"
#include "duckdb/main/relation/order_relation.hpp"
#include "duckdb/main/connection.hpp"
#include "duckdb/parser/parser.hpp"
#include "duckdb/common/exception.hpp"
Expand All @@ -25,7 +16,24 @@
#include "google/protobuf/util/json_util.h"
#include "substrait/plan.pb.h"

#include "duckdb/main/table_description.hpp"

#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
#include "duckdb/common/helper.hpp"

#include "duckdb/main/relation.hpp"
#include "duckdb/main/relation/table_relation.hpp"
#include "duckdb/main/relation/table_function_relation.hpp"
#include "duckdb/main/relation/value_relation.hpp"
#include "duckdb/main/relation/view_relation.hpp"
#include "duckdb/main/relation/aggregate_relation.hpp"
#include "duckdb/main/relation/cross_product_relation.hpp"
#include "duckdb/main/relation/filter_relation.hpp"
#include "duckdb/main/relation/join_relation.hpp"
#include "duckdb/main/relation/limit_relation.hpp"
#include "duckdb/main/relation/order_relation.hpp"
#include "duckdb/main/relation/projection_relation.hpp"
#include "duckdb/main/relation/setop_relation.hpp"

namespace duckdb {
const std::unordered_map<std::string, std::string> SubstraitToDuckDB::function_names_remap = {
Expand All @@ -40,7 +48,7 @@ const case_insensitive_set_t SubstraitToDuckDB::valid_extract_subfields = {
"quarter", "microsecond", "milliseconds", "second", "minute", "hour"};

string SubstraitToDuckDB::RemapFunctionName(const string &function_name) {
// Lets first drop any extension id
// Let's first drop any extension id
string name;
for (auto &c : function_name) {
if (c == ':') {
Expand All @@ -67,7 +75,9 @@ string SubstraitToDuckDB::RemoveExtension(const string &function_name) {
return name;
}

SubstraitToDuckDB::SubstraitToDuckDB(Connection &con_p, const string &serialized, bool json) : con(con_p) {
SubstraitToDuckDB::SubstraitToDuckDB(shared_ptr<ClientContext> &context_p, const string &serialized, bool json,
bool acquire_lock_p)
: context(context_p), acquire_lock(acquire_lock_p) {
if (!json) {
if (!plan.ParseFromString(serialized)) {
throw std::runtime_error("Was not possible to convert binary into Substrait plan");
Expand Down Expand Up @@ -511,16 +521,38 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformAggregateOp(const substrait::Re
return make_shared_ptr<AggregateRelation>(TransformOp(sop.aggregate().input()), std::move(expressions),
std::move(groups));
}
//! Builds a TableDescription (schema name, table name, copies of the logical columns)
//! for the table `table_name` in schema `schema_name`, looked up through `context`.
//! Returns an empty pointer when the catalog lookup yields no entry.
unique_ptr<TableDescription> TableInfo(ClientContext &context, const string &schema_name, const string &table_name) {
	// Look the table up in the catalog; a null entry is handled right below
	auto catalog_entry = Catalog::GetEntry<TableCatalogEntry>(context, INVALID_CATALOG, schema_name, table_name,
	                                                          OnEntryNotFound::RETURN_NULL);
	if (!catalog_entry) {
		return nullptr;
	}

	// Fill in the description from the catalog entry
	auto description = make_uniq<TableDescription>();
	description->schema = schema_name;
	description->table = table_name;
	for (auto &col_def : catalog_entry->GetColumns().Logical()) {
		description->columns.emplace_back(col_def.Copy());
	}
	return description;
}

shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &sop) {
auto &sget = sop.read();
shared_ptr<Relation> scan;
if (sget.has_named_table()) {
auto table_name = sget.named_table().names(0);
// If we can't find a table with that name, let's try a view.
try {
scan = con.Table(sget.named_table().names(0));
auto table_info = TableInfo(*context, DEFAULT_SCHEMA, table_name);
if (!table_info) {
throw CatalogException("Table '%s' does not exist!", table_name);
}
scan = make_shared_ptr<TableRelation>(context, std::move(table_info), acquire_lock);
} catch (...) {
scan = con.View(sget.named_table().names(0));
scan = make_shared_ptr<ViewRelation>(context, DEFAULT_SCHEMA, table_name, acquire_lock);
}
} else if (sget.has_local_files()) {
vector<Value> parquet_files;
Expand All @@ -541,7 +573,11 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
}
string name = "parquet_" + StringUtil::GenerateRandomName();
named_parameter_map_t named_parameters({{"binary_as_string", Value::BOOLEAN(false)}});
scan = con.TableFunction("parquet_scan", {Value::LIST(parquet_files)}, named_parameters)->Alias(name);
vector<Value> parameters {Value::LIST(parquet_files)};
auto scan_rel = make_shared_ptr<TableFunctionRelation>(
context, "parquet_scan", parameters, std::move(named_parameters), nullptr, true, acquire_lock);
auto rel = static_cast<Relation *>(scan_rel.get());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably beyond the scope of this PR but does this handle emitting multiple names for struct types?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey David,
I don't think this PR will alter anything related to multiple names for structs. The goal here is to support temporary objects (e.g., a pyarrow object that has been registered in the python client) in substrait!

scan = rel->Alias(name);
} else if (sget.has_virtual_table()) {
// We need to handle a virtual table as a LogicalExpressionGet
auto literal_values = sget.virtual_table().values();
Expand All @@ -554,7 +590,8 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
}
expression_rows.emplace_back(expression_row);
}
scan = con.Values(expression_rows);
vector<string> column_names;
scan = make_shared_ptr<ValueRelation>(context, expression_rows, column_names, "values", acquire_lock);
} else {
throw NotImplementedException("Unsupported type of read operator for substrait");
}
Expand Down
17 changes: 14 additions & 3 deletions src/include/from_substrait.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// from_substrait.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include <string>
Expand All @@ -10,7 +18,8 @@ namespace duckdb {

class SubstraitToDuckDB {
public:
SubstraitToDuckDB(Connection &con_p, const string &serialized, bool json = false);
SubstraitToDuckDB(shared_ptr<ClientContext> &context_p, const string &serialized, bool json = false,
bool acquire_lock = false);
//! Transforms Substrait Plan to DuckDB Relation
shared_ptr<Relation> TransformPlan();

Expand Down Expand Up @@ -48,8 +57,8 @@ class SubstraitToDuckDB {

//! Transform Substrait Sort Order to DuckDB Order
OrderByNode TransformOrder(const substrait::SortField &sordf);
//! DuckDB Connection
Connection &con;
//! DuckDB Client Context
shared_ptr<ClientContext> context;
//! Substrait Plan
substrait::Plan plan;
//! Variable used to register functions
Expand All @@ -59,5 +68,7 @@ class SubstraitToDuckDB {
static const unordered_map<std::string, std::string> function_names_remap;
static const case_insensitive_set_t valid_extract_subfields;
vector<ParsedExpression *> struct_expressions;
//! Whether we should acquire a client context lock when creating the relations
const bool acquire_lock;
};
} // namespace duckdb
8 changes: 8 additions & 0 deletions src/include/to_substrait.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// to_substrait.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "custom_extensions/custom_extensions.hpp"
Expand Down
Loading
Loading