Skip to content

Commit

Permalink
feat: initial commit to add velox as a consumer
Browse files Browse the repository at this point in the history
fix: update java lib path for linux

fix: use double instead of decimal in duckdb

fix: update substrait to use doubles and varchar

feat: velox consumer support for extension function tests

fix: add updated schema for isthmus

fix: update rounding functions

fix: update boolean function tests

fix: update schemas for consumers
  • Loading branch information
richtia committed Jan 19, 2023
1 parent b0c9a87 commit 80f35a0
Show file tree
Hide file tree
Showing 19 changed files with 594 additions and 356 deletions.
10 changes: 5 additions & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import duckdb
import pytest

from filelock import FileLock
from tests.consumers import AceroConsumer, DuckDBConsumer

from tests.consumers import AceroConsumer, DuckDBConsumer, VeloxConsumer
from tests.producers import DuckDBProducer, IbisProducer, IsthmusProducer


Expand Down Expand Up @@ -32,19 +32,19 @@ def pytest_addoption(parser):
action="store",
default=",".join([x.__name__ for x in CONSUMERS]),
help=f"A comma separated list of consumers to run against.",
choices=[x.__name__ for x in CONSUMERS]
choices=[x.__name__ for x in CONSUMERS],
)
parser.addoption(
"--producer",
action="store",
default=",".join([x.__name__ for x in PRODUCERS]),
help="A comma separated list of producers to run against.",
choices=[x.__name__ for x in PRODUCERS]
choices=[x.__name__ for x in PRODUCERS],
)


PRODUCERS = [DuckDBProducer, IbisProducer, IsthmusProducer]
CONSUMERS = [AceroConsumer, DuckDBConsumer]
CONSUMERS = [AceroConsumer, DuckDBConsumer, VeloxConsumer]


def _get_consumers():
Expand Down
47 changes: 46 additions & 1 deletion tests/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.substrait as substrait
import velox

from tests.common import SubstraitUtils
from tests.schema_updates import PA_SCHEMA, TABLE_TO_RECREATE


class DuckDBConsumer:
Expand Down Expand Up @@ -68,6 +70,14 @@ def load_tables_from_parquet(
create_table_sql = f"CREATE TABLE {table_name} AS SELECT * FROM read_parquet('{file_path}');"
self.db_connection.execute(create_table_sql)
created_tables.add(table_name)
if table_name in TABLE_TO_RECREATE.keys():
self.db_connection.query(
f"ALTER TABLE {table_name} RENAME TO {table_name}_orig"
)
self.db_connection.query(f"{TABLE_TO_RECREATE[table_name]}")
self.db_connection.query(
f"insert into {table_name} select * from {table_name}_orig"
)
table_names.append(table_name)

return table_names
Expand All @@ -93,7 +103,9 @@ def setup(self, db_connection, file_names: Iterable[str]):
)
if table_name not in self.created_tables:
self.created_tables.add(table_name)
self.tables[table_name] = pq.read_table(file_path)
self.tables[table_name] = pq.read_table(
file_path, schema=PA_SCHEMA[table_name]
)
else:
table = pa.table(
{
Expand Down Expand Up @@ -144,3 +156,36 @@ def run_substrait_query(self, substrait_query: bytes) -> pa.Table:
result = reader.read_all()

return result


class VeloxConsumer:
"""
Adapts the Velox Substrait consumer to the test framework.
"""

def __init__(self):
self.created_tables = set()
self.tables = {}
self.table_provider = lambda names: self.tables[names[0]]

def setup(self, db_connection, file_names: Iterable[str]):
pass

def run_substrait_query(self, substrait_query: bytes) -> pa.Table:
"""
Run the substrait plan against Velox.
Parameters:
substrait_query:
A json formatted byte representation of the substrait query plan.
Returns:
A pyarrow table resulting from running the substrait query plan.
"""
velox_result = velox.from_json(substrait_query)

record_batches = []
for vec in velox_result:
record_batches.append(vec.to_arrow())

return pa.Table.from_batches(record_batches)
2 changes: 1 addition & 1 deletion tests/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
REPO_DIR = Path(__file__).parent.parent
from com.google.protobuf.util import JsonFormat as json_formatter

schema_file = Path.joinpath(REPO_DIR, "tests/data/tpch_parquet/schema.sql")
schema_file = Path.joinpath(REPO_DIR, "tests/data/schema.sql")


def produce_isthmus_substrait(sql_string, schema_list):
Expand Down
8 changes: 8 additions & 0 deletions tests/data/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE lineitem(l_orderkey INTEGER, l_partkey INTEGER, l_suppkey INTEGER, l_linenumber INTEGER, l_quantity INTEGER, l_extendedprice DOUBLE, l_discount DOUBLE, l_tax DOUBLE, l_returnflag VARCHAR, l_linestatus VARCHAR, l_shipdate DATE, l_commitdate DATE, l_receiptdate DATE, l_shipinstruct VARCHAR, l_shipmode VARCHAR, l_comment VARCHAR);
CREATE TABLE orders(o_orderkey INTEGER, o_custkey INTEGER, o_orderstatus VARCHAR, o_totalprice DOUBLE, o_orderdate DATE, o_orderpriority VARCHAR, o_clerk VARCHAR, o_shippriority INTEGER, o_comment VARCHAR);
CREATE TABLE partsupp(ps_partkey INTEGER, ps_suppkey INTEGER, ps_availqty INTEGER, ps_supplycost DOUBLE, ps_comment VARCHAR);
CREATE TABLE part(p_partkey INTEGER, p_name VARCHAR, p_mfgr VARCHAR, p_brand VARCHAR, p_type VARCHAR, p_size INTEGER, p_container VARCHAR, p_retailprice DOUBLE, p_comment VARCHAR);
CREATE TABLE customer(c_custkey INTEGER, c_name VARCHAR, c_address VARCHAR, c_nationkey INTEGER, c_phone VARCHAR, c_acctbal DOUBLE, c_mktsegment VARCHAR, c_comment VARCHAR);
CREATE TABLE supplier(s_suppkey INTEGER, s_name VARCHAR, s_address VARCHAR, s_nationkey INTEGER, s_phone VARCHAR, s_acctbal DOUBLE, s_comment VARCHAR);
CREATE TABLE nation(n_nationkey INTEGER, n_name VARCHAR, n_regionkey INTEGER, n_comment VARCHAR);
CREATE TABLE region(r_regionkey INTEGER, r_name VARCHAR, r_comment VARCHAR);
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def test_boolean_functions(
ibis_expr: Callable[[Table], Table],
producer,
consumer,
partsupp
) -> None:
substrait_function_test(
self.db_connection,
Expand All @@ -55,6 +54,5 @@ def test_boolean_functions(
ibis_expr,
producer,
consumer,
partsupp,
self.table_t,
)
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
def ceil_expr(partsupp):
new_col = partsupp.ps_supplycost.ceil().name("CEIL_SUPPLYCOST")
return partsupp[partsupp.ps_supplycost, new_col]
return partsupp[new_col]


def floor_expr(partsupp):
new_col = partsupp.ps_supplycost.floor().name("FLOOR_SUPPLYCOST")
return partsupp[partsupp.ps_supplycost, new_col]
return partsupp[new_col]


IBIS_SCALAR = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@
SQL_AGGREGATE = {
"sum": (
"""
SELECT sum(L_EXTENDEDPRICE) AS SUM_EXTENDEDPRICE
SELECT CAST(sum(L_EXTENDEDPRICE) AS DECIMAL(15,2)) AS SUM_EXTENDEDPRICE
FROM '{}';
""",
[DuckDBProducer],
),
"avg": (
"""
SELECT avg(L_EXTENDEDPRICE) AS AVG_EXTENDEDPRICE
SELECT CAST(avg(L_EXTENDEDPRICE) AS DECIMAL(15,2)) AVG_EXTENDEDPRICE
FROM '{}';
""",
[DuckDBProducer],
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/queries/sql/rounding_functions_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
SQL_SCALAR = {
"ceil": (
"""
SELECT PS_SUPPLYCOST, ceil(PS_SUPPLYCOST) AS CEIL_SUPPLYCOST
SELECT ceil(PS_SUPPLYCOST) AS CEIL_SUPPLYCOST
FROM '{}';
""",
[DuckDBProducer],
),
"floor": (
"""
SELECT PS_SUPPLYCOST, floor(PS_SUPPLYCOST) AS FLOOR_SUPPLYCOST
SELECT floor(PS_SUPPLYCOST) AS FLOOR_SUPPLYCOST
FROM '{}';
""",
[DuckDBProducer],
Expand Down
12 changes: 6 additions & 6 deletions tests/integration/queries/tpch_sql/q1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ SELECT
l_returnflag,
l_linestatus,
sum(l_quantity) AS sum_qty,
sum(l_extendedprice) AS sum_base_price,
sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
avg(l_quantity) AS avg_qty,
avg(l_extendedprice) AS avg_price,
avg(l_discount) AS avg_disc,
round(sum(l_extendedprice), 2) AS sum_base_price,
round(sum(l_extendedprice * (1 - l_discount)), 2) AS sum_disc_price,
round(sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)), 2) AS sum_charge,
round(avg(l_quantity), 2) AS avg_qty,
round(avg(l_extendedprice), 2) AS avg_price,
round(avg(l_discount), 2) AS avg_disc,
count(*) AS count_order
FROM
'{}'
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/queries/tpch_sql/q5.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
SELECT
n_name,
sum(l_extendedprice * (1 - l_discount)) AS revenue
round(sum(l_extendedprice * (1 - l_discount)), 2) AS revenue
FROM
'{}', '{}', '{}', '{}', '{}', '{}'
WHERE
Expand Down
Loading

0 comments on commit 80f35a0

Please sign in to comment.