Skip to content

Commit

Permalink
chore(interop): Replace futures crates with tokio-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Jul 26, 2023
1 parent 1e98d99 commit bbc10d5
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 12 deletions.
2 changes: 0 additions & 2 deletions interop/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ async-stream = "0.3"
strum = {version = "0.25", features = ["derive"]}
pico-args = {version = "0.5", features = ["eq-separator"]}
console = "0.15"
futures-core = "0.3"
futures-util = "0.3"
http = "0.2"
http-body = "0.4.2"
hyper = "0.14"
Expand Down
12 changes: 6 additions & 6 deletions interop/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::{
pb::test_service_client::*, pb::unimplemented_service_client::*, pb::*, test_assert,
TestAssertion,
};
use futures_util::{future, stream, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use tonic::{metadata::MetadataValue, Code, Request, Response, Status};

Expand Down Expand Up @@ -87,7 +87,7 @@ pub async fn client_streaming(client: &mut TestClient, assertions: &mut Vec<Test
..Default::default()
});

let stream = stream::iter(requests);
let stream = tokio_stream::iter(requests);

let result = client.streaming_input_call(Request::new(stream)).await;

Expand Down Expand Up @@ -129,7 +129,7 @@ pub async fn server_streaming(client: &mut TestClient, assertions: &mut Vec<Test
if let Ok(response) = result {
let responses = response
.into_inner()
.filter_map(|m| future::ready(m.ok()))
.filter_map(|m| m.ok())
.collect::<Vec<_>>()
.await;
let actual_response_lengths = crate::response_lengths(&responses);
Expand Down Expand Up @@ -207,7 +207,7 @@ pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAsserti
}

pub async fn empty_stream(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
let stream = stream::iter(Vec::new());
let stream = tokio_stream::iter(Vec::new());
let result = client.full_duplex_call(Request::new(stream)).await;

assertions.push(test_assert!(
Expand Down Expand Up @@ -270,7 +270,7 @@ pub async fn status_code_and_message(client: &mut TestClient, assertions: &mut V
let result = client.unary_call(Request::new(simple_req)).await;
validate_response(result, assertions);

let stream = stream::iter(vec![duplex_req]);
let stream = tokio_stream::iter(vec![duplex_req]);
let result = match client.full_duplex_call(Request::new(stream)).await {
Ok(response) => {
let stream = response.into_inner();
Expand Down Expand Up @@ -356,7 +356,7 @@ pub async fn custom_metadata(client: &mut TestClient, assertions: &mut Vec<TestA
req_unary.metadata_mut().insert(key1, value1.clone());
req_unary.metadata_mut().insert_bin(key2, value2.clone());

let stream = stream::iter(vec![make_ping_pong_request(0)]);
let stream = tokio_stream::iter(vec![make_ping_pong_request(0)]);
let mut req_stream = Request::new(stream);
req_stream.metadata_mut().insert(key1, value1.clone());
req_stream.metadata_mut().insert_bin(key2, value2.clone());
Expand Down
8 changes: 4 additions & 4 deletions interop/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::pb::{self, *};
use async_stream::try_stream;
use futures_util::{stream, StreamExt, TryStreamExt};
use http::header::{HeaderMap, HeaderName, HeaderValue};
use http_body::Body;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio_stream::StreamExt;
use tonic::{body::BoxBody, transport::NamedService, Code, Request, Response, Status};
use tower::Service;

Expand All @@ -19,7 +19,7 @@ pub struct TestService;
type Result<T> = std::result::Result<Response<T>, Status>;
type Streaming<T> = Request<tonic::Streaming<T>>;
type Stream<T> =
Pin<Box<dyn futures_core::Stream<Item = std::result::Result<T, Status>> + Send + 'static>>;
Pin<Box<dyn tokio_stream::Stream<Item = std::result::Result<T, Status>> + Send + 'static>>;
type BoxFuture<T, E> = Pin<Box<dyn Future<Output = std::result::Result<T, E>> + Send + 'static>>;

#[tonic::async_trait]
Expand Down Expand Up @@ -115,7 +115,7 @@ impl pb::test_service_server::TestService for TestService {
return Err(status);
}

let single_message = stream::iter(vec![Ok(first_msg)]);
let single_message = tokio_stream::iter(vec![Ok(first_msg)]);
let mut stream = single_message.chain(stream);

let stream = try_stream! {
Expand All @@ -136,7 +136,7 @@ impl pb::test_service_server::TestService for TestService {

Ok(Response::new(Box::pin(stream) as Self::FullDuplexCallStream))
} else {
let stream = stream::empty();
let stream = tokio_stream::empty();
Ok(Response::new(Box::pin(stream) as Self::FullDuplexCallStream))
}
}
Expand Down

0 comments on commit bbc10d5

Please sign in to comment.