[CHORE] Enable read_sql for swordfish (#2918)
Enables `read_sql` tests for swordfish.

Additionally, this PR adds an `EmptyScan` operator. This is needed because some SQL tests produce 0 scan tasks. (Some other, currently skipped IO tests also need the empty scan, so this PR will enable those to pass as well; I will unskip them in future PRs.)

_Technically_, we don't need a dedicated empty scan operator for this, but it is useful to be explicit. (For what it's worth, the current runner has an [empty scan operator](https://github.com/Eventual-Inc/Daft/blob/3f37a69e37211d5aa17f416096cb4eb9a22e3b7a/daft/execution/rust_physical_plan_shim.py#L53) as well.)
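
For illustration, here is a minimal sketch of the zero-scan-task case (the connection URL and table name are hypothetical placeholders; `daft.read_sql` and the `DAFT_ENABLE_NATIVE_EXECUTOR` variable come from the tests and CI config below):

```python
# Hypothetical sketch: a SQL read whose pushed-down predicate matches nothing
# can produce zero scan tasks; the native executor now plans this as an
# EmptyScan instead of failing.
import os

os.environ["DAFT_ENABLE_NATIVE_EXECUTOR"] = "1"  # run on the swordfish executor

import daft

# "sqlite://demo.db" and the `trips` table are placeholders.
df = daft.read_sql("SELECT * FROM trips WHERE 1 = 0", "sqlite://demo.db")
print(df.collect())  # an empty DataFrame that still carries the table's schema
```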

---------

Co-authored-by: Colin Ho <[email protected]>
3 people authored Oct 8, 2024
1 parent 64b8699 commit 73ff3f3
Showing 9 changed files with 106 additions and 14 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/python-package.yml
@@ -545,6 +545,10 @@ jobs:
       matrix:
         python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
         daft-runner: [py, ray]
+        enable-native-executor: [0, 1]
+        exclude:
+        - daft-runner: ray
+          enable-native-executor: 1
     steps:
     - uses: actions/checkout@v4
       with:
@@ -583,6 +587,7 @@ jobs:
         pytest tests/integration/sql -m 'integration or not integration' --durations=50
       env:
         DAFT_RUNNER: ${{ matrix.daft-runner }}
+        DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }}
     - name: Send Slack notification on failure
       uses: slackapi/[email protected]
       if: ${{ failure() && (github.ref == 'refs/heads/main') }}
10 changes: 7 additions & 3 deletions src/daft-local-execution/src/pipeline.rs
@@ -10,8 +10,8 @@ use daft_core::{
 use daft_dsl::{col, join::get_common_join_keys, Expr};
 use daft_micropartition::MicroPartition;
 use daft_physical_plan::{
-    Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan, Project, Sort,
-    UnGroupedAggregate,
+    EmptyScan, Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan, Project,
+    Sort, UnGroupedAggregate,
 };
 use daft_plan::{populate_aggregation_stages, JoinType};
 use daft_table::{Probeable, Table};
@@ -30,7 +30,7 @@ use crate::{
         hash_join_build::HashJoinBuildSink, limit::LimitSink, sort::SortSink,
         streaming_sink::StreamingSinkNode,
     },
-    sources::in_memory::InMemorySource,
+    sources::{empty_scan::EmptyScanSource, in_memory::InMemorySource},
     ExecutionRuntimeHandle, PipelineCreationSnafu,
 };

@@ -104,6 +104,10 @@ pub fn physical_plan_to_pipeline(
 
     use crate::sources::scan_task::ScanTaskSource;
     let out: Box<dyn PipelineNode> = match physical_plan {
+        LocalPhysicalPlan::EmptyScan(EmptyScan { schema, .. }) => {
+            let source = EmptyScanSource::new(schema.clone());
+            source.boxed().into()
+        }
         LocalPhysicalPlan::PhysicalScan(PhysicalScan { scan_tasks, .. }) => {
             let scan_task_source = ScanTaskSource::new(scan_tasks.clone());
             scan_task_source.boxed().into()
38 changes: 38 additions & 0 deletions src/daft-local-execution/src/sources/empty_scan.rs
@@ -0,0 +1,38 @@
+use std::sync::Arc;
+
+use daft_core::prelude::SchemaRef;
+use daft_io::IOStatsRef;
+use daft_micropartition::MicroPartition;
+use tracing::instrument;
+
+use super::source::Source;
+use crate::{sources::source::SourceStream, ExecutionRuntimeHandle};
+
+pub struct EmptyScanSource {
+    schema: SchemaRef,
+}
+
+impl EmptyScanSource {
+    pub fn new(schema: SchemaRef) -> Self {
+        Self { schema }
+    }
+    pub fn boxed(self) -> Box<dyn Source> {
+        Box::new(self) as Box<dyn Source>
+    }
+}
+
+impl Source for EmptyScanSource {
+    #[instrument(name = "EmptyScanSource::get_data", level = "info", skip_all)]
+    fn get_data(
+        &self,
+        _maintain_order: bool,
+        _runtime_handle: &mut ExecutionRuntimeHandle,
+        _io_stats: IOStatsRef,
+    ) -> crate::Result<SourceStream<'static>> {
+        let empty = Arc::new(MicroPartition::empty(Some(self.schema.clone())));
+        Ok(Box::pin(futures::stream::once(async { empty })))
+    }
+    fn name(&self) -> &'static str {
+        "EmptyScanSource"
+    }
+}
1 change: 1 addition & 0 deletions src/daft-local-execution/src/sources/mod.rs
@@ -1,3 +1,4 @@
+pub mod empty_scan;
 pub mod in_memory;
 pub mod scan_task;
 pub mod source;
33 changes: 29 additions & 4 deletions src/daft-local-execution/src/sources/scan_task.rs
@@ -9,6 +9,7 @@ use daft_micropartition::MicroPartition;
 use daft_parquet::read::ParquetSchemaInferenceOptions;
 use daft_scan::{storage_config::StorageConfig, ChunkSpec, ScanTask};
 use futures::{Stream, StreamExt};
+use snafu::ResultExt;
 use tokio_stream::wrappers::ReceiverStream;
 use tracing::instrument;
 
@@ -261,10 +262,34 @@ async fn stream_scan_task(
             .await?
         }
         #[cfg(feature = "python")]
-        FileFormatConfig::Database(_) => {
-            return Err(common_error::DaftError::TypeError(
-                "Database file format not implemented".to_string(),
-            ));
+        FileFormatConfig::Database(common_file_formats::DatabaseSourceConfig { sql, conn }) => {
+            use pyo3::Python;
+
+            use crate::PyIOSnafu;
+            let predicate = scan_task
+                .pushdowns
+                .filters
+                .as_ref()
+                .map(|p| (*p.as_ref()).clone().into());
+            let table = Python::with_gil(|py| {
+                daft_micropartition::python::read_sql_into_py_table(
+                    py,
+                    sql,
+                    conn,
+                    predicate.clone(),
+                    scan_task.schema.clone().into(),
+                    scan_task
+                        .pushdowns
+                        .columns
+                        .as_ref()
+                        .map(|cols| cols.as_ref().clone()),
+                    scan_task.pushdowns.limit,
+                )
+                .map(|t| t.into())
+                .context(PyIOSnafu)
+            })?;
+            // SQL Scan cannot be streamed at the moment, so we just return the table
+            Box::pin(futures::stream::once(async { Ok(table) }))
         }
         #[cfg(feature = "python")]
         FileFormatConfig::PythonFunction => {
2 changes: 1 addition & 1 deletion src/daft-micropartition/src/python.rs
@@ -864,7 +864,7 @@ pub fn read_parquet_into_py_table(
         .extract()
 }
 
-pub(crate) fn read_sql_into_py_table(
+pub fn read_sql_into_py_table(
     py: Python,
     sql: &str,
     conn: &PyObject,
2 changes: 1 addition & 1 deletion src/daft-physical-plan/src/lib.rs
@@ -3,7 +3,7 @@ mod local_plan;
 mod translate;
 
 pub use local_plan::{
-    Concat, Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan,
+    Concat, EmptyScan, Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan,
     LocalPhysicalPlanRef, PhysicalScan, PhysicalWrite, Project, Sort, UnGroupedAggregate,
 };
 pub use translate::translate;
17 changes: 16 additions & 1 deletion src/daft-physical-plan/src/local_plan.rs
@@ -11,7 +11,7 @@ pub type LocalPhysicalPlanRef = Arc<LocalPhysicalPlan>;
 pub enum LocalPhysicalPlan {
     InMemoryScan(InMemoryScan),
     PhysicalScan(PhysicalScan),
-    // EmptyScan(EmptyScan),
+    EmptyScan(EmptyScan),
     Project(Project),
     Filter(Filter),
     Limit(Limit),
@@ -77,6 +77,14 @@ impl LocalPhysicalPlan {
         .arced()
     }
 
+    pub(crate) fn empty_scan(schema: SchemaRef) -> LocalPhysicalPlanRef {
+        Self::EmptyScan(EmptyScan {
+            schema,
+            plan_stats: PlanStats {},
+        })
+        .arced()
+    }
+
     pub(crate) fn filter(input: LocalPhysicalPlanRef, predicate: ExprRef) -> LocalPhysicalPlanRef {
         let schema = input.schema().clone();
         Self::Filter(Filter {
@@ -196,6 +204,7 @@ impl LocalPhysicalPlan {
     pub fn schema(&self) -> &SchemaRef {
         match self {
             Self::PhysicalScan(PhysicalScan { schema, .. })
+            | Self::EmptyScan(EmptyScan { schema, .. })
             | Self::Filter(Filter { schema, .. })
             | Self::Limit(Limit { schema, .. })
             | Self::Project(Project { schema, .. })
@@ -223,6 +232,12 @@ pub struct PhysicalScan {
     pub plan_stats: PlanStats,
 }
 
+#[derive(Debug)]
+pub struct EmptyScan {
+    pub schema: SchemaRef,
+    pub plan_stats: PlanStats,
+}
+
 #[derive(Debug)]
 pub struct Project {
     pub input: LocalPhysicalPlanRef,
12 changes: 8 additions & 4 deletions src/daft-physical-plan/src/translate.rs
@@ -14,10 +14,14 @@ pub fn translate(plan: &LogicalPlanRef) -> DaftResult<LocalPhysicalPlanRef> {
             // We should be able to pass the ScanOperator into the physical plan directly but we need to figure out the serialization story
             let scan_tasks_iter = info.scan_op.0.to_scan_tasks(info.pushdowns.clone())?;
             let scan_tasks = scan_tasks_iter.collect::<DaftResult<Vec<_>>>()?;
-            Ok(LocalPhysicalPlan::physical_scan(
-                scan_tasks,
-                source.output_schema.clone(),
-            ))
+            if scan_tasks.is_empty() {
+                Ok(LocalPhysicalPlan::empty_scan(source.output_schema.clone()))
+            } else {
+                Ok(LocalPhysicalPlan::physical_scan(
+                    scan_tasks,
+                    source.output_schema.clone(),
+                ))
+            }
         }
         SourceInfo::PlaceHolder(_) => {
             panic!("We should not encounter a PlaceHolder during translation")
