Skip to content

Commit

Permalink
feat: make substrait optional to allow avoiding libgit2 (#2168)
Browse files Browse the repository at this point in the history
Hide substrait behind a feature flag. In the short term, this removes the
libgit2 dependency from builds that do not enable the feature. In the medium
term this won't be needed, because datafusion's next release should include
substrait 0.29.3, which removes libgit2. However, we might still want to keep
substrait behind a feature, in the general spirit of keeping things lean.
  • Loading branch information
westonpace authored Apr 8, 2024
1 parent 38fc5a5 commit 8797164
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 28 deletions.
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ datafusion-sql = "36.0"
datafusion-expr = "36.0"
datafusion-execution = "36.0"
datafusion-physical-expr = "36.0"
datafusion-substrait = "36.0"
either = "1.0"
futures = "0.3"
http = "0.2.9"
Expand All @@ -119,7 +118,6 @@ serde = { version = "^1" }
serde_json = { version = "1" }
shellexpand = "3.0"
snafu = "0.7.4"
substrait-expr = "0.2.0"
tempfile = "3"
tokio = { version = "1.23", features = [
"rt-multi-thread",
Expand Down
2 changes: 1 addition & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ half = { version = "2.3", default-features = false, features = [
"num-traits",
"std",
] }
lance = { path = "../rust/lance", features = ["tensorflow", "dynamodb"] }
lance = { path = "../rust/lance", features = ["tensorflow", "dynamodb", "substrait"] }
lance-arrow = { path = "../rust/lance-arrow" }
lance-core = { path = "../rust/lance-core" }
lance-datagen = { path = "../rust/lance-datagen", optional = true }
Expand Down
6 changes: 4 additions & 2 deletions rust/lance-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async-trait.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-physical-expr.workspace = true
datafusion-substrait.workspace = true
datafusion-substrait = { version = "36.0", optional = true }
futures.workspace = true
lance-arrow.workspace = true
lance-core = { workspace = true, features = ["datafusion"] }
Expand All @@ -27,5 +27,7 @@ snafu.workspace = true
tokio.workspace = true

[dev-dependencies]
substrait-expr.workspace = true
substrait-expr = { version = "0.2.1" }

[features]
substrait = ["dep:datafusion-substrait"]
54 changes: 32 additions & 22 deletions rust/lance-datafusion/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,29 @@ use std::sync::Arc;

use arrow::compute::cast;
use arrow_array::{cast::AsArray, ArrayRef};
use arrow_schema::{DataType, Schema, TimeUnit};
use datafusion::{
datasource::empty::EmptyTable, execution::context::SessionContext, logical_expr::Expr,
};
use datafusion_common::{
tree_node::{Transformed, TreeNode},
Column, DataFusionError, ScalarValue, TableReference,
};
use prost::Message;
use snafu::{location, Location};
use arrow_schema::{DataType, TimeUnit};
use datafusion_common::ScalarValue;

use datafusion_substrait::substrait::proto::{
expression_reference::ExprType,
plan_rel::RelType,
read_rel::{NamedTable, ReadType},
rel, ExtendedExpression, Plan, PlanRel, ProjectRel, ReadRel, Rel, RelRoot,
#[cfg(feature = "substrait")]
use {
arrow_schema::Schema,
datafusion::{
datasource::empty::EmptyTable, execution::context::SessionContext, logical_expr::Expr,
},
datafusion_common::{
tree_node::{Transformed, TreeNode},
Column, DataFusionError, TableReference,
},
datafusion_substrait::substrait::proto::{
expression_reference::ExprType,
plan_rel::RelType,
read_rel::{NamedTable, ReadType},
rel, ExtendedExpression, Plan, PlanRel, ProjectRel, ReadRel, Rel, RelRoot,
},
lance_core::{Error, Result},
prost::Message,
snafu::{location, Location},
};
use lance_core::{Error, Result};

const MS_PER_DAY: i64 = 86400000;

Expand Down Expand Up @@ -422,6 +427,7 @@ pub fn safe_coerce_scalar(value: &ScalarValue, ty: &DataType) -> Option<ScalarVa
/// Convert a Substrait `ExtendedExpression` message into a DF Expr.
///
/// The `ExtendedExpression` message must contain a single scalar expression.
#[cfg(feature = "substrait")]
pub async fn parse_substrait(expr: &[u8], input_schema: Arc<Schema>) -> Result<Expr> {
let envelope = ExtendedExpression::decode(expr)?;
if envelope.referred_expr.is_empty() {
Expand Down Expand Up @@ -541,12 +547,15 @@ pub async fn parse_substrait(expr: &[u8], input_schema: Arc<Schema>) -> Result<E
mod tests {
use super::*;

use arrow_schema::Field;
use datafusion::logical_expr::{BinaryExpr, Operator};
use substrait_expr::{
builder::{schema::SchemaBuildersExt, BuilderParams, ExpressionsBuilder},
functions::functions_comparison::FunctionsComparisonExt,
helpers::{literals::literal, schema::SchemaInfo},
#[cfg(feature = "substrait")]
use {
arrow_schema::Field,
datafusion::logical_expr::{BinaryExpr, Operator},
substrait_expr::{
builder::{schema::SchemaBuildersExt, BuilderParams, ExpressionsBuilder},
functions::functions_comparison::FunctionsComparisonExt,
helpers::{literals::literal, schema::SchemaInfo},
},
};

#[test]
Expand Down Expand Up @@ -824,6 +833,7 @@ mod tests {
);
}

#[cfg(feature = "substrait")]
#[tokio::test]
async fn test_substrait_conversion() {
let schema = SchemaInfo::new_full()
Expand Down
1 change: 1 addition & 0 deletions rust/lance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ cli = ["clap", "lzma-sys/static"]
tensorflow = ["tfrecord"]
dynamodb = ["lance-table/dynamodb", "aws-sdk-dynamodb"]
dynamodb_tests = ["dynamodb"]
substrait = ["lance-datafusion/substrait"]

[[bin]]
name = "lq"
Expand Down
5 changes: 4 additions & 1 deletion rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use futures::TryStreamExt;
use lance_arrow::floats::{coerce_float_vector, FloatType};
use lance_core::{ROW_ID, ROW_ID_FIELD};
use lance_datafusion::exec::{execute_plan, LanceExecutionOptions};
use lance_datafusion::expr::parse_substrait;
use lance_index::vector::{Query, DIST_COL};
use lance_index::{scalar::expression::ScalarIndexExpr, DatasetIndexExt};
use lance_io::stream::RecordBatchStream;
Expand All @@ -66,6 +65,9 @@ use crate::io::exec::{
use crate::{Error, Result};
use snafu::{location, Location};

#[cfg(feature = "substrait")]
use lance_datafusion::expr::parse_substrait;

pub const DEFAULT_BATCH_SIZE: usize = 8192;

// Same as pyarrow Dataset::scanner()
Expand Down Expand Up @@ -345,6 +347,7 @@ impl Scanner {
///
/// The message must contain exactly one expression and that expression
/// must be a scalar expression whose return type is boolean.
#[cfg(feature = "substrait")]
pub async fn filter_substrait(&mut self, filter: &[u8]) -> Result<&mut Self> {
let schema = Arc::new(ArrowSchema::from(self.dataset.schema()));
let expr = parse_substrait(filter, schema.clone()).await?;
Expand Down

0 comments on commit 8797164

Please sign in to comment.