From 9cb3ab5f4d8cbf52b268b74c74c7b1126719b2b5 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 8 Feb 2024 17:37:25 +0000 Subject: [PATCH] Added improved implementation for reorged logs subscription --- crates/node-core/src/args/rpc_server_args.rs | 5 +- crates/rpc/rpc-builder/src/auth.rs | 30 ++++- crates/rpc/rpc-builder/src/lib.rs | 13 +- crates/rpc/rpc/src/eth/filter.rs | 131 +++++++++++++++++-- crates/rpc/rpc/src/eth/pubsub.rs | 11 +- 5 files changed, 164 insertions(+), 26 deletions(-) diff --git a/crates/node-core/src/args/rpc_server_args.rs b/crates/node-core/src/args/rpc_server_args.rs index e36d6d7f31d0..d9aef220136c 100644 --- a/crates/node-core/src/args/rpc_server_args.rs +++ b/crates/node-core/src/args/rpc_server_args.rs @@ -385,7 +385,7 @@ impl RpcServerArgs { /// Create Engine API server. #[allow(clippy::too_many_arguments)] - pub async fn start_auth_server( + pub async fn start_auth_server( &self, provider: Provider, pool: Pool, @@ -394,6 +394,7 @@ impl RpcServerArgs { engine_api: EngineApi, jwt_secret: JwtSecret, evm_config: EvmConfig, + events: Events, ) -> Result where Provider: BlockReaderIdExt @@ -409,6 +410,7 @@ impl RpcServerArgs { Tasks: TaskSpawner + Clone + 'static, EngineT: EngineTypes + 'static, EvmConfig: EvmEnvConfig + 'static, + Events: CanonStateSubscriptions + Clone + 'static, { let socket_address = SocketAddr::new(self.auth_addr, self.auth_port); @@ -421,6 +423,7 @@ impl RpcServerArgs { socket_address, jwt_secret, evm_config, + events, ) .await } diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 7626584903af..0d0a1e083714 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -14,8 +14,8 @@ use jsonrpsee::{ use reth_network_api::{NetworkInfo, Peers}; use reth_node_api::{EngineTypes, EvmEnvConfig}; use reth_provider::{ - BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, HeaderProvider, ReceiptProviderIdExt, - StateProviderFactory, + BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, EvmEnvProvider, HeaderProvider, + ReceiptProviderIdExt, StateProviderFactory, }; use reth_rpc::{ eth::{ @@ -35,7 +35,7 @@ use std::{ /// Configure and launch a _standalone_ auth server with `engine` and a _new_ `eth` namespace. #[allow(clippy::too_many_arguments)] -pub async fn launch( +pub async fn launch( provider: Provider, pool: Pool, network: Network, @@ -44,6 +44,7 @@ pub async fn launch Result where Provider: BlockReaderIdExt @@ -61,6 +62,7 @@ where EngineT: EngineTypes + 'static, EngineApi: EngineApiServer, EvmConfig: EvmEnvConfig + 'static, + Events: CanonStateSubscriptions + Clone + 'static, { // spawn a new cache task let eth_cache = EthStateCache::spawn_with( @@ -77,7 +79,7 @@ where let eth_api = EthApi::with_spawner( provider.clone(), pool.clone(), - network, + network.clone(), eth_cache.clone(), gas_oracle, EthConfig::default().rpc_gas_cap, @@ -89,9 +91,23 @@ where let config = EthFilterConfig::default() .max_logs_per_response(DEFAULT_MAX_LOGS_PER_RESPONSE) .max_blocks_per_filter(DEFAULT_MAX_BLOCKS_PER_FILTER); - let eth_filter = - EthFilter::new(provider, pool, eth_cache.clone(), config, Box::new(executor.clone())); - launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await + + let eth_filter = EthFilter::new::( + provider.clone(), + pool.clone(), + eth_cache.clone(), + config, + Box::new(executor.clone()), + events.clone(), + ); + launch_with_eth_api::( + eth_api, + eth_filter, + engine_api, + socket_addr, + secret, + ) + .await } /// Configure and launch a _standalone_ auth server with existing EthApi implementation. diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index a6ddc8912bc2..b583c98eaad4 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1322,20 +1322,21 @@ where fee_history_cache, self.evm_config.clone(), ); - let filter = EthFilter::new( + let pubsub = EthPubSub::with_spawner( self.provider.clone(), self.pool.clone(), - cache.clone(), - self.config.eth.filter_config(), + self.events.clone(), + self.network.clone(), executor.clone(), ); - let pubsub = EthPubSub::with_spawner( + let filter = EthFilter::new::( self.provider.clone(), self.pool.clone(), + cache.clone(), + self.config.eth.filter_config(), + executor.clone(), self.events.clone(), - self.network.clone(), - executor, ); let eth = EthHandlers { api, cache, filter, pubsub, blocking_task_pool }; diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 94aed3e80752..11fe56f050a3 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -1,18 +1,22 @@ use super::cache::EthStateCache; use crate::{ - eth::{ - error::EthApiError, - logs_utils::{self, append_matching_block_logs}, - }, + eth::{error::EthApiError, logs_utils}, result::{rpc_error_with_code, ToRpcResult}, EthSubscriptionIdProvider, }; use core::fmt; +use futures::StreamExt; +use reth_network_api::NetworkInfo; +use tokio_stream::{wrappers::BroadcastStream, Stream}; +use alloy_primitives::B256; use async_trait::async_trait; use jsonrpsee::{core::RpcResult, server::IdProvider}; use reth_primitives::{IntoRecoveredTransaction, TxHash}; -use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError}; +use reth_provider::{ + chain::BlockReceipts, BlockIdReader, BlockReader, CanonStateSubscriptions, EvmEnvProvider, + ProviderError, +}; use reth_rpc_api::EthFilterApiServer; use reth_rpc_types::{ BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log, @@ -44,7 +48,7 @@ pub struct EthFilter { impl EthFilter where - Provider: Send + Sync + 'static, + Provider: BlockReader + EvmEnvProvider + Send + Sync + 'static, Pool: Send + Sync + 'static, { /// Creates a new, shareable instance. @@ -55,13 +59,18 @@ where /// See also [EthFilterConfig]. /// /// This also spawns a task that periodically clears stale filters. - pub fn new( + pub fn new( provider: Provider, pool: Pool, eth_cache: EthStateCache, config: EthFilterConfig, task_spawner: Box, - ) -> Self { + events: Events, + ) -> Self + where + Events: CanonStateSubscriptions + 'static, + Network: NetworkInfo + 'static, + { let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } = config; let inner = EthFilterInner { @@ -88,6 +97,14 @@ where }), ); + let also_this = eth_filter.clone(); + eth_filter.inner.task_spawner.clone().spawn_critical( + "eth-filters_update_reorged_logs", + Box::pin(async move { + also_this.update_reorged_logs::(events).await; + }), + ); + eth_filter } @@ -120,6 +137,93 @@ where is_valid }) } + + /// Endless future that updates the reorged logs for impacted filters + async fn update_reorged_logs(&self, events: Events) + where + Events: CanonStateSubscriptions + 'static, + Network: NetworkInfo + 'static, + { + let mut stream = self.block_receipts_stream::(events).await; + + let mut temp_reorged_blocks = Vec::new(); + let mut reverted_logs: Vec = Vec::new(); + let mut checkpoint_block: u64 = 1; + + while let Some((block_receipts, removed)) = stream.next().await { + let mut filters = self.active_filters().inner.lock().await; + let mut reorged_logs: Vec<(FilterId, Vec)> = Vec::new(); + + if removed { + let block_number = block_receipts.block.number; + temp_reorged_blocks.push(block_receipts); + + for reorged_block in &mut temp_reorged_blocks { + if checkpoint_block < reorged_block.block.number { + filters.iter().for_each(|(id, active_filter)| { + if block_number <= active_filter.block { + if let FilterKind::Log(ref filter) = active_filter.kind { + let filtered_params = + FilteredParams::new(Some(*filter.clone())); + + let mut matching_logs = + logs_utils::matching_block_logs_with_tx_hashes( + &filtered_params, + reorged_block.block, + reorged_block + .tx_receipts + .clone() + .iter() + .map(|(tx, receipt)| (*tx, receipt)), + true, + ); + + reverted_logs.append(&mut matching_logs); + if Some(reorged_block.block.hash) == active_filter.block_hash { + reorged_logs.push((id.clone(), reverted_logs.clone())); + } + } + } + }); + } + } + checkpoint_block = block_number; + } else { + temp_reorged_blocks.clear(); + reverted_logs.clear(); + checkpoint_block = 1; + } + + if let Some((id, mut logs)) = reorged_logs.pop() { + let active_filter = + filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id)).unwrap(); + let guard = &mut active_filter.reorged_logs; + if let Some(reorged_logs_vec) = guard.as_mut() { + reorged_logs_vec.append(&mut logs); + } + } + } + } + + /// Generates a stream of block receipts + pub async fn block_receipts_stream( + &self, + events: Events, + ) -> impl Stream + '_ + where + Events: CanonStateSubscriptions + 'static, + Network: NetworkInfo + 'static, + { + BroadcastStream::new(events.subscribe_to_canonical_state()) + .map(move |canon_state| { + canon_state.expect("new block subscription never ends; qed").block_receipts() + }) + .flat_map(futures::stream::iter) + .flat_map(move |(block_receipts, removed)| { + let block_stream: Vec<(BlockReceipts, bool)> = vec![(block_receipts, removed)]; + futures::stream::iter(block_stream) + }) + } } impl EthFilter @@ -390,14 +494,19 @@ where /// Installs a new filter and returns the new identifier. async fn install_filter(&self, kind: FilterKind) -> RpcResult { let last_poll_block_number = self.provider.best_block_number().to_rpc_result()?; + let last_poll_block_hash = + self.provider.block_hash(last_poll_block_number).to_rpc_result()?; let id = FilterId::from(self.id_provider.next_id()); let mut filters = self.active_filters.inner.lock().await; + let reorged_logs: Option> = Some(Vec::new()); filters.insert( id.clone(), ActiveFilter { block: last_poll_block_number, + block_hash: last_poll_block_hash, last_poll_timestamp: Instant::now(), kind, + reorged_logs, }, ); Ok(id) @@ -450,7 +559,7 @@ where }; if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? { - append_matching_block_logs( + logs_utils::append_matching_block_logs( &mut all_logs, &self.provider, &filter_params, @@ -537,10 +646,14 @@ pub struct ActiveFilters { struct ActiveFilter { /// At which block the filter was polled last. block: u64, + /// Hash of the block at which the filter was polled last. + block_hash: Option, /// Last time this filter was polled. last_poll_timestamp: Instant, /// What kind of filter it is. kind: FilterKind, + /// Reorged logs + reorged_logs: Option>, } /// A receiver for pending transactions that returns all new transactions since the last poll. diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index bf114688b39a..a0760b6eddf7 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -61,6 +61,11 @@ impl EthPubSub let inner = EthPubSubInner { provider, pool, chain_events, network }; Self { inner: Arc::new(inner), subscription_task_spawner } } + + /// Method to get chain events for canonical state subscription + pub fn get_chain_events(&self) -> &Events { + &self.inner.chain_events + } } #[async_trait::async_trait] @@ -134,7 +139,7 @@ where ), )) }); - return pipe_from_stream(accepted_sink, stream).await + return pipe_from_stream(accepted_sink, stream).await; } Params::Bool(false) | Params::None => { // only hashes requested @@ -164,7 +169,7 @@ where // send the current status immediately let msg = SubscriptionMessage::from_json(¤t_sub_res)?; if accepted_sink.send(msg).await.is_err() { - return Ok(()) + return Ok(()); } while (canon_state.next().await).is_some() { @@ -178,7 +183,7 @@ where let sync_status = pubsub.sync_status(current_syncing).await; let msg = SubscriptionMessage::from_json(&sync_status)?; if accepted_sink.send(msg).await.is_err() { - break + break; } } }