From 1ed8daf2dcf3a3d156fc5ccb53faf238a2320684 Mon Sep 17 00:00:00 2001 From: 0xterminator Date: Mon, 30 Sep 2024 06:19:07 +0300 Subject: [PATCH] feat(repo): Parallel publishing of multiple streams (optimization) (#240) feat(repo): parallel publishing of multiple streams (optimization) --- .../fuel-streams-publisher/src/publisher.rs | 124 +++++++++--------- 1 file changed, 62 insertions(+), 62 deletions(-) diff --git a/crates/fuel-streams-publisher/src/publisher.rs b/crates/fuel-streams-publisher/src/publisher.rs index 2a415b0..f9bd8ee 100644 --- a/crates/fuel-streams-publisher/src/publisher.rs +++ b/crates/fuel-streams-publisher/src/publisher.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Instant}; use async_nats::{jetstream::stream::State as StreamState, RequestErrorKind}; use fuel_core::database::database_description::DatabaseHeight; @@ -338,68 +338,68 @@ impl Publisher { ) -> anyhow::Result<()> { let block_height = block.header().consensus().height; - blocks::publish( - &self.metrics, - &*self.fuel_core, - &self.streams.blocks, - block, - block_producer, - ) - .await?; - - transactions::publish( - &self.metrics, - &*self.fuel_core, - &self.streams.transactions, - block.transactions(), - block_producer, - block_height.into(), - ) - .await?; - - receipts::publish( - &self.metrics, - &*self.fuel_core, - &self.streams.receipts, - block.transactions(), - block_producer, - ) - .await?; - - logs::publish( - &self.metrics, - &*self.fuel_core, - &self.streams.logs, - block.transactions(), - block_producer, - block_height.into(), - ) - .await?; - - inputs::publish( - &self.metrics, - &self.streams.inputs, - &*self.fuel_core, - block.transactions(), - block_producer, - ) - .await?; - - utxos::publish( - &self.metrics, - &self.streams.utxos, - &*self.fuel_core, - block.transactions(), - block_producer, - ) - .await?; + let publish_streams = vec![ + blocks::publish( + &self.metrics, + &*self.fuel_core, + &self.streams.blocks, + block, + block_producer, + ) + .boxed(), + transactions::publish( + &self.metrics, + &*self.fuel_core, + &self.streams.transactions, + block.transactions(), + block_producer, + block_height.into(), + ) + .boxed(), + receipts::publish( + &self.metrics, + &*self.fuel_core, + &self.streams.receipts, + block.transactions(), + block_producer, + ) + .boxed(), + logs::publish( + &self.metrics, + &*self.fuel_core, + &self.streams.logs, + block.transactions(), + block_producer, + block_height.into(), + ) + .boxed(), + inputs::publish( + &self.metrics, + &self.streams.inputs, + &*self.fuel_core, + block.transactions(), + block_producer, + ) + .boxed(), + utxos::publish( + &self.metrics, + &self.streams.utxos, + &*self.fuel_core, + block.transactions(), + block_producer, + ) + .boxed(), + outputs::publish( + &self.streams.outputs, + self.fuel_core.chain_id(), + block.transactions(), + ) + .boxed(), + ]; - outputs::publish( - &self.streams.outputs, - self.fuel_core.chain_id(), - block.transactions(), - ) - .await?; + let time_start = Instant::now(); + try_join_all(publish_streams).await?; + tracing::info!("Published all data streams for block height {block_height} in {:?} ms", time_start.elapsed().as_millis()); Ok(()) }