From db896c029ed95eafaea6b3f5ae3e0629df639ad7 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 14 Mar 2020 20:02:43 +0100 Subject: [PATCH 1/2] Init get/set limit methods --- src/from_stream.rs | 11 +++++++++++ src/par_stream/for_each.rs | 7 +++++-- src/par_stream/map.rs | 13 ++++++++++++- src/par_stream/mod.rs | 6 ++++++ src/par_stream/take.rs | 22 ++++++++++++++++++---- src/vec.rs | 11 +++++++++++ 6 files changed, 63 insertions(+), 7 deletions(-) diff --git a/src/from_stream.rs b/src/from_stream.rs index ef660cb..b06ea58 100644 --- a/src/from_stream.rs +++ b/src/from_stream.rs @@ -17,6 +17,7 @@ pin_project! { pub struct FromStream { #[pin] stream: S, + limit: Option, } } @@ -26,6 +27,7 @@ where S: Send + Sync, { FromStream { + limit: None, stream: stream.into_stream(), } } @@ -40,4 +42,13 @@ where let this = self.project(); this.stream.poll_next(cx) } + + fn set_limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn limit(&self) -> Option { + self.limit + } } diff --git a/src/par_stream/for_each.rs b/src/par_stream/for_each.rs index 321686c..2e9e9bd 100644 --- a/src/par_stream/for_each.rs +++ b/src/par_stream/for_each.rs @@ -19,12 +19,14 @@ pin_project_lite::pin_project! { exhausted: Arc, // Count how many tasks are executing. ref_count: Arc, + // Max concurrency limit. + limit: Option, } } impl ForEach { /// Create a new instance of `ForEach`. - pub fn new(mut input: S, mut f: F) -> Self + pub fn new(mut stream: S, mut f: F) -> Self where S: ParallelStream, F: FnMut(S::Item) -> Fut + Send + Sync + Copy + 'static, @@ -39,10 +41,11 @@ impl ForEach { receiver, exhausted: exhausted.clone(), ref_count: ref_count.clone(), + limit: stream.limit(), }; task::spawn(async move { - while let Some(item) = input.next().await { + while let Some(item) = stream.next().await { let sender = sender.clone(); let exhausted = exhausted.clone(); let ref_count = ref_count.clone(); diff --git a/src/par_stream/map.rs b/src/par_stream/map.rs index 1faf249..29e5058 100644 --- a/src/par_stream/map.rs +++ b/src/par_stream/map.rs @@ -14,6 +14,7 @@ pin_project_lite::pin_project! { pub struct Map { #[pin] receiver: Receiver, + limit: Option, } } @@ -26,6 +27,7 @@ impl Map { Fut: Future + Send, { let (sender, receiver) = sync::channel(1); + let limit = stream.limit(); task::spawn(async move { while let Some(item) = stream.next().await { let sender = sender.clone(); @@ -35,7 +37,7 @@ impl Map { }); } }); - Map { receiver } + Map { receiver, limit } } } @@ -46,6 +48,15 @@ impl ParallelStream for Map { let this = self.project(); this.receiver.poll_next(cx) } + + fn set_limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn limit(&self) -> Option { + self.limit + } } #[async_std::test] diff --git a/src/par_stream/mod.rs b/src/par_stream/mod.rs index 6e7fca6..92e6c24 100644 --- a/src/par_stream/mod.rs +++ b/src/par_stream/mod.rs @@ -21,6 +21,12 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { /// Attempts to receive the next item from the stream. fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + /// Set a max concurrency limit + fn set_limit(self, limit: impl Into>) -> Self; + + /// Get the max concurrency limit + fn limit(&self) -> Option; + /// Applies `f` to each item of this stream in parallel, producing a new /// stream with the results. fn map(self, f: F) -> Map diff --git a/src/par_stream/take.rs b/src/par_stream/take.rs index 75d6f07..ba9010b 100644 --- a/src/par_stream/take.rs +++ b/src/par_stream/take.rs @@ -17,14 +17,19 @@ pin_project! { #[derive(Clone, Debug)] pub struct Take { #[pin] - pub(crate) stream: S, - pub(crate) remaining: usize, + stream: S, + remaining: usize, + limit: Option, } } -impl Take { +impl Take { pub(super) fn new(stream: S, remaining: usize) -> Self { - Self { stream, remaining } + Self { + limit: stream.limit(), + remaining, + stream, + } } } @@ -44,4 +49,13 @@ impl ParallelStream for Take { Poll::Ready(next) } } + + fn set_limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn limit(&self) -> Option { + self.limit + } } diff --git a/src/vec.rs b/src/vec.rs index 6e3c829..7fcf1d0 100644 --- a/src/vec.rs +++ b/src/vec.rs @@ -17,6 +17,7 @@ pin_project_lite::pin_project! { pub struct IntoParStream { #[pin] stream: FromStream>>, + limit: Option, } } @@ -26,6 +27,15 @@ impl ParallelStream for IntoParStream { let this = self.project(); this.stream.poll_next(cx) } + + fn set_limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn limit(&self) -> Option { + self.limit + } } impl IntoParallelStream for Vec { @@ -36,6 +46,7 @@ impl IntoParallelStream for Vec { fn into_par_stream(self) -> Self::IntoParStream { IntoParStream { stream: from_stream(from_iter(self)), + limit: None, } } } From a3fde164cf9f1a56375371a13f7f372b48df50d1 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 14 Mar 2020 20:46:57 +0100 Subject: [PATCH 2/2] init limit functionality Updates the traits to support limiting --- src/from_stream.rs | 4 +-- src/lib.rs | 2 ++ src/par_stream/for_each.rs | 4 +-- src/par_stream/map.rs | 6 ++--- src/par_stream/mod.rs | 4 +-- src/par_stream/take.rs | 6 ++--- src/utils.rs | 55 ++++++++++++++++++++++++++++++++++++++ src/vec.rs | 4 +-- 8 files changed, 70 insertions(+), 15 deletions(-) create mode 100644 src/utils.rs diff --git a/src/from_stream.rs b/src/from_stream.rs index b06ea58..275317c 100644 --- a/src/from_stream.rs +++ b/src/from_stream.rs @@ -43,12 +43,12 @@ where this.stream.poll_next(cx) } - fn set_limit(mut self, limit: impl Into>) -> Self { + fn limit(mut self, limit: impl Into>) -> Self { self.limit = limit.into(); self } - fn limit(&self) -> Option { + fn get_limit(&self) -> Option { self.limit } } diff --git a/src/lib.rs b/src/lib.rs index aa5ce60..af93ecd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,3 +56,5 @@ pub use par_stream::{ForEach, Map, NextFuture, ParallelStream, Take}; pub mod prelude; pub mod vec; + +pub(crate) mod utils; diff --git a/src/par_stream/for_each.rs b/src/par_stream/for_each.rs index 2e9e9bd..e971db1 100644 --- a/src/par_stream/for_each.rs +++ b/src/par_stream/for_each.rs @@ -19,8 +19,6 @@ pin_project_lite::pin_project! { exhausted: Arc, // Count how many tasks are executing. ref_count: Arc, - // Max concurrency limit. - limit: Option, } } @@ -35,13 +33,13 @@ impl ForEach { let exhausted = Arc::new(AtomicBool::new(false)); let ref_count = Arc::new(AtomicU64::new(0)); let (sender, receiver): (Sender<()>, Receiver<()>) = sync::channel(1); + let _limit = stream.get_limit(); // Initialize the return type here to prevent borrowing issues. let this = Self { receiver, exhausted: exhausted.clone(), ref_count: ref_count.clone(), - limit: stream.limit(), }; task::spawn(async move { diff --git a/src/par_stream/map.rs b/src/par_stream/map.rs index 29e5058..e10ca00 100644 --- a/src/par_stream/map.rs +++ b/src/par_stream/map.rs @@ -27,7 +27,7 @@ impl Map { Fut: Future + Send, { let (sender, receiver) = sync::channel(1); - let limit = stream.limit(); + let limit = stream.get_limit(); task::spawn(async move { while let Some(item) = stream.next().await { let sender = sender.clone(); @@ -49,12 +49,12 @@ impl ParallelStream for Map { this.receiver.poll_next(cx) } - fn set_limit(mut self, limit: impl Into>) -> Self { + fn limit(mut self, limit: impl Into>) -> Self { self.limit = limit.into(); self } - fn limit(&self) -> Option { + fn get_limit(&self) -> Option { self.limit } } diff --git a/src/par_stream/mod.rs b/src/par_stream/mod.rs index 92e6c24..41aa782 100644 --- a/src/par_stream/mod.rs +++ b/src/par_stream/mod.rs @@ -22,10 +22,10 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; /// Set a max concurrency limit - fn set_limit(self, limit: impl Into>) -> Self; + fn limit(self, limit: impl Into>) -> Self; /// Get the max concurrency limit - fn limit(&self) -> Option; + fn get_limit(&self) -> Option; /// Applies `f` to each item of this stream in parallel, producing a new /// stream with the results. diff --git a/src/par_stream/take.rs b/src/par_stream/take.rs index ba9010b..b34dbb8 100644 --- a/src/par_stream/take.rs +++ b/src/par_stream/take.rs @@ -26,7 +26,7 @@ pin_project! { impl Take { pub(super) fn new(stream: S, remaining: usize) -> Self { Self { - limit: stream.limit(), + limit: stream.get_limit(), remaining, stream, } @@ -50,12 +50,12 @@ impl ParallelStream for Take { } } - fn set_limit(mut self, limit: impl Into>) -> Self { + fn limit(mut self, limit: impl Into>) -> Self { self.limit = limit.into(); self } - fn limit(&self) -> Option { + fn get_limit(&self) -> Option { self.limit } } diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..25b0f50 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,55 @@ +// use core::pin::Pin; +// use core::task::{Context, Poll}; + +// use std::sync::atomic::{AtomicUsize, Ordering}; +// use std::sync::Arc; + +// use async_std::stream::Stream; + +// /// A stream that has a max concurrency of N. +// pub(crate) struct LimitStream { +// limit: Option, +// ref_count: Arc, +// } + +// impl LimitStream { +// /// Create a new instance of LimitStream. +// pub(crate) fn new(limit: Option) -> Self { +// Self { +// limit, +// ref_count: Arc::new(AtomicUsize::new(0)), +// } +// } +// } + +// #[derive(Debug)] +// pub(crate) struct Guard { +// limit: Option, +// ref_count: Arc, +// } + +// impl Guard { +// fn new(limit: Option, ref_count: Arc) -> Self { +// Self { limit, ref_count } +// } +// } + +// impl Drop for Guard { +// fn drop(&mut self) { +// if self.limit.is_some() { +// self.ref_count.fetch_sub(1, Ordering::SeqCst); +// } +// } +// } + +// impl Stream for LimitStream { +// type Item = Guard; + +// fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { +// if self.limit.is_none() { +// let guard = Guard::new(self.limit, self.ref_count.clone()); +// return Poll::Ready(Some(guard)); +// } +// todo!(); +// } +// } diff --git a/src/vec.rs b/src/vec.rs index 7fcf1d0..b5ec202 100644 --- a/src/vec.rs +++ b/src/vec.rs @@ -28,12 +28,12 @@ impl ParallelStream for IntoParStream { this.stream.poll_next(cx) } - fn set_limit(mut self, limit: impl Into>) -> Self { + fn limit(mut self, limit: impl Into>) -> Self { self.limit = limit.into(); self } - fn limit(&self) -> Option { + fn get_limit(&self) -> Option { self.limit } }