Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limits #3

Merged
merged 2 commits into from
Mar 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pin_project! {
pub struct FromStream<S> {
#[pin]
stream: S,
limit: Option<usize>,
}
}

Expand All @@ -26,6 +27,7 @@ where
S: Send + Sync,
{
FromStream {
limit: None,
stream: stream.into_stream(),
}
}
Expand All @@ -40,4 +42,13 @@ where
let this = self.project();
this.stream.poll_next(cx)
}

fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
self.limit = limit.into();
self
}

fn get_limit(&self) -> Option<usize> {
self.limit
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,5 @@ pub use par_stream::{ForEach, Map, NextFuture, ParallelStream, Take};

pub mod prelude;
pub mod vec;

pub(crate) mod utils;
5 changes: 3 additions & 2 deletions src/par_stream/for_each.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pin_project_lite::pin_project! {

impl ForEach {
/// Create a new instance of `ForEach`.
pub fn new<S, F, Fut>(mut input: S, mut f: F) -> Self
pub fn new<S, F, Fut>(mut stream: S, mut f: F) -> Self
where
S: ParallelStream,
F: FnMut(S::Item) -> Fut + Send + Sync + Copy + 'static,
Expand All @@ -33,6 +33,7 @@ impl ForEach {
let exhausted = Arc::new(AtomicBool::new(false));
let ref_count = Arc::new(AtomicU64::new(0));
let (sender, receiver): (Sender<()>, Receiver<()>) = sync::channel(1);
let _limit = stream.get_limit();

// Initialize the return type here to prevent borrowing issues.
let this = Self {
Expand All @@ -42,7 +43,7 @@ impl ForEach {
};

task::spawn(async move {
while let Some(item) = input.next().await {
while let Some(item) = stream.next().await {
let sender = sender.clone();
let exhausted = exhausted.clone();
let ref_count = ref_count.clone();
Expand Down
13 changes: 12 additions & 1 deletion src/par_stream/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pin_project_lite::pin_project! {
pub struct Map<T> {
#[pin]
receiver: Receiver<T>,
limit: Option<usize>,
}
}

Expand All @@ -26,6 +27,7 @@ impl<T: Send + 'static> Map<T> {
Fut: Future<Output = T> + Send,
{
let (sender, receiver) = sync::channel(1);
let limit = stream.get_limit();
task::spawn(async move {
while let Some(item) = stream.next().await {
let sender = sender.clone();
Expand All @@ -35,7 +37,7 @@ impl<T: Send + 'static> Map<T> {
});
}
});
Map { receiver }
Map { receiver, limit }
}
}

Expand All @@ -46,6 +48,15 @@ impl<T: Send + 'static> ParallelStream for Map<T> {
let this = self.project();
this.receiver.poll_next(cx)
}

fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
self.limit = limit.into();
self
}

fn get_limit(&self) -> Option<usize> {
self.limit
}
}

#[async_std::test]
Expand Down
6 changes: 6 additions & 0 deletions src/par_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static {
/// Attempts to receive the next item from the stream.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

/// Set a max concurrency limit
fn limit(self, limit: impl Into<Option<usize>>) -> Self;

/// Get the max concurrency limit
fn get_limit(&self) -> Option<usize>;

/// Applies `f` to each item of this stream in parallel, producing a new
/// stream with the results.
fn map<F, T, Fut>(self, f: F) -> Map<T>
Expand Down
22 changes: 18 additions & 4 deletions src/par_stream/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ pin_project! {
#[derive(Clone, Debug)]
pub struct Take<S> {
#[pin]
pub(crate) stream: S,
pub(crate) remaining: usize,
stream: S,
remaining: usize,
limit: Option<usize>,
}
}

impl<S> Take<S> {
impl<S: ParallelStream> Take<S> {
pub(super) fn new(stream: S, remaining: usize) -> Self {
Self { stream, remaining }
Self {
limit: stream.get_limit(),
remaining,
stream,
}
}
}

Expand All @@ -44,4 +49,13 @@ impl<S: ParallelStream> ParallelStream for Take<S> {
Poll::Ready(next)
}
}

fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
self.limit = limit.into();
self
}

fn get_limit(&self) -> Option<usize> {
self.limit
}
}
55 changes: 55 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// use core::pin::Pin;
// use core::task::{Context, Poll};

// use std::sync::atomic::{AtomicUsize, Ordering};
// use std::sync::Arc;

// use async_std::stream::Stream;

// /// A stream that has a max concurrency of N.
// pub(crate) struct LimitStream {
// limit: Option<usize>,
// ref_count: Arc<AtomicUsize>,
// }

// impl LimitStream {
// /// Create a new instance of LimitStream.
// pub(crate) fn new(limit: Option<usize>) -> Self {
// Self {
// limit,
// ref_count: Arc::new(AtomicUsize::new(0)),
// }
// }
// }

// #[derive(Debug)]
// pub(crate) struct Guard {
// limit: Option<usize>,
// ref_count: Arc<AtomicUsize>,
// }

// impl Guard {
// fn new(limit: Option<usize>, ref_count: Arc<AtomicUsize>) -> Self {
// Self { limit, ref_count }
// }
// }

// impl Drop for Guard {
// fn drop(&mut self) {
// if self.limit.is_some() {
// self.ref_count.fetch_sub(1, Ordering::SeqCst);
// }
// }
// }

// impl Stream for LimitStream {
// type Item = Guard;

// fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// if self.limit.is_none() {
// let guard = Guard::new(self.limit, self.ref_count.clone());
// return Poll::Ready(Some(guard));
// }
// todo!();
// }
// }
11 changes: 11 additions & 0 deletions src/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pin_project_lite::pin_project! {
pub struct IntoParStream<T> {
#[pin]
stream: FromStream<FromIter<vec::IntoIter<T>>>,
limit: Option<usize>,
}
}

Expand All @@ -26,6 +27,15 @@ impl<T: Send + Sync + 'static> ParallelStream for IntoParStream<T> {
let this = self.project();
this.stream.poll_next(cx)
}

fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
self.limit = limit.into();
self
}

fn get_limit(&self) -> Option<usize> {
self.limit
}
}

impl<T: Send + Sync + 'static> IntoParallelStream for Vec<T> {
Expand All @@ -36,6 +46,7 @@ impl<T: Send + Sync + 'static> IntoParallelStream for Vec<T> {
fn into_par_stream(self) -> Self::IntoParStream {
IntoParStream {
stream: from_stream(from_iter(self)),
limit: None,
}
}
}
Expand Down