Skip to content

Commit

Permalink
read all stream instead of single
Browse files Browse the repository at this point in the history
Signed-off-by: borngraced <[email protected]>
  • Loading branch information
borngraced committed Dec 18, 2023
1 parent 1d89f45 commit 2a64404
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
2 changes: 1 addition & 1 deletion mm2src/coins/z_coin/z_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ impl ZRpcOps for LightRpcClient {
.into_inner();
// without Pin method get_mut is not found in current scope
while let Some(block) = Pin::new(&mut response).get_mut().message().await? {
debug!("Got block {:?}", block);
debug!("Got block {}", block.height);
let height = u32::try_from(block.height)
.map_err(|_| UpdateBlocksCacheErr::DecodeError("Block height too large".to_string()))?;
db.insert_block(height, block.encode_to_vec())
Expand Down
1 change: 1 addition & 0 deletions mm2src/mm2_net/src/grpc_web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub enum PostGrpcWebErr {
DecodeBody(String),
EncodeBody(String),
InvalidRequest(String),
BadResponse(String),
Internal(String),
PayloadTooShort(String),
Status(String),
Expand Down
38 changes: 26 additions & 12 deletions mm2src/mm2_net/src/wasm/body_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ pub struct ResponseBody {
impl ResponseBody {
/// Creates a new `ResponseBody` based on a ReadableStream and content type.
pub(crate) async fn new(body_stream: ReadableStream, content_type: &str) -> Result<Self, PostGrpcWebErr> {
let body_stream: ReadableStreamDefaultReader = body_stream.get_reader().dyn_into().unwrap();
let body_stream: ReadableStreamDefaultReader = body_stream
.get_reader()
.dyn_into()
.map_err(|err| PostGrpcWebErr::BadResponse(format!("{err:?}")))?;

Ok(Self {
body_stream: BodyStream::new(body_stream).await?,
Expand Down Expand Up @@ -350,19 +353,30 @@ pub struct BodyStream {
impl BodyStream {
/// Creates a new `BodyStream` based on an `ReadableStreamDefaultReader`.
pub async fn new(body_stream: ReadableStreamDefaultReader) -> Result<Self, PostGrpcWebErr> {
let value = JsFuture::from(body_stream.read())
.await
.map_err(|err| PostGrpcWebErr::InvalidRequest(format!("{err:?}")))?;
let object: Object = value
.dyn_into()
.map_err(|err| PostGrpcWebErr::InvalidRequest(format!("{err:?}")))?;
let object_value = js_sys::Reflect::get(&object, &JsValue::from_str("value"))
.map_err(|err| PostGrpcWebErr::InvalidRequest(format!("{err:?}")))?;
let chunk = Uint8Array::new(&object_value).to_vec();
let bytes_stream = Box::pin(stream::once(async { Ok(Bytes::from(chunk)) }));
let mut chunks = vec![];
loop {
let value = JsFuture::from(body_stream.read())
.await
.map_err(|err| PostGrpcWebErr::InvalidRequest(format!("{err:?}")))?;
let object: Object = value
.dyn_into()
.map_err(|err| PostGrpcWebErr::BadResponse(format!("{err:?}")))?;
let object_value = js_sys::Reflect::get(&object, &JsValue::from_str("value"))
.map_err(|err| PostGrpcWebErr::BadResponse(format!("{err:?}")))?;
let object_progress = js_sys::Reflect::get(&object, &JsValue::from_str("done"))
.map_err(|err| PostGrpcWebErr::BadResponse(format!("{err:?}")))?;
let chunk = Uint8Array::new(&object_value).to_vec();
chunks.extend_from_slice(&chunk);

if object_progress.as_bool().ok_or_else(|| {
PostGrpcWebErr::BadResponse("Expected done(bool) field in json object response".to_string())
})? {
break;
}
}

Ok(Self {
body_stream: bytes_stream,
body_stream: Box::pin(stream::once(async { Ok(Bytes::from(chunks)) })),
})
}

Expand Down

0 comments on commit 2a64404

Please sign in to comment.