diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9d1be25f68..edf4396c8e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,42 +30,36 @@ concurrency: jobs: test: - name: cargo test (${{ matrix.os }}) + name: cargo test strategy: fail-fast: false matrix: - os: - - ubuntu-latest - - macos-latest - - windows-latest + include: + - os: ubuntu-latest + - os: macos-latest + - os: windows-latest + - os: ubuntu-latest + target: aarch64-unknown-linux-gnu + - os: ubuntu-latest + target: armv7-unknown-linux-gnueabihf + - os: ubuntu-latest + target: armv5te-unknown-linux-gnueabi + - os: ubuntu-latest + target: i686-unknown-linux-gnu runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust # --no-self-update is necessary because the windows environment cannot self-update rustup.exe. run: rustup update nightly --no-self-update && rustup default nightly - - run: cargo test --workspace --all-features - - run: cargo test --workspace --all-features --release - - cross: - name: cargo test --target ${{ matrix.target }} - strategy: - fail-fast: false - matrix: - target: - - aarch64-unknown-linux-gnu - - armv7-unknown-linux-gnueabihf - - i686-unknown-linux-gnu - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - name: Install Rust - run: rustup update nightly && rustup default nightly - uses: taiki-e/setup-cross-toolchain-action@v1 with: target: ${{ matrix.target }} - - run: cargo test --target ${{ matrix.target }} --workspace --all-features $DOCTEST_XCOMPILE - - run: cargo test --target ${{ matrix.target }} --workspace --all-features --release $DOCTEST_XCOMPILE + if: matrix.target != '' + - run: cargo test --workspace --all-features $DOCTEST_XCOMPILE + - run: cargo test --workspace --all-features --release $DOCTEST_XCOMPILE + # TODO: https://github.com/rust-lang/futures-rs/issues/2451 + if: matrix.target != 'aarch64-unknown-linux-gnu' core-msrv: name: cargo +${{ matrix.rust }} build (futures-{core, io, sink}) @@ -77,7 +71,7 @@ jobs: - '1.36' runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} # cargo does not support for --features/--no-default-features with workspace, so use cargo-hack instead. @@ -111,7 +105,7 @@ jobs: - '1.56' runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - name: Install cargo-hack @@ -127,7 +121,8 @@ jobs: # Check std feature - run: cargo hack build --workspace --ignore-private --no-default-features --features std --ignore-unknown-features # Check compat feature (futures, futures-util) - - run: cargo hack build -p futures -p futures-util --no-default-features --features std,io-compat + # Exclude io-compat feature because the MSRV when it is enabled depends on the MSRV of tokio 0.1. + - run: cargo hack build -p futures -p futures-util --no-default-features --features std,compat # Check thread-pool feature (futures, futures-executor) - run: cargo hack build -p futures -p futures-executor --no-default-features --features std,thread-pool @@ -142,7 +137,7 @@ jobs: - nightly runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - name: Install cargo-hack @@ -151,18 +146,17 @@ jobs: - run: cargo build --tests --features default,thread-pool,io-compat --manifest-path futures/Cargo.toml minimal-versions: - name: cargo build -Z minimal-versions + name: cargo minimal-versions build runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: rustup update nightly && rustup default nightly - name: Install cargo-hack uses: taiki-e/install-action@cargo-hack - # remove dev-dependencies to avoid https://github.com/rust-lang/cargo/issues/4866 - - run: cargo hack --remove-dev-deps --workspace - - run: cargo update -Z minimal-versions - - run: cargo build --workspace --all-features + - name: Install cargo-minimal-versions + uses: taiki-e/install-action@cargo-minimal-versions + - run: cargo minimal-versions build --workspace --ignore-private --all-features no-std: name: cargo build --target ${{ matrix.target }} @@ -176,7 +170,7 @@ jobs: - thumbv6m-none-eabi runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: rustup update nightly && rustup default nightly - run: rustup target add ${{ matrix.target }} @@ -208,7 +202,7 @@ jobs: name: cargo bench runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: rustup update nightly && rustup default nightly - run: cargo bench --workspace @@ -218,7 +212,7 @@ jobs: name: cargo hack check --feature-powerset runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: rustup update nightly && rustup default nightly - name: Install cargo-hack @@ -243,7 +237,7 @@ jobs: contents: write pull-requests: write steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: rustup update nightly && rustup default nightly - run: ci/no_atomic_cas.sh @@ -260,7 +254,7 @@ jobs: echo "::set-output name=success::false" fi if: github.repository_owner == 'rust-lang' && github.event_name == 'schedule' - - uses: peter-evans/create-pull-request@v3 + - uses: peter-evans/create-pull-request@v5 with: title: Update no_atomic_cas.rs body: | @@ -276,7 +270,7 @@ jobs: name: cargo miri test runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: rustup toolchain install nightly --component miri && rustup default nightly - run: cargo miri test --workspace --all-features @@ -295,10 +289,9 @@ jobs: - thread runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust - run: rustup update nightly && rustup default nightly - - run: rustup component add rust-src + run: rustup toolchain install nightly --component rust-src && rustup default nightly - run: cargo -Z build-std test --workspace --all-features --target x86_64-unknown-linux-gnu --lib --tests env: # TODO: Once `cfg(sanitize = "..")` is stable, replace @@ -312,7 +305,7 @@ jobs: # name: cargo clippy # runs-on: ubuntu-latest # steps: - # - uses: actions/checkout@v3 + # - uses: actions/checkout@v4 # - name: Install Rust # run: rustup toolchain install nightly --component clippy && rustup default nightly # - run: cargo clippy --workspace --all-features --all-targets @@ -321,7 +314,7 @@ jobs: name: cargo fmt runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: rustup update stable - run: cargo fmt --all -- --check @@ -330,7 +323,7 @@ jobs: name: cargo doc runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: rustup update nightly && rustup default nightly - run: RUSTDOCFLAGS="-D warnings --cfg docsrs" cargo doc --workspace --no-deps --all-features diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 2c52681b21..5fec08e5bd 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -10,7 +10,7 @@ jobs: if: github.repository_owner == 'rust-lang' runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: taiki-e/create-gh-release-action@v1 with: changelog: CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md index f23a1c84e9..e689f36917 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# 0.3.29 - 2023-10-26 + +* Add `TryStreamExt::try_ready_chunks` (#2757) +* Add `TryStreamExt::{try_all,try_any}` (#2783) +* Add `UnboundedSender::{len,is_empty}` (#2750) +* Fix `Sync` impl of `FuturesUnordered` (#2788) +* Fix infinite loop caused by invalid UTF-8 bytes (#2785) +* Fix build error with -Z minimal-versions (#2761) + # 0.3.28 - 2023-03-30 * Update to syn 2. This raises MSRV of utility crates to 1.56. (#2730, #2733) diff --git a/futures-channel/Cargo.toml b/futures-channel/Cargo.toml index 588cf69b2f..1f50e60b44 100644 --- a/futures-channel/Cargo.toml +++ b/futures-channel/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-channel" -version = "0.3.28" +version = "0.3.29" edition = "2018" rust-version = "1.56" license = "MIT OR Apache-2.0" @@ -22,8 +22,8 @@ unstable = [] cfg-target-has-atomic = [] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.28", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.3.28", default-features = false, optional = true } +futures-core = { path = "../futures-core", version = "0.3.29", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.29", default-features = false, optional = true } [dev-dependencies] futures = { path = "../futures", default-features = true } diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index cf45fe77fe..64f7526fa4 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -119,12 +119,12 @@ impl Unpin for BoundedSenderInner {} /// The transmission end of a bounded mpsc channel. /// -/// This value is created by the [`channel`](channel) function. +/// This value is created by the [`channel`] function. pub struct Sender(Option>); /// The transmission end of an unbounded mpsc channel. /// -/// This value is created by the [`unbounded`](unbounded) function. +/// This value is created by the [`unbounded`] function. pub struct UnboundedSender(Option>); trait AssertKinds: Send + Sync + Clone {} @@ -132,14 +132,14 @@ impl AssertKinds for UnboundedSender {} /// The receiving end of a bounded mpsc channel. /// -/// This value is created by the [`channel`](channel) function. +/// This value is created by the [`channel`] function. pub struct Receiver { inner: Option>>, } /// The receiving end of an unbounded mpsc channel. /// -/// This value is created by the [`unbounded`](unbounded) function. +/// This value is created by the [`unbounded`] function. pub struct UnboundedReceiver { inner: Option>>, } @@ -343,9 +343,8 @@ impl SenderTask { /// guaranteed slot in the channel capacity, and on top of that there are /// `buffer` "first come, first serve" slots available to all senders. /// -/// The [`Receiver`](Receiver) returned implements the -/// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements -/// `Sink`. +/// The [`Receiver`] returned implements the [`Stream`] trait, while [`Sender`] +/// implements `Sink`. pub fn channel(buffer: usize) -> (Sender, Receiver) { // Check that the requested buffer size does not exceed the maximum buffer // size permitted by the system. @@ -842,6 +841,20 @@ impl UnboundedSender { let ptr = self.0.as_ref().map(|inner| inner.ptr()); ptr.hash(hasher); } + + /// Return the number of messages in the queue or 0 if channel is disconnected. + pub fn len(&self) -> usize { + if let Some(sender) = &self.0 { + decode_state(sender.inner.state.load(SeqCst)).num_messages + } else { + 0 + } + } + + /// Return false is channel has no queued messages, true otherwise. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl Clone for Sender { diff --git a/futures-channel/src/oneshot.rs b/futures-channel/src/oneshot.rs index 70449f43d6..fe5b115a33 100644 --- a/futures-channel/src/oneshot.rs +++ b/futures-channel/src/oneshot.rs @@ -14,7 +14,7 @@ use crate::lock::Lock; /// A future for a value that will be provided by another asynchronous task. /// -/// This is created by the [`channel`](channel) function. +/// This is created by the [`channel`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Receiver { inner: Arc>, @@ -22,7 +22,7 @@ pub struct Receiver { /// A means of transmitting a single value to another task. /// -/// This is created by the [`channel`](channel) function. +/// This is created by the [`channel`] function. pub struct Sender { inner: Arc>, } @@ -332,8 +332,8 @@ impl Sender { /// Completes this oneshot with a successful result. /// /// This function will consume `self` and indicate to the other end, the - /// [`Receiver`](Receiver), that the value provided is the result of the - /// computation this represents. + /// [`Receiver`], that the value provided is the result of the computation + /// this represents. /// /// If the value is successfully enqueued for the remote end to receive, /// then `Ok(())` is returned. If the receiving end was dropped before @@ -343,7 +343,7 @@ impl Sender { } /// Polls this `Sender` half to detect whether its associated - /// [`Receiver`](Receiver) has been dropped. + /// [`Receiver`] has been dropped. /// /// # Return values /// @@ -359,10 +359,10 @@ impl Sender { } /// Creates a future that resolves when this `Sender`'s corresponding - /// [`Receiver`](Receiver) half has hung up. + /// [`Receiver`] half has hung up. /// /// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled) - /// to expose a [`Future`](core::future::Future). + /// to expose a [`Future`]. pub fn cancellation(&mut self) -> Cancellation<'_, T> { Cancellation { inner: self } } @@ -413,8 +413,8 @@ impl Future for Cancellation<'_, T> { } } -/// Error returned from a [`Receiver`](Receiver) when the corresponding -/// [`Sender`](Sender) is dropped. +/// Error returned from a [`Receiver`] when the corresponding [`Sender`] is +/// dropped. #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub struct Canceled; diff --git a/futures-channel/tests/mpsc.rs b/futures-channel/tests/mpsc.rs index 444c8e10fd..f5d7198d22 100644 --- a/futures-channel/tests/mpsc.rs +++ b/futures-channel/tests/mpsc.rs @@ -632,3 +632,26 @@ fn send_backpressure_multi_senders() { let item = block_on(rx.next()).unwrap(); assert_eq!(item, 2); } + +/// Test that empty channel has zero length and that non-empty channel has length equal to number +/// of enqueued items +#[test] +fn unbounded_len() { + let (tx, mut rx) = mpsc::unbounded(); + assert_eq!(tx.len(), 0); + assert!(tx.is_empty()); + tx.unbounded_send(1).unwrap(); + assert_eq!(tx.len(), 1); + assert!(!tx.is_empty()); + tx.unbounded_send(2).unwrap(); + assert_eq!(tx.len(), 2); + assert!(!tx.is_empty()); + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 1); + assert_eq!(tx.len(), 1); + assert!(!tx.is_empty()); + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 2); + assert_eq!(tx.len(), 0); + assert!(tx.is_empty()); +} diff --git a/futures-core/Cargo.toml b/futures-core/Cargo.toml index a65e8fb034..704861cc3e 100644 --- a/futures-core/Cargo.toml +++ b/futures-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-core" -version = "0.3.28" +version = "0.3.29" edition = "2018" rust-version = "1.36" license = "MIT OR Apache-2.0" @@ -21,7 +21,7 @@ unstable = [] cfg-target-has-atomic = [] [dependencies] -portable-atomic = { version = "1", default-features = false, optional = true } +portable-atomic = { version = "1.3", optional = true, default-features = false, features = ["require-cas"] } [dev-dependencies] futures = { path = "../futures" } diff --git a/futures-core/src/task/__internal/mod.rs b/futures-core/src/task/__internal/mod.rs index c902eb4bfb..377f3e286c 100644 --- a/futures-core/src/task/__internal/mod.rs +++ b/futures-core/src/task/__internal/mod.rs @@ -1,4 +1,4 @@ -#[cfg(not(futures_no_atomic_cas))] +#[cfg(any(not(futures_no_atomic_cas), feature = "portable-atomic"))] mod atomic_waker; -#[cfg(not(futures_no_atomic_cas))] +#[cfg(any(not(futures_no_atomic_cas), feature = "portable-atomic"))] pub use self::atomic_waker::AtomicWaker; diff --git a/futures-executor/Cargo.toml b/futures-executor/Cargo.toml index bd47e42a4e..22cf99636c 100644 --- a/futures-executor/Cargo.toml +++ b/futures-executor/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-executor" -version = "0.3.28" +version = "0.3.29" edition = "2018" rust-version = "1.56" license = "MIT OR Apache-2.0" @@ -16,9 +16,9 @@ std = ["futures-core/std", "futures-task/std", "futures-util/std"] thread-pool = ["std", "num_cpus"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.28", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.28", default-features = false } -futures-util = { path = "../futures-util", version = "0.3.28", default-features = false } +futures-core = { path = "../futures-core", version = "0.3.29", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.29", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.29", default-features = false } num_cpus = { version = "1.8.0", optional = true } [dev-dependencies] diff --git a/futures-io/Cargo.toml b/futures-io/Cargo.toml index dac499faab..a0699de52a 100644 --- a/futures-io/Cargo.toml +++ b/futures-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-io" -version = "0.3.28" +version = "0.3.29" edition = "2018" rust-version = "1.36" license = "MIT OR Apache-2.0" diff --git a/futures-macro/Cargo.toml b/futures-macro/Cargo.toml index 23a2529cd3..f180d9e9f6 100644 --- a/futures-macro/Cargo.toml +++ b/futures-macro/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-macro" -version = "0.3.28" +version = "0.3.29" edition = "2018" rust-version = "1.56" license = "MIT OR Apache-2.0" @@ -16,6 +16,6 @@ proc-macro = true [features] [dependencies] -proc-macro2 = "1.0" +proc-macro2 = "1.0.60" quote = "1.0" syn = { version = "2.0.8", features = ["full"] } diff --git a/futures-sink/Cargo.toml b/futures-sink/Cargo.toml index ec760185ba..dbf5e91be7 100644 --- a/futures-sink/Cargo.toml +++ b/futures-sink/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-sink" -version = "0.3.28" +version = "0.3.29" edition = "2018" rust-version = "1.36" license = "MIT OR Apache-2.0" diff --git a/futures-task/Cargo.toml b/futures-task/Cargo.toml index 8ae7c3f5ad..b3bf447d39 100644 --- a/futures-task/Cargo.toml +++ b/futures-task/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-task" -version = "0.3.28" +version = "0.3.29" edition = "2018" rust-version = "1.56" license = "MIT OR Apache-2.0" diff --git a/futures-test/Cargo.toml b/futures-test/Cargo.toml index 6b16a384ab..81f00e9147 100644 --- a/futures-test/Cargo.toml +++ b/futures-test/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-test" -version = "0.3.28" +version = "0.3.29" edition = "2018" rust-version = "1.56" license = "MIT OR Apache-2.0" @@ -11,13 +11,13 @@ Common utilities for testing components built off futures-rs. """ [dependencies] -futures-core = { version = "0.3.28", path = "../futures-core", default-features = false } -futures-task = { version = "0.3.28", path = "../futures-task", default-features = false } -futures-io = { version = "0.3.28", path = "../futures-io", default-features = false } -futures-util = { version = "0.3.28", path = "../futures-util", default-features = false } -futures-executor = { version = "0.3.28", path = "../futures-executor", default-features = false } -futures-sink = { version = "0.3.28", path = "../futures-sink", default-features = false } -futures-macro = { version = "=0.3.28", path = "../futures-macro", default-features = false } +futures-core = { version = "0.3.29", path = "../futures-core", default-features = false } +futures-task = { version = "0.3.29", path = "../futures-task", default-features = false } +futures-io = { version = "0.3.29", path = "../futures-io", default-features = false } +futures-util = { version = "0.3.29", path = "../futures-util", default-features = false } +futures-executor = { version = "0.3.29", path = "../futures-executor", default-features = false } +futures-sink = { version = "0.3.29", path = "../futures-sink", default-features = false } +futures-macro = { version = "=0.3.29", path = "../futures-macro", default-features = false } pin-utils = { version = "0.1.0", default-features = false } pin-project = "1.0.11" diff --git a/futures-test/src/assert_unmoved.rs b/futures-test/src/assert_unmoved.rs index 95d9a095f2..baeaeb59ab 100644 --- a/futures-test/src/assert_unmoved.rs +++ b/futures-test/src/assert_unmoved.rs @@ -7,7 +7,6 @@ use futures_io::{ use futures_sink::Sink; use pin_project::{pin_project, pinned_drop}; use std::pin::Pin; -use std::ptr; use std::thread::panicking; /// Combinator that asserts that the underlying type is not moved after being polled. @@ -24,26 +23,21 @@ use std::thread::panicking; pub struct AssertUnmoved { #[pin] inner: T, - this_ptr: *const Self, + this_addr: usize, } -// Safety: having a raw pointer in a struct makes it `!Send`, however the -// pointer is never dereferenced so this is safe. -unsafe impl Send for AssertUnmoved {} -unsafe impl Sync for AssertUnmoved {} - impl AssertUnmoved { pub(crate) fn new(inner: T) -> Self { - Self { inner, this_ptr: ptr::null() } + Self { inner, this_addr: 0 } } fn poll_with<'a, U>(mut self: Pin<&'a mut Self>, f: impl FnOnce(Pin<&'a mut T>) -> U) -> U { - let cur_this = &*self as *const Self; - if self.this_ptr.is_null() { + let cur_this = &*self as *const Self as usize; + if self.this_addr == 0 { // First time being polled - *self.as_mut().project().this_ptr = cur_this; + *self.as_mut().project().this_addr = cur_this; } else { - assert_eq!(self.this_ptr, cur_this, "AssertUnmoved moved between poll calls"); + assert_eq!(self.this_addr, cur_this, "AssertUnmoved moved between poll calls"); } f(self.project().inner) } @@ -166,9 +160,9 @@ impl PinnedDrop for AssertUnmoved { fn drop(self: Pin<&mut Self>) { // If the thread is panicking then we can't panic again as that will // cause the process to be aborted. - if !panicking() && !self.this_ptr.is_null() { - let cur_this = &*self as *const Self; - assert_eq!(self.this_ptr, cur_this, "AssertUnmoved moved before drop"); + if !panicking() && self.this_addr != 0 { + let cur_this = &*self as *const Self as usize; + assert_eq!(self.this_addr, cur_this, "AssertUnmoved moved before drop"); } } } diff --git a/futures-test/src/lib.rs b/futures-test/src/lib.rs index 2eb4a1c4cd..49f834846e 100644 --- a/futures-test/src/lib.rs +++ b/futures-test/src/lib.rs @@ -61,7 +61,7 @@ mod interleave_pending; mod track_closed; /// Enables an `async` test function. The generated future will be run to completion with -/// [`futures_executor::block_on`](futures_executor::block_on). +/// [`futures_executor::block_on`]. /// /// ``` /// #[futures_test::test] diff --git a/futures-test/src/task/mod.rs b/futures-test/src/task/mod.rs index 118291c0c9..e10ecbb875 100644 --- a/futures-test/src/task/mod.rs +++ b/futures-test/src/task/mod.rs @@ -15,29 +15,29 @@ //! [`Spawn`](futures_task::Spawn) implementations. //! //! Test contexts: -//! - [`noop_context`](crate::task::noop_context) creates a context that ignores calls to +//! - [`noop_context`] creates a context that ignores calls to //! [`cx.waker().wake_by_ref()`](futures_core::task::Waker). -//! - [`panic_context`](crate::task::panic_context) creates a context that panics when +//! - [`panic_context`] creates a context that panics when //! [`cx.waker().wake_by_ref()`](futures_core::task::Waker) is called. //! //! Test wakers: -//! - [`noop_waker`](crate::task::noop_waker) creates a waker that ignores calls to +//! - [`noop_waker`] creates a waker that ignores calls to //! [`wake`](futures_core::task::Waker). -//! - [`panic_waker`](crate::task::panic_waker()) creates a waker that panics when +//! - [`panic_waker`](panic_waker()) creates a waker that panics when //! [`wake`](futures_core::task::Waker) is called. -//! - [`new_count_waker`](crate::task::new_count_waker) creates a waker that increments a counter whenever +//! - [`new_count_waker`] creates a waker that increments a counter whenever //! [`wake`](futures_core::task::Waker) is called. //! //! Test spawners: -//! - [`NoopSpawner`](crate::task::NoopSpawner) ignores calls to +//! - [`NoopSpawner`] ignores calls to //! [`spawn`](futures_util::task::SpawnExt::spawn) -//! - [`PanicSpawner`](crate::task::PanicSpawner) panics if [`spawn`](futures_util::task::SpawnExt::spawn) is +//! - [`PanicSpawner`] panics if [`spawn`](futures_util::task::SpawnExt::spawn) is //! called. -//! - [`RecordSpawner`](crate::task::RecordSpawner) records the spawned futures. +//! - [`RecordSpawner`] records the spawned futures. //! //! For convenience there additionally exist various functions that directly -//! return waker/spawner references: [`noop_waker_ref`](crate::task::noop_waker_ref), -//! [`panic_waker_ref`](crate::task::panic_waker_ref), [`noop_spawner_mut`](crate::task::noop_spawner_mut) and [`panic_spawner_mut`](crate::task::panic_spawner_mut). +//! return waker/spawner references: [`noop_waker_ref`], [`panic_waker_ref`], +//! [`noop_spawner_mut`] and [`panic_spawner_mut`]. mod context; pub use self::context::{noop_context, panic_context}; diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 93924692f0..27e9e94985 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-util" -version = "0.3.28" +version = "0.3.29" edition = "2018" rust-version = "1.56" license = "MIT OR Apache-2.0" @@ -35,12 +35,12 @@ write-all-vectored = ["io"] cfg-target-has-atomic = [] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.28", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.28", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.28", default-features = false, features = ["std"], optional = true } -futures-io = { path = "../futures-io", version = "0.3.28", default-features = false, features = ["std"], optional = true } -futures-sink = { path = "../futures-sink", version = "0.3.28", default-features = false, optional = true } -futures-macro = { path = "../futures-macro", version = "=0.3.28", default-features = false, optional = true } +futures-core = { path = "../futures-core", version = "0.3.29", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.29", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.29", default-features = false, features = ["std"], optional = true } +futures-io = { path = "../futures-io", version = "0.3.29", default-features = false, features = ["std"], optional = true } +futures-sink = { path = "../futures-sink", version = "0.3.29", default-features = false, optional = true } +futures-macro = { path = "../futures-macro", version = "=0.3.29", default-features = false, optional = true } slab = { version = "0.4.2", optional = true } memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } diff --git a/futures-util/src/async_await/mod.rs b/futures-util/src/async_await/mod.rs index 7276da227a..7e3f12c99f 100644 --- a/futures-util/src/async_await/mod.rs +++ b/futures-util/src/async_await/mod.rs @@ -31,9 +31,11 @@ mod select_mod; pub use self::select_mod::*; // Primary export is a macro +#[cfg(feature = "std")] #[cfg(feature = "async-await-macro")] mod stream_select_mod; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762 +#[cfg(feature = "std")] #[cfg(feature = "async-await-macro")] pub use self::stream_select_mod::*; diff --git a/futures-util/src/async_await/stream_select_mod.rs b/futures-util/src/async_await/stream_select_mod.rs index 1c8002fff3..61e3fa1c66 100644 --- a/futures-util/src/async_await/stream_select_mod.rs +++ b/futures-util/src/async_await/stream_select_mod.rs @@ -1,6 +1,5 @@ //! The `stream_select` macro. -#[cfg(feature = "std")] #[allow(unreachable_pub)] #[doc(hidden)] pub use futures_macro::stream_select_internal; @@ -28,7 +27,6 @@ pub use futures_macro::stream_select_internal; /// } /// # }); /// ``` -#[cfg(feature = "std")] #[macro_export] macro_rules! stream_select { ($($tokens:tt)*) => {{ diff --git a/futures-util/src/future/future/mod.rs b/futures-util/src/future/future/mod.rs index c11d108207..955af3776a 100644 --- a/futures-util/src/future/future/mod.rs +++ b/futures-util/src/future/future/mod.rs @@ -463,10 +463,6 @@ pub trait FutureExt: Future { /// ``` /// /// ``` - /// // Note, unlike most examples this is written in the context of a - /// // synchronous function to better illustrate the cross-thread aspect of - /// // the `shared` combinator. - /// /// # futures::executor::block_on(async { /// use futures::future::FutureExt; /// use futures::executor::block_on; diff --git a/futures-util/src/future/join_all.rs b/futures-util/src/future/join_all.rs index 7dc159ba07..11b6f27288 100644 --- a/futures-util/src/future/join_all.rs +++ b/futures-util/src/future/join_all.rs @@ -77,7 +77,7 @@ where /// /// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance /// reasons if the number of futures is large. You may want to look into using it or -/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. +/// its counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. /// /// Some examples for additional functionality provided by these are: /// diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 8ce3ad644b..fdad60b1fa 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -804,11 +804,11 @@ pub trait AsyncBufReadExt: AsyncBufRead { /// use futures::io::{AsyncBufReadExt, Cursor}; /// use futures::stream::StreamExt; /// - /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor"); + /// let cursor = Cursor::new(b"lorem\nipsum\xc2\r\ndolor"); /// - /// let mut lines_stream = cursor.lines().map(|l| l.unwrap()); + /// let mut lines_stream = cursor.lines().map(|l| l.unwrap_or(String::from("invalid UTF_8"))); /// assert_eq!(lines_stream.next().await, Some(String::from("lorem"))); - /// assert_eq!(lines_stream.next().await, Some(String::from("ipsum"))); + /// assert_eq!(lines_stream.next().await, Some(String::from("invalid UTF_8"))); /// assert_eq!(lines_stream.next().await, Some(String::from("dolor"))); /// assert_eq!(lines_stream.next().await, None); /// # Ok::<(), Box>(()) }).unwrap(); diff --git a/futures-util/src/io/read_line.rs b/futures-util/src/io/read_line.rs index e1b8fc9455..df782c9570 100644 --- a/futures-util/src/io/read_line.rs +++ b/futures-util/src/io/read_line.rs @@ -35,6 +35,7 @@ pub(super) fn read_line_internal( ) -> Poll> { let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read)); if str::from_utf8(bytes).is_err() { + bytes.clear(); Poll::Ready(ret.and_then(|_| { Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8")) })) diff --git a/futures-util/src/io/window.rs b/futures-util/src/io/window.rs index 77b7267c69..d857282383 100644 --- a/futures-util/src/io/window.rs +++ b/futures-util/src/io/window.rs @@ -1,6 +1,6 @@ use std::ops::{Bound, Range, RangeBounds}; -/// A owned window around an underlying buffer. +/// An owned window around an underlying buffer. /// /// Normally slices work great for considering sub-portions of a buffer, but /// unfortunately a slice is a *borrowed* type in Rust which has an associated diff --git a/futures-util/src/stream/futures_ordered.rs b/futures-util/src/stream/futures_ordered.rs index 618bf1b7bd..3aaef8bdef 100644 --- a/futures-util/src/stream/futures_ordered.rs +++ b/futures-util/src/stream/futures_ordered.rs @@ -58,36 +58,39 @@ where /// An unbounded queue of futures. /// -/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO order -/// on top of the set of futures. While futures in the set will race to +/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO +/// order on top of the set of futures. While futures in the set will race to /// completion in parallel, results will only be returned in the order their /// originating futures were added to the queue. /// /// Futures are pushed into this queue and their realized values are yielded in /// order. This structure is optimized to manage a large number of futures. -/// Futures managed by `FuturesOrdered` will only be polled when they generate +/// Futures managed by [`FuturesOrdered`] will only be polled when they generate /// notifications. This reduces the required amount of work needed to coordinate /// large numbers of futures. /// -/// When a `FuturesOrdered` is first created, it does not contain any futures. -/// Calling `poll` in this state will result in `Poll::Ready(None))` to be -/// returned. Futures are submitted to the queue using `push`; however, the -/// future will **not** be polled at this point. `FuturesOrdered` will only -/// poll managed futures when `FuturesOrdered::poll` is called. As such, it -/// is important to call `poll` after pushing new futures. +/// When a [`FuturesOrdered`] is first created, it does not contain any futures. +/// Calling [`poll_next`](FuturesOrdered::poll_next) in this state will result +/// in [`Poll::Ready(None)`](Poll::Ready) to be returned. Futures are submitted +/// to the queue using [`push_back`](FuturesOrdered::push_back) (or +/// [`push_front`](FuturesOrdered::push_front)); however, the future will +/// **not** be polled at this point. [`FuturesOrdered`] will only poll managed +/// futures when [`FuturesOrdered::poll_next`] is called. As such, it +/// is important to call [`poll_next`](FuturesOrdered::poll_next) after pushing +/// new futures. /// -/// If `FuturesOrdered::poll` returns `Poll::Ready(None)` this means that -/// the queue is currently not managing any futures. A future may be submitted -/// to the queue at a later time. At that point, a call to -/// `FuturesOrdered::poll` will either return the future's resolved value -/// **or** `Poll::Pending` if the future has not yet completed. When -/// multiple futures are submitted to the queue, `FuturesOrdered::poll` will -/// return `Poll::Pending` until the first future completes, even if +/// If [`FuturesOrdered::poll_next`] returns [`Poll::Ready(None)`](Poll::Ready) +/// this means that the queue is currently not managing any futures. A future +/// may be submitted to the queue at a later time. At that point, a call to +/// [`FuturesOrdered::poll_next`] will either return the future's resolved value +/// **or** [`Poll::Pending`] if the future has not yet completed. When +/// multiple futures are submitted to the queue, [`FuturesOrdered::poll_next`] +/// will return [`Poll::Pending`] until the first future completes, even if /// some of the later futures have already completed. /// -/// Note that you can create a ready-made `FuturesOrdered` via the +/// Note that you can create a ready-made [`FuturesOrdered`] via the /// [`collect`](Iterator::collect) method, or you can start with an empty queue -/// with the `FuturesOrdered::new` constructor. +/// with the [`FuturesOrdered::new`] constructor. /// /// This type is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. @@ -104,8 +107,9 @@ impl Unpin for FuturesOrdered {} impl FuturesOrdered { /// Constructs a new, empty `FuturesOrdered` /// - /// The returned `FuturesOrdered` does not contain any futures and, in this - /// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`. + /// The returned [`FuturesOrdered`] does not contain any futures and, in + /// this state, [`FuturesOrdered::poll_next`] will return + /// [`Poll::Ready(None)`](Poll::Ready). pub fn new() -> Self { Self { in_progress_queue: FuturesUnordered::new(), @@ -132,9 +136,9 @@ impl FuturesOrdered { /// Push a future into the queue. /// /// This function submits the given future to the internal set for managing. - /// This function will not call `poll` on the submitted future. The caller - /// must ensure that `FuturesOrdered::poll` is called in order to receive - /// task notifications. + /// This function will not call [`poll`](Future::poll) on the submitted + /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is + /// called in order to receive task notifications. #[deprecated(note = "use `push_back` instead")] pub fn push(&mut self, future: Fut) { self.push_back(future); @@ -143,9 +147,9 @@ impl FuturesOrdered { /// Pushes a future to the back of the queue. /// /// This function submits the given future to the internal set for managing. - /// This function will not call `poll` on the submitted future. The caller - /// must ensure that `FuturesOrdered::poll` is called in order to receive - /// task notifications. + /// This function will not call [`poll`](Future::poll) on the submitted + /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is + /// called in order to receive task notifications. pub fn push_back(&mut self, future: Fut) { let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; self.next_incoming_index += 1; @@ -155,10 +159,10 @@ impl FuturesOrdered { /// Pushes a future to the front of the queue. /// /// This function submits the given future to the internal set for managing. - /// This function will not call `poll` on the submitted future. The caller - /// must ensure that `FuturesOrdered::poll` is called in order to receive - /// task notifications. This future will be the next future to be returned - /// complete. + /// This function will not call [`poll`](Future::poll) on the submitted + /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is + /// called in order to receive task notifications. This future will be + /// the next future to be returned complete. pub fn push_front(&mut self, future: Fut) { let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; self.next_outgoing_index -= 1; diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 6b5804dc41..0dbaea9080 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -62,7 +62,7 @@ pub struct FuturesUnordered { } unsafe impl Send for FuturesUnordered {} -unsafe impl Sync for FuturesUnordered {} +unsafe impl Sync for FuturesUnordered {} impl Unpin for FuturesUnordered {} impl Spawn for FuturesUnordered> { diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index bf9506147c..34d68a80be 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -68,7 +68,7 @@ pub use self::try_stream::{ }; #[cfg(feature = "alloc")] -pub use self::try_stream::{TryChunks, TryChunksError}; +pub use self::try_stream::{TryChunks, TryChunksError, TryReadyChunks, TryReadyChunksError}; // Primitive streams diff --git a/futures-util/src/stream/stream/all.rs b/futures-util/src/stream/stream/all.rs index ba2baa5cf1..1435c798f2 100644 --- a/futures-util/src/stream/stream/all.rs +++ b/futures-util/src/stream/stream/all.rs @@ -13,7 +13,7 @@ pin_project! { #[pin] stream: St, f: F, - accum: Option, + done: bool, #[pin] future: Option, } @@ -27,7 +27,7 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("All") .field("stream", &self.stream) - .field("accum", &self.accum) + .field("done", &self.done) .field("future", &self.future) .finish() } @@ -40,7 +40,7 @@ where Fut: Future, { pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f, accum: Some(true), future: None } + Self { stream, f, done: false, future: None } } } @@ -51,7 +51,7 @@ where Fut: Future, { fn is_terminated(&self) -> bool { - self.accum.is_none() && self.future.is_none() + self.done && self.future.is_none() } } @@ -67,21 +67,22 @@ where let mut this = self.project(); Poll::Ready(loop { if let Some(fut) = this.future.as_mut().as_pin_mut() { - // we're currently processing a future to produce a new accum value - let acc = this.accum.unwrap() && ready!(fut.poll(cx)); - if !acc { + // we're currently processing a future to produce a new value + let res = ready!(fut.poll(cx)); + this.future.set(None); + if !res { + *this.done = true; break false; } // early exit - *this.accum = Some(acc); - this.future.set(None); - } else if this.accum.is_some() { + } else if !*this.done { // we're waiting on a new item from the stream match ready!(this.stream.as_mut().poll_next(cx)) { Some(item) => { this.future.set(Some((this.f)(item))); } None => { - break this.accum.take().unwrap(); + *this.done = true; + break true; } } } else { diff --git a/futures-util/src/stream/stream/any.rs b/futures-util/src/stream/stream/any.rs index f023125c70..cc3d695b9d 100644 --- a/futures-util/src/stream/stream/any.rs +++ b/futures-util/src/stream/stream/any.rs @@ -13,7 +13,7 @@ pin_project! { #[pin] stream: St, f: F, - accum: Option, + done: bool, #[pin] future: Option, } @@ -27,7 +27,7 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Any") .field("stream", &self.stream) - .field("accum", &self.accum) + .field("done", &self.done) .field("future", &self.future) .finish() } @@ -40,7 +40,7 @@ where Fut: Future, { pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f, accum: Some(false), future: None } + Self { stream, f, done: false, future: None } } } @@ -51,7 +51,7 @@ where Fut: Future, { fn is_terminated(&self) -> bool { - self.accum.is_none() && self.future.is_none() + self.done && self.future.is_none() } } @@ -67,21 +67,22 @@ where let mut this = self.project(); Poll::Ready(loop { if let Some(fut) = this.future.as_mut().as_pin_mut() { - // we're currently processing a future to produce a new accum value - let acc = this.accum.unwrap() || ready!(fut.poll(cx)); - if acc { + // we're currently processing a future to produce a new value + let res = ready!(fut.poll(cx)); + this.future.set(None); + if res { + *this.done = true; break true; } // early exit - *this.accum = Some(acc); - this.future.set(None); - } else if this.accum.is_some() { + } else if !*this.done { // we're waiting on a new item from the stream match ready!(this.stream.as_mut().poll_next(cx)) { Some(item) => { this.future.set(Some((this.f)(item))); } None => { - break this.accum.take().unwrap(); + *this.done = true; + break false; } } } else { diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 558dc22bd7..3978d188fc 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -323,6 +323,9 @@ pub trait StreamExt: Stream { /// wrapped version of it, similar to the existing `map` methods in the /// standard library. /// + /// See [`StreamExt::then`](Self::then) if you want to use a closure that + /// returns a future instead of a value. + /// /// # Examples /// /// ``` @@ -467,6 +470,9 @@ pub trait StreamExt: Stream { /// Note that this function consumes the stream passed into it and returns a /// wrapped version of it. /// + /// See [`StreamExt::map`](Self::map) if you want to use a closure that + /// returns a value instead of a future. + /// /// # Examples /// /// ``` diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 414a40dbe3..5d5702f363 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -111,6 +111,12 @@ mod try_chunks; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_chunks::{TryChunks, TryChunksError}; +#[cfg(feature = "alloc")] +mod try_ready_chunks; +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_ready_chunks::{TryReadyChunks, TryReadyChunksError}; + mod try_fold; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_fold::TryFold; @@ -160,6 +166,14 @@ mod into_async_read; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::into_async_read::IntoAsyncRead; +mod try_all; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_all::TryAll; + +mod try_any; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_any::TryAny; + impl TryStreamExt for S {} /// Adapters specific to `Result`-returning streams @@ -640,6 +654,55 @@ pub trait TryStreamExt: TryStream { ) } + /// An adaptor for chunking up successful, ready items of the stream inside a vector. + /// + /// This combinator will attempt to pull successful items from this stream and buffer + /// them into a local vector. At most `capacity` items will get buffered + /// before they're yielded from the returned stream. If the underlying stream + /// returns `Poll::Pending`, and the collected chunk is not empty, it will + /// be immidiatly returned. + /// + /// Note that the vectors returned from this iterator may not always have + /// `capacity` elements. If the underlying stream ended and only a partial + /// vector was created, it'll be returned. Additionally if an error happens + /// from the underlying stream then the currently buffered items will be + /// yielded. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + /// + /// This function is similar to + /// [`StreamExt::ready_chunks`](crate::stream::StreamExt::ready_chunks) but exits + /// early if an error occurs. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, TryReadyChunksError, TryStreamExt}; + /// + /// let stream = stream::iter(vec![Ok::(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]); + /// let mut stream = stream.try_ready_chunks(2); + /// + /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2]))); + /// assert_eq!(stream.try_next().await, Err(TryReadyChunksError(vec![3], 4))); + /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6]))); + /// # }) + /// ``` + /// + /// # Panics + /// + /// This method will panic if `capacity` is zero. + #[cfg(feature = "alloc")] + fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks + where + Self: Sized, + { + assert_stream::, TryReadyChunksError>, _>( + TryReadyChunks::new(self, capacity), + ) + } + /// Attempt to filter the values produced by this stream according to the /// provided asynchronous closure. /// @@ -1127,4 +1190,62 @@ pub trait TryStreamExt: TryStream { { crate::io::assert_read(IntoAsyncRead::new(self)) } + + /// Attempt to execute a predicate over an asynchronous stream and evaluate if all items + /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found + /// that does not satisfy the predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt, TryStreamExt}; + /// use std::convert::Infallible; + /// + /// let number_stream = stream::iter(1..10).map(Ok::<_, Infallible>); + /// let positive = number_stream.try_all(|i| async move { i > 0 }); + /// assert_eq!(positive.await, Ok(true)); + /// + /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]); + /// let positive = stream_with_errors.try_all(|i| async move { i > 0 }); + /// assert_eq!(positive.await, Err("err")); + /// # }); + /// ``` + fn try_all(self, f: F) -> TryAll + where + Self: Sized, + F: FnMut(Self::Ok) -> Fut, + Fut: Future, + { + assert_future::, _>(TryAll::new(self, f)) + } + + /// Attempt to execute a predicate over an asynchronous stream and evaluate if any items + /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found + /// that satisfies the predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt, TryStreamExt}; + /// use std::convert::Infallible; + /// + /// let number_stream = stream::iter(0..10).map(Ok::<_, Infallible>); + /// let contain_three = number_stream.try_any(|i| async move { i == 3 }); + /// assert_eq!(contain_three.await, Ok(true)); + /// + /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]); + /// let contain_three = stream_with_errors.try_any(|i| async move { i == 3 }); + /// assert_eq!(contain_three.await, Err("err")); + /// # }); + /// ``` + fn try_any(self, f: F) -> TryAny + where + Self: Sized, + F: FnMut(Self::Ok) -> Fut, + Fut: Future, + { + assert_future::, _>(TryAny::new(self, f)) + } } diff --git a/futures-util/src/stream/try_stream/try_all.rs b/futures-util/src/stream/try_stream/try_all.rs new file mode 100644 index 0000000000..8179f86afc --- /dev/null +++ b/futures-util/src/stream/try_stream/try_all.rs @@ -0,0 +1,98 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::TryStream; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`try_all`](super::TryStreamExt::try_all) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryAll { + #[pin] + stream: St, + f: F, + done: bool, + #[pin] + future: Option, + } +} + +impl fmt::Debug for TryAll +where + St: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TryAll") + .field("stream", &self.stream) + .field("done", &self.done) + .field("future", &self.future) + .finish() + } +} + +impl TryAll +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, done: false, future: None } + } +} + +impl FusedFuture for TryAll +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future, +{ + fn is_terminated(&self) -> bool { + self.done && self.future.is_none() + } +} + +impl Future for TryAll +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + Poll::Ready(loop { + if let Some(fut) = this.future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new value + let acc = ready!(fut.poll(cx)); + this.future.set(None); + if !acc { + *this.done = true; + break Ok(false); + } // early exit + } else if !*this.done { + // we're waiting on a new item from the stream + match ready!(this.stream.as_mut().try_poll_next(cx)) { + Some(Ok(item)) => { + this.future.set(Some((this.f)(item))); + } + Some(Err(err)) => { + *this.done = true; + break Err(err); + } + None => { + *this.done = true; + break Ok(true); + } + } + } else { + panic!("TryAll polled after completion") + } + }) + } +} diff --git a/futures-util/src/stream/try_stream/try_any.rs b/futures-util/src/stream/try_stream/try_any.rs new file mode 100644 index 0000000000..15adb30971 --- /dev/null +++ b/futures-util/src/stream/try_stream/try_any.rs @@ -0,0 +1,98 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::TryStream; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`any`](super::StreamExt::any) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryAny { + #[pin] + stream: St, + f: F, + done: bool, + #[pin] + future: Option, + } +} + +impl fmt::Debug for TryAny +where + St: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TryAny") + .field("stream", &self.stream) + .field("done", &self.done) + .field("future", &self.future) + .finish() + } +} + +impl TryAny +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, done: false, future: None } + } +} + +impl FusedFuture for TryAny +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future, +{ + fn is_terminated(&self) -> bool { + self.done && self.future.is_none() + } +} + +impl Future for TryAny +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + Poll::Ready(loop { + if let Some(fut) = this.future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new value + let acc = ready!(fut.poll(cx)); + this.future.set(None); + if acc { + *this.done = true; + break Ok(true); + } // early exit + } else if !*this.done { + // we're waiting on a new item from the stream + match ready!(this.stream.as_mut().try_poll_next(cx)) { + Some(Ok(item)) => { + this.future.set(Some((this.f)(item))); + } + Some(Err(err)) => { + *this.done = true; + break Err(err); + } + None => { + *this.done = true; + break Ok(false); + } + } + } else { + panic!("TryAny polled after completion") + } + }) + } +} diff --git a/futures-util/src/stream/try_stream/try_ready_chunks.rs b/futures-util/src/stream/try_stream/try_ready_chunks.rs new file mode 100644 index 0000000000..8b1470ea26 --- /dev/null +++ b/futures-util/src/stream/try_stream/try_ready_chunks.rs @@ -0,0 +1,126 @@ +use crate::stream::{Fuse, IntoStream, StreamExt}; + +use alloc::vec::Vec; +use core::fmt; +use core::pin::Pin; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`try_ready_chunks`](super::TryStreamExt::try_ready_chunks) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct TryReadyChunks { + #[pin] + stream: Fuse>, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + } +} + +impl TryReadyChunks { + pub(super) fn new(stream: St, capacity: usize) -> Self { + assert!(capacity > 0); + + Self { stream: IntoStream::new(stream).fuse(), cap: capacity } + } + + delegate_access_inner!(stream, St, (. .)); +} + +type TryReadyChunksStreamError = + TryReadyChunksError<::Ok, ::Error>; + +impl Stream for TryReadyChunks { + type Item = Result, TryReadyChunksStreamError>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + + let mut items: Vec = Vec::new(); + + loop { + match this.stream.as_mut().poll_next(cx) { + // Flush all the collected data if the underlying stream doesn't + // contain more ready values + Poll::Pending => { + return if items.is_empty() { + Poll::Pending + } else { + Poll::Ready(Some(Ok(items))) + } + } + + // Push the ready item into the buffer and check whether it is full. + // If so, return the buffer. + Poll::Ready(Some(Ok(item))) => { + if items.is_empty() { + items.reserve_exact(*this.cap); + } + items.push(item); + if items.len() >= *this.cap { + return Poll::Ready(Some(Ok(items))); + } + } + + // Return the already collected items and the error. + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(Err(TryReadyChunksError(items, e)))); + } + + // Since the underlying stream ran out of values, return what we + // have buffered, if we have anything. + Poll::Ready(None) => { + let last = if items.is_empty() { None } else { Some(Ok(items)) }; + return Poll::Ready(last); + } + } + } + } + + fn size_hint(&self) -> (usize, Option) { + let (lower, upper) = self.stream.size_hint(); + let lower = lower / self.cap; + (lower, upper) + } +} + +impl FusedStream for TryReadyChunks { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl Sink for TryReadyChunks +where + S: TryStream + Sink, +{ + type Error = >::Error; + + delegate_sink!(stream, Item); +} + +/// Error indicating, that while chunk was collected inner stream produced an error. +/// +/// Contains all items that were collected before an error occurred, and the stream error itself. +#[derive(PartialEq, Eq)] +pub struct TryReadyChunksError(pub Vec, pub E); + +impl fmt::Debug for TryReadyChunksError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.1.fmt(f) + } +} + +impl fmt::Display for TryReadyChunksError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.1.fmt(f) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for TryReadyChunksError {} diff --git a/futures-util/src/task/mod.rs b/futures-util/src/task/mod.rs index 0a31eeac14..3ed4bfadad 100644 --- a/futures-util/src/task/mod.rs +++ b/futures-util/src/task/mod.rs @@ -30,7 +30,7 @@ pub use futures_task::waker; #[cfg(feature = "alloc")] pub use futures_task::{waker_ref, WakerRef}; -#[cfg(not(futures_no_atomic_cas))] +#[cfg(any(not(futures_no_atomic_cas), feature = "portable-atomic"))] pub use futures_core::task::__internal::AtomicWaker; mod spawn; diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 140fa210f0..96db108d37 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures" -version = "0.3.28" +version = "0.3.29" edition = "2018" rust-version = "1.56" license = "MIT OR Apache-2.0" @@ -15,13 +15,13 @@ composability, and iterator-like interfaces. categories = ["asynchronous"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.28", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.28", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.28", default-features = false, features = ["sink"] } -futures-executor = { path = "../futures-executor", version = "0.3.28", default-features = false, optional = true } -futures-io = { path = "../futures-io", version = "0.3.28", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.3.28", default-features = false } -futures-util = { path = "../futures-util", version = "0.3.28", default-features = false, features = ["sink"] } +futures-core = { path = "../futures-core", version = "0.3.29", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.29", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.29", default-features = false, features = ["sink"] } +futures-executor = { path = "../futures-executor", version = "0.3.29", default-features = false, optional = true } +futures-io = { path = "../futures-io", version = "0.3.29", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.29", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.29", default-features = false, features = ["sink"] } [dev-dependencies] futures-executor = { path = "../futures-executor", features = ["thread-pool"] } diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 5fc0f7d675..004fda1e71 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -18,6 +18,8 @@ pub type SendFuture = Pin + Send>>; pub type SendTryFuture = SendFuture>; pub type SyncFuture = Pin + Sync>>; pub type SyncTryFuture = SyncFuture>; +pub type SendSyncFuture = Pin + Send + Sync>>; +pub type SendSyncTryFuture = SendSyncFuture>; pub type UnpinFuture = LocalFuture; pub type UnpinTryFuture = UnpinFuture>; pub struct PinnedFuture(PhantomPinned, PhantomData); @@ -35,6 +37,8 @@ pub type SendStream = Pin + Send>>; pub type SendTryStream = SendStream>; pub type SyncStream = Pin + Sync>>; pub type SyncTryStream = SyncStream>; +pub type SendSyncStream = Pin + Send + Sync>>; +pub type SendSyncTryStream = SendSyncStream>; pub type UnpinStream = LocalStream; pub type UnpinTryStream = UnpinStream>; pub struct PinnedStream(PhantomPinned, PhantomData); @@ -365,9 +369,10 @@ pub mod future { assert_impl!(JoinAll>: Send); assert_not_impl!(JoinAll: Send); assert_not_impl!(JoinAll: Send); - assert_impl!(JoinAll>: Sync); - assert_not_impl!(JoinAll: Sync); - assert_not_impl!(JoinAll: Sync); + assert_impl!(JoinAll>: Sync); + assert_not_impl!(JoinAll>: Sync); + assert_not_impl!(JoinAll>: Sync); + assert_not_impl!(JoinAll: Sync); assert_impl!(JoinAll: Unpin); assert_impl!(Lazy<()>: Send); @@ -579,9 +584,10 @@ pub mod future { assert_impl!(TryJoinAll>: Send); assert_not_impl!(TryJoinAll: Send); assert_not_impl!(TryJoinAll: Send); - assert_impl!(TryJoinAll>: Sync); - assert_not_impl!(TryJoinAll: Sync); - assert_not_impl!(TryJoinAll: Sync); + assert_impl!(TryJoinAll>: Sync); + assert_not_impl!(TryJoinAll>: Sync); + assert_not_impl!(TryJoinAll>: Sync); + assert_not_impl!(TryJoinAll: Sync); assert_impl!(TryJoinAll: Unpin); assert_impl!(TrySelect: Send); @@ -1118,10 +1124,9 @@ pub mod stream { assert_not_impl!(Buffered>: Send); assert_not_impl!(Buffered>: Send); assert_not_impl!(Buffered>>: Send); - assert_impl!(Buffered>>: Sync); - assert_not_impl!(Buffered>: Sync); - assert_not_impl!(Buffered>: Sync); - assert_not_impl!(Buffered>>: Sync); + assert_impl!(Buffered>>: Sync); + assert_not_impl!(Buffered>>: Sync); + assert_not_impl!(Buffered>>: Sync); assert_impl!(Buffered>: Unpin); assert_not_impl!(Buffered>: Unpin); @@ -1303,9 +1308,10 @@ pub mod stream { assert_impl!(FuturesOrdered>: Send); assert_not_impl!(FuturesOrdered: Send); assert_not_impl!(FuturesOrdered: Send); - assert_impl!(FuturesOrdered>: Sync); - assert_not_impl!(FuturesOrdered>: Sync); - assert_not_impl!(FuturesOrdered>: Sync); + assert_impl!(FuturesOrdered>: Sync); + assert_not_impl!(FuturesOrdered>: Sync); + assert_not_impl!(FuturesOrdered>: Sync); + assert_not_impl!(FuturesOrdered: Sync); assert_impl!(FuturesOrdered: Unpin); assert_impl!(FuturesUnordered<()>: Send); @@ -1647,11 +1653,12 @@ pub mod stream { assert_not_impl!(TryBuffered>>: Send); assert_not_impl!(TryBuffered>>: Send); assert_not_impl!(TryBuffered>>: Send); - assert_impl!(TryBuffered>>: Sync); - assert_not_impl!(TryBuffered>>: Sync); - assert_not_impl!(TryBuffered>>: Sync); - assert_not_impl!(TryBuffered>>: Sync); - assert_not_impl!(TryBuffered>>: Sync); + assert_impl!(TryBuffered>>: Sync); + assert_not_impl!(TryBuffered>>: Sync); + assert_not_impl!(TryBuffered>>: Sync); + assert_not_impl!(TryBuffered>>: Sync); + assert_not_impl!(TryBuffered>>: Sync); + assert_not_impl!(TryBuffered>>: Sync); assert_impl!(TryBuffered>: Unpin); assert_not_impl!(TryBuffered>: Unpin); diff --git a/futures/tests/stream.rs b/futures/tests/stream.rs index 79d8e233cc..6cbef7516c 100644 --- a/futures/tests/stream.rs +++ b/futures/tests/stream.rs @@ -535,3 +535,43 @@ fn select_with_strategy_doesnt_terminate_early() { assert_eq!(count.get(), times_should_poll + 1); } } + +async fn is_even(number: u8) -> bool { + number % 2 == 0 +} + +#[test] +fn all() { + block_on(async { + let empty: [u8; 0] = []; + let st = stream::iter(empty); + let all = st.all(is_even).await; + assert!(all); + + let st = stream::iter([2, 4, 6, 8]); + let all = st.all(is_even).await; + assert!(all); + + let st = stream::iter([2, 3, 4]); + let all = st.all(is_even).await; + assert!(!all); + }); +} + +#[test] +fn any() { + block_on(async { + let empty: [u8; 0] = []; + let st = stream::iter(empty); + let any = st.any(is_even).await; + assert!(!any); + + let st = stream::iter([1, 2, 3]); + let any = st.any(is_even).await; + assert!(any); + + let st = stream::iter([1, 3, 5]); + let any = st.any(is_even).await; + assert!(!any); + }); +} diff --git a/futures/tests/stream_try_stream.rs b/futures/tests/stream_try_stream.rs index b3d04b9200..ef38c510b8 100644 --- a/futures/tests/stream_try_stream.rs +++ b/futures/tests/stream_try_stream.rs @@ -1,4 +1,5 @@ use core::pin::Pin; +use std::convert::Infallible; use futures::{ stream::{self, repeat, Repeat, StreamExt, TryStreamExt}, @@ -132,3 +133,51 @@ fn try_flatten_unordered() { assert_eq!(taken, 31); }) } + +async fn is_even(number: u8) -> bool { + number % 2 == 0 +} + +#[test] +fn try_all() { + block_on(async { + let empty: [Result; 0] = []; + let st = stream::iter(empty); + let all = st.try_all(is_even).await; + assert_eq!(Ok(true), all); + + let st = stream::iter([Ok::<_, Infallible>(2), Ok(4), Ok(6), Ok(8)]); + let all = st.try_all(is_even).await; + assert_eq!(Ok(true), all); + + let st = stream::iter([Ok::<_, Infallible>(2), Ok(3), Ok(4)]); + let all = st.try_all(is_even).await; + assert_eq!(Ok(false), all); + + let st = stream::iter([Ok(2), Ok(4), Err("err"), Ok(8)]); + let all = st.try_all(is_even).await; + assert_eq!(Err("err"), all); + }); +} + +#[test] +fn try_any() { + block_on(async { + let empty: [Result; 0] = []; + let st = stream::iter(empty); + let any = st.try_any(is_even).await; + assert_eq!(Ok(false), any); + + let st = stream::iter([Ok::<_, Infallible>(1), Ok(2), Ok(3)]); + let any = st.try_any(is_even).await; + assert_eq!(Ok(true), any); + + let st = stream::iter([Ok::<_, Infallible>(1), Ok(3), Ok(5)]); + let any = st.try_any(is_even).await; + assert_eq!(Ok(false), any); + + let st = stream::iter([Ok(1), Ok(3), Err("err"), Ok(8)]); + let any = st.try_any(is_even).await; + assert_eq!(Err("err"), any); + }); +}