Skip to content

Commit

Permalink
feat(repo): Parallel publishing of multiple streams (optimization) (#240
Browse files Browse the repository at this point in the history
)

feat(repo): parallel publishing of multiple streams (optimization)
  • Loading branch information
0xterminator committed Sep 30, 2024
1 parent a5b5cc7 commit 1ed8daf
Showing 1 changed file with 62 additions and 62 deletions.
124 changes: 62 additions & 62 deletions crates/fuel-streams-publisher/src/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use async_nats::{jetstream::stream::State as StreamState, RequestErrorKind};
use fuel_core::database::database_description::DatabaseHeight;
Expand Down Expand Up @@ -338,68 +338,68 @@ impl Publisher {
) -> anyhow::Result<()> {
let block_height = block.header().consensus().height;

blocks::publish(
&self.metrics,
&*self.fuel_core,
&self.streams.blocks,
block,
block_producer,
)
.await?;

transactions::publish(
&self.metrics,
&*self.fuel_core,
&self.streams.transactions,
block.transactions(),
block_producer,
block_height.into(),
)
.await?;

receipts::publish(
&self.metrics,
&*self.fuel_core,
&self.streams.receipts,
block.transactions(),
block_producer,
)
.await?;

logs::publish(
&self.metrics,
&*self.fuel_core,
&self.streams.logs,
block.transactions(),
block_producer,
block_height.into(),
)
.await?;

inputs::publish(
&self.metrics,
&self.streams.inputs,
&*self.fuel_core,
block.transactions(),
block_producer,
)
.await?;

utxos::publish(
&self.metrics,
&self.streams.utxos,
&*self.fuel_core,
block.transactions(),
block_producer,
)
.await?;
let publish_streams = vec![
blocks::publish(
&self.metrics,
&*self.fuel_core,
&self.streams.blocks,
block,
block_producer,
)
.boxed(),
transactions::publish(
&self.metrics,
&*self.fuel_core,
&self.streams.transactions,
block.transactions(),
block_producer,
block_height.into(),
)
.boxed(),
receipts::publish(
&self.metrics,
&*self.fuel_core,
&self.streams.receipts,
block.transactions(),
block_producer,
)
.boxed(),
logs::publish(
&self.metrics,
&*self.fuel_core,
&self.streams.logs,
block.transactions(),
block_producer,
block_height.into(),
)
.boxed(),
inputs::publish(
&self.metrics,
&self.streams.inputs,
&*self.fuel_core,
block.transactions(),
block_producer,
)
.boxed(),
utxos::publish(
&self.metrics,
&self.streams.utxos,
&*self.fuel_core,
block.transactions(),
block_producer,
)
.boxed(),
outputs::publish(
&self.streams.outputs,
self.fuel_core.chain_id(),
block.transactions(),
)
.boxed(),
];

outputs::publish(
&self.streams.outputs,
self.fuel_core.chain_id(),
block.transactions(),
)
.await?;
let time_start = Instant::now();
try_join_all(publish_streams).await?;
tracing::info!("Published all data streams for block height {block_height} in {:?} ms", time_start.elapsed().as_millis());

Ok(())
}
Expand Down

0 comments on commit 1ed8daf

Please sign in to comment.