Skip to content

Commit

Permalink
Merge pull request #3 from async-rs/limits
Browse files Browse the repository at this point in the history
Limits
  • Loading branch information
yoshuawuyts authored Mar 14, 2020
2 parents e53eed6 + a3fde16 commit 945b548
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 7 deletions.
11 changes: 11 additions & 0 deletions src/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pin_project! {
pub struct FromStream<S> {
#[pin]
stream: S,
limit: Option<usize>,
}
}

Expand All @@ -26,6 +27,7 @@ where
S: Send + Sync,
{
FromStream {
limit: None,
stream: stream.into_stream(),
}
}
Expand All @@ -40,4 +42,13 @@ where
let this = self.project();
this.stream.poll_next(cx)
}

fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
self.limit = limit.into();
self
}

fn get_limit(&self) -> Option<usize> {
self.limit
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,5 @@ pub use par_stream::{ForEach, Map, NextFuture, ParallelStream, Take};

pub mod prelude;
pub mod vec;

pub(crate) mod utils;
5 changes: 3 additions & 2 deletions src/par_stream/for_each.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pin_project_lite::pin_project! {

impl ForEach {
/// Create a new instance of `ForEach`.
pub fn new<S, F, Fut>(mut input: S, mut f: F) -> Self
pub fn new<S, F, Fut>(mut stream: S, mut f: F) -> Self
where
S: ParallelStream,
F: FnMut(S::Item) -> Fut + Send + Sync + Copy + 'static,
Expand All @@ -33,6 +33,7 @@ impl ForEach {
let exhausted = Arc::new(AtomicBool::new(false));
let ref_count = Arc::new(AtomicU64::new(0));
let (sender, receiver): (Sender<()>, Receiver<()>) = sync::channel(1);
let _limit = stream.get_limit();

// Initialize the return type here to prevent borrowing issues.
let this = Self {
Expand All @@ -42,7 +43,7 @@ impl ForEach {
};

task::spawn(async move {
while let Some(item) = input.next().await {
while let Some(item) = stream.next().await {
let sender = sender.clone();
let exhausted = exhausted.clone();
let ref_count = ref_count.clone();
Expand Down
13 changes: 12 additions & 1 deletion src/par_stream/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pin_project_lite::pin_project! {
pub struct Map<T> {
#[pin]
receiver: Receiver<T>,
limit: Option<usize>,
}
}

Expand All @@ -26,6 +27,7 @@ impl<T: Send + 'static> Map<T> {
Fut: Future<Output = T> + Send,
{
let (sender, receiver) = sync::channel(1);
let limit = stream.get_limit();
task::spawn(async move {
while let Some(item) = stream.next().await {
let sender = sender.clone();
Expand All @@ -35,7 +37,7 @@ impl<T: Send + 'static> Map<T> {
});
}
});
Map { receiver }
Map { receiver, limit }
}
}

Expand All @@ -46,6 +48,15 @@ impl<T: Send + 'static> ParallelStream for Map<T> {
let this = self.project();
this.receiver.poll_next(cx)
}

fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
self.limit = limit.into();
self
}

fn get_limit(&self) -> Option<usize> {
self.limit
}
}

#[async_std::test]
Expand Down
6 changes: 6 additions & 0 deletions src/par_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static {
/// Attempts to receive the next item from the stream.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

/// Set a max concurrency limit
fn limit(self, limit: impl Into<Option<usize>>) -> Self;

/// Get the max concurrency limit
fn get_limit(&self) -> Option<usize>;

/// Applies `f` to each item of this stream in parallel, producing a new
/// stream with the results.
fn map<F, T, Fut>(self, f: F) -> Map<T>
Expand Down
22 changes: 18 additions & 4 deletions src/par_stream/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ pin_project! {
#[derive(Clone, Debug)]
pub struct Take<S> {
#[pin]
pub(crate) stream: S,
pub(crate) remaining: usize,
stream: S,
remaining: usize,
limit: Option<usize>,
}
}

impl<S> Take<S> {
impl<S: ParallelStream> Take<S> {
pub(super) fn new(stream: S, remaining: usize) -> Self {
Self { stream, remaining }
Self {
limit: stream.get_limit(),
remaining,
stream,
}
}
}

Expand All @@ -44,4 +49,13 @@ impl<S: ParallelStream> ParallelStream for Take<S> {
Poll::Ready(next)
}
}

fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
self.limit = limit.into();
self
}

fn get_limit(&self) -> Option<usize> {
self.limit
}
}
55 changes: 55 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// use core::pin::Pin;
// use core::task::{Context, Poll};

// use std::sync::atomic::{AtomicUsize, Ordering};
// use std::sync::Arc;

// use async_std::stream::Stream;

// /// A stream that has a max concurrency of N.
// pub(crate) struct LimitStream {
// limit: Option<usize>,
// ref_count: Arc<AtomicUsize>,
// }

// impl LimitStream {
// /// Create a new instance of LimitStream.
// pub(crate) fn new(limit: Option<usize>) -> Self {
// Self {
// limit,
// ref_count: Arc::new(AtomicUsize::new(0)),
// }
// }
// }

// #[derive(Debug)]
// pub(crate) struct Guard {
// limit: Option<usize>,
// ref_count: Arc<AtomicUsize>,
// }

// impl Guard {
// fn new(limit: Option<usize>, ref_count: Arc<AtomicUsize>) -> Self {
// Self { limit, ref_count }
// }
// }

// impl Drop for Guard {
// fn drop(&mut self) {
// if self.limit.is_some() {
// self.ref_count.fetch_sub(1, Ordering::SeqCst);
// }
// }
// }

// impl Stream for LimitStream {
// type Item = Guard;

// fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// if self.limit.is_none() {
// let guard = Guard::new(self.limit, self.ref_count.clone());
// return Poll::Ready(Some(guard));
// }
// todo!();
// }
// }
11 changes: 11 additions & 0 deletions src/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pin_project_lite::pin_project! {
pub struct IntoParStream<T> {
#[pin]
stream: FromStream<FromIter<vec::IntoIter<T>>>,
limit: Option<usize>,
}
}

Expand All @@ -26,6 +27,15 @@ impl<T: Send + Sync + 'static> ParallelStream for IntoParStream<T> {
let this = self.project();
this.stream.poll_next(cx)
}

fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
self.limit = limit.into();
self
}

fn get_limit(&self) -> Option<usize> {
self.limit
}
}

impl<T: Send + Sync + 'static> IntoParallelStream for Vec<T> {
Expand All @@ -36,6 +46,7 @@ impl<T: Send + Sync + 'static> IntoParallelStream for Vec<T> {
fn into_par_stream(self) -> Self::IntoParStream {
IntoParStream {
stream: from_stream(from_iter(self)),
limit: None,
}
}
}
Expand Down

0 comments on commit 945b548

Please sign in to comment.