Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Compute pool for native executor #2986

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open

Conversation

colin-ho
Copy link
Contributor

@colin-ho colin-ho commented Oct 2, 2024

Create a multithreaded compute runtime for swordfish compute tasks. Switch query runtime to be single threaded, and use IO pool for scan task streams.

Additionally, adds in a tokio_select together with the tokio::signal::ctrlc and main async execution loop so that queries can be cancelled.

import os
import daft
import numpy
import time
import psutil

current_process = psutil.Process(os.getpid())

daft.set_execution_config(enable_native_executor=True, default_morsel_size=1)
dfs = [
    iter(
        daft.from_pydict({"a": numpy.random.rand(10)}).with_column(
            "plus_one", daft.col("a") + 1
        )
    )
    for _ in range(10)
]
while True:
    for i, df in enumerate(dfs):
        time.sleep(0.1)
        try:
            print("threads: ", current_process.num_threads())
            print(next(df))
        except StopIteration:
            dfs.pop(i)
    if not dfs:
        break

If you run this script you can see that the number of threads increases by only 1 per dataframe.

@github-actions github-actions bot added the enhancement New feature or request label Oct 2, 2024
Copy link

codspeed-hq bot commented Oct 2, 2024

CodSpeed Performance Report

Merging #2986 will not alter performance

Comparing colin/compute-pool (fa8e8bb) with main (73ff3f3)

Summary

✅ 17 untouched benchmarks

Copy link

codecov bot commented Oct 2, 2024

Codecov Report

Attention: Patch coverage is 77.70270% with 33 lines in your changes missing coverage. Please review.

Project coverage is 78.48%. Comparing base (73ff3f3) to head (fa8e8bb).

Files with missing lines Patch % Lines
src/daft-local-execution/src/run.rs 58.62% 12 Missing ⚠️
src/daft-io/src/lib.rs 70.96% 9 Missing ⚠️
src/daft-local-execution/src/lib.rs 84.21% 9 Missing ⚠️
...-execution/src/intermediate_ops/intermediate_op.rs 66.66% 3 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #2986      +/-   ##
==========================================
+ Coverage   78.14%   78.48%   +0.33%     
==========================================
  Files         610      610              
  Lines       72146    71837     -309     
==========================================
- Hits        56381    56379       -2     
+ Misses      15765    15458     -307     
Files with missing lines Coverage Δ
...-local-execution/src/intermediate_ops/aggregate.rs 100.00% <ø> (ø)
.../src/intermediate_ops/anti_semi_hash_join_probe.rs 96.36% <100.00%> (ø)
...aft-local-execution/src/intermediate_ops/filter.rs 100.00% <ø> (ø)
...-execution/src/intermediate_ops/hash_join_probe.rs 97.41% <100.00%> (ø)
...ft-local-execution/src/intermediate_ops/project.rs 100.00% <ø> (ø)
...rc/daft-local-execution/src/sinks/blocking_sink.rs 74.39% <100.00%> (+3.96%) ⬆️
src/daft-local-execution/src/sources/scan_task.rs 81.95% <ø> (ø)
...-execution/src/intermediate_ops/intermediate_op.rs 77.44% <66.66%> (-1.30%) ⬇️
src/daft-io/src/lib.rs 77.96% <70.96%> (+1.60%) ⬆️
src/daft-local-execution/src/lib.rs 86.45% <84.21%> (-3.29%) ⬇️
... and 1 more

... and 12 files with indirect coverage changes

@colin-ho colin-ho marked this pull request as ready for review October 2, 2024 21:32
@@ -28,11 +35,11 @@ pub trait IntermediateOperator: Send + Sync {
&self,
idx: usize,
input: &PipelineResultType,
state: Option<&mut Box<dyn IntermediateOperatorState>>,
state: &mut dyn IntermediateOperatorState,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't like the Option<&mut Box<dyn IntermediateOperatorState>> parameter, instead changing it to just &mut dyn IntermediateOperatorState

static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("Executor-Worker-{}", id)
})
Copy link
Contributor Author

@colin-ho colin-ho Oct 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Todo: look into thread priority: https://docs.rs/thread-priority/latest/thread_priority/, we could potentially set priorities for compute and io pool.

Would do it on https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.on_thread_start

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant