Skip to content

Commit

Permalink
feat(rust/driver/datafusion): install protoc, fix clippy errors
Browse files Browse the repository at this point in the history
  • Loading branch information
tokoko committed Oct 23, 2024
1 parent c48dcee commit 14a5cfc
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 49 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ jobs:
run: |
rustup toolchain install stable --no-self-update
rustup default stable
- name: Install Protoc
uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions/download-artifact@v4
with:
name: driver-manager-${{ matrix.os }}
Expand Down
91 changes: 45 additions & 46 deletions rust/drivers/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#![allow(refining_impl_trait)]

use datafusion::datasource::TableType;
use datafusion::prelude::*;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
Expand Down Expand Up @@ -94,7 +96,7 @@ impl Iterator for DataFusionReader {
type Item = std::result::Result<RecordBatch, ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
self.batches.next().map(|b| Ok(b))
self.batches.next().map(Ok)
}
}

Expand Down Expand Up @@ -144,26 +146,26 @@ impl Optionable for DataFusionDatabase {

fn set_option(
&mut self,
key: Self::Option,
value: adbc_core::options::OptionValue,
_key: Self::Option,
_value: adbc_core::options::OptionValue,
) -> adbc_core::error::Result<()> {
self.options.insert(key, value);
self.options.insert(_key, _value);
Ok(())
}

fn get_option_string(&self, key: Self::Option) -> adbc_core::error::Result<String> {
fn get_option_string(&self, _key: Self::Option) -> adbc_core::error::Result<String> {
todo!()
}

fn get_option_bytes(&self, key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
fn get_option_bytes(&self, _key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
todo!()
}

fn get_option_int(&self, key: Self::Option) -> adbc_core::error::Result<i64> {
fn get_option_int(&self, _key: Self::Option) -> adbc_core::error::Result<i64> {
todo!()
}

fn get_option_double(&self, key: Self::Option) -> adbc_core::error::Result<f64> {
fn get_option_double(&self, _key: Self::Option) -> adbc_core::error::Result<f64> {
todo!()
}
}
Expand All @@ -187,7 +189,7 @@ impl Database for DataFusionDatabase {

fn new_connection_with_opts(
&mut self,
opts: impl IntoIterator<
_opts: impl IntoIterator<
Item = (
adbc_core::options::OptionConnection,
adbc_core::options::OptionValue,
Expand Down Expand Up @@ -218,25 +220,25 @@ impl Optionable for DataFusionConnection {

fn set_option(
&mut self,
key: Self::Option,
value: adbc_core::options::OptionValue,
_key: Self::Option,
_value: adbc_core::options::OptionValue,
) -> adbc_core::error::Result<()> {
todo!()
}

fn get_option_string(&self, key: Self::Option) -> adbc_core::error::Result<String> {
fn get_option_string(&self, _key: Self::Option) -> adbc_core::error::Result<String> {
todo!()
}

fn get_option_bytes(&self, key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
fn get_option_bytes(&self, _key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
todo!()
}

fn get_option_int(&self, key: Self::Option) -> adbc_core::error::Result<i64> {
fn get_option_int(&self, _key: Self::Option) -> adbc_core::error::Result<i64> {
todo!()
}

fn get_option_double(&self, key: Self::Option) -> adbc_core::error::Result<f64> {
fn get_option_double(&self, _key: Self::Option) -> adbc_core::error::Result<f64> {
todo!()
}
}
Expand Down Expand Up @@ -370,7 +372,7 @@ impl GetObjectsBuilder {
self.catalog_names.append(&mut catalogs);

self.catalog_names.iter().for_each(|cat| {
let catalog_provider = ctx.catalog(&cat).unwrap();
let catalog_provider = ctx.catalog(cat).unwrap();
let schema_names = catalog_provider.schema_names();
self.catalog_db_schema_names
.append(&mut schema_names.clone());
Expand All @@ -379,15 +381,15 @@ impl GetObjectsBuilder {
.push(self.catalog_db_schema_offsets.last().unwrap() + schema_names.len() as i32);

schema_names.iter().for_each(|schema| {
let schema_provider = catalog_provider.schema(&schema).unwrap();
let schema_provider = catalog_provider.schema(schema).unwrap();
let table_names = schema_provider.table_names();
self.table_names.append(&mut table_names.clone());
self.table_offsets
.push(self.table_offsets.last().unwrap() + table_names.len() as i32);

table_names.iter().for_each(|t| {
runtime.block_on(async {
let table_provider = schema_provider.table(&t).await.unwrap().unwrap();
let table_provider = schema_provider.table(t).await.unwrap().unwrap();
let table_type = match table_provider.table_type() {
TableType::Base => "Base",
TableType::View => "View",
Expand Down Expand Up @@ -649,41 +651,41 @@ impl Connection for DataFusionConnection {
fn get_objects(
&self,
depth: adbc_core::options::ObjectDepth,
catalog: Option<&str>,
db_schema: Option<&str>,
table_name: Option<&str>,
table_type: Option<Vec<&str>>,
column_name: Option<&str>,
) -> adbc_core::error::Result<SingleBatchReader> {
_catalog: Option<&str>,
_db_schema: Option<&str>,
_table_name: Option<&str>,
_table_type: Option<Vec<&str>>,
_column_name: Option<&str>,
) -> Result<impl RecordBatchReader + Send> {
let batch = GetObjectsBuilder::new().build(&self.runtime, &self.ctx, &depth)?;
let reader = SingleBatchReader::new(batch);
Ok(reader)
}

fn get_table_schema(
&self,
catalog: Option<&str>,
db_schema: Option<&str>,
table_name: &str,
_catalog: Option<&str>,
_db_schema: Option<&str>,
_table_name: &str,
) -> adbc_core::error::Result<arrow_schema::Schema> {
todo!()
}

fn get_table_types(&self) -> adbc_core::error::Result<SingleBatchReader> {
fn get_table_types(&self) -> Result<SingleBatchReader> {
todo!()
}

fn get_statistic_names(&self) -> adbc_core::error::Result<SingleBatchReader> {
fn get_statistic_names(&self) -> Result<SingleBatchReader> {
todo!()
}

fn get_statistics(
&self,
catalog: Option<&str>,
db_schema: Option<&str>,
table_name: Option<&str>,
approximate: bool,
) -> adbc_core::error::Result<SingleBatchReader> {
_catalog: Option<&str>,
_db_schema: Option<&str>,
_table_name: Option<&str>,
_approximate: bool,
) -> Result<SingleBatchReader> {
todo!()
}

Expand All @@ -695,10 +697,7 @@ impl Connection for DataFusionConnection {
todo!()
}

fn read_partition(
&self,
partition: impl AsRef<[u8]>,
) -> adbc_core::error::Result<SingleBatchReader> {
fn read_partition(&self, _partition: impl AsRef<[u8]>) -> Result<SingleBatchReader> {
todo!()
}
}
Expand All @@ -715,37 +714,37 @@ impl Optionable for DataFusionStatement {

fn set_option(
&mut self,
key: Self::Option,
value: adbc_core::options::OptionValue,
_key: Self::Option,
_value: adbc_core::options::OptionValue,
) -> adbc_core::error::Result<()> {
todo!()
}

fn get_option_string(&self, key: Self::Option) -> adbc_core::error::Result<String> {
fn get_option_string(&self, _key: Self::Option) -> adbc_core::error::Result<String> {
todo!()
}

fn get_option_bytes(&self, key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
fn get_option_bytes(&self, _key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
todo!()
}

fn get_option_int(&self, key: Self::Option) -> adbc_core::error::Result<i64> {
fn get_option_int(&self, _key: Self::Option) -> adbc_core::error::Result<i64> {
todo!()
}

fn get_option_double(&self, key: Self::Option) -> adbc_core::error::Result<f64> {
fn get_option_double(&self, _key: Self::Option) -> adbc_core::error::Result<f64> {
todo!()
}
}

impl Statement for DataFusionStatement {
fn bind(&mut self, batch: arrow_array::RecordBatch) -> adbc_core::error::Result<()> {
fn bind(&mut self, _batch: arrow_array::RecordBatch) -> adbc_core::error::Result<()> {
todo!()
}

fn bind_stream(
&mut self,
reader: Box<dyn arrow_array::RecordBatchReader + Send>,
_reader: Box<dyn arrow_array::RecordBatchReader + Send>,
) -> adbc_core::error::Result<()> {
todo!()
}
Expand Down
6 changes: 3 additions & 3 deletions rust/drivers/datafusion/tests/test_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn get_objects(connection: &ManagedConnection) -> RecordBatch {

let batches: Vec<RecordBatch> = objects.unwrap().map(|b| b.unwrap()).collect();

let schema = batches.get(0).unwrap().schema();
let schema = batches.first().unwrap().schema();

concat_batches(&schema, &batches).unwrap()
}
Expand All @@ -68,7 +68,7 @@ fn execute_sql_query(connection: &mut ManagedConnection, query: &str) -> RecordB

let batches: Vec<RecordBatch> = statement.execute().unwrap().map(|b| b.unwrap()).collect();

let schema = batches.get(0).unwrap().schema();
let schema = batches.first().unwrap().schema();

concat_batches(&schema, &batches).unwrap()
}
Expand All @@ -80,7 +80,7 @@ fn execute_substrait(connection: &mut ManagedConnection, plan: Plan) -> RecordBa

let batches: Vec<RecordBatch> = statement.execute().unwrap().map(|b| b.unwrap()).collect();

let schema = batches.get(0).unwrap().schema();
let schema = batches.first().unwrap().schema();

concat_batches(&schema, &batches).unwrap()
}
Expand Down

0 comments on commit 14a5cfc

Please sign in to comment.