From 771ee59ee6af6cfc5ea519477e947fd835ca4d32 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 15 Jan 2024 17:41:40 +0000 Subject: [PATCH] Tested changes --- bin/reth/src/args/rpc_server_args.rs | 6 +- crates/rpc/rpc-builder/src/auth.rs | 38 +++++++-- crates/rpc/rpc-builder/src/lib.rs | 15 ++-- crates/rpc/rpc/src/eth/filter.rs | 120 +++++++++++++++++++++++++-- crates/rpc/rpc/src/eth/pubsub.rs | 32 ++----- 5 files changed, 162 insertions(+), 49 deletions(-) diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 9b153c2fbff49..32ffc2db38a93 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -331,7 +331,8 @@ impl RpcServerArgs { } /// Create Engine API server. - pub async fn start_auth_server( + #[allow(clippy::too_many_arguments)] + pub async fn start_auth_server( &self, provider: Provider, pool: Pool, @@ -339,6 +340,7 @@ impl RpcServerArgs { executor: Tasks, engine_api: EngineApi, jwt_secret: JwtSecret, + events: Events, ) -> Result where Provider: BlockReaderIdExt @@ -353,6 +355,7 @@ impl RpcServerArgs { Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, EngineT: EngineTypes + 'static, + Events: CanonStateSubscriptions + Clone + 'static, { let socket_address = SocketAddr::new(self.auth_addr, self.auth_port); @@ -364,6 +367,7 @@ impl RpcServerArgs { engine_api, socket_address, jwt_secret, + events, ) .await } diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 6bf01a883d939..cbceac633deb2 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -14,15 +14,15 @@ use jsonrpsee::{ use reth_network_api::{NetworkInfo, Peers}; use reth_node_api::EngineTypes; use reth_provider::{ - BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, HeaderProvider, ReceiptProviderIdExt, - StateProviderFactory, + BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, EvmEnvProvider, HeaderProvider, + ReceiptProviderIdExt, StateProviderFactory, }; use reth_rpc::{ eth::{ cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig, FeeHistoryCache, FeeHistoryCacheConfig, }, - AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter, + AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, }; use reth_rpc_api::{servers::*, EngineApiServer}; @@ -35,7 +35,7 @@ use std::{ /// Configure and launch a _standalone_ auth server with `engine` and a _new_ `eth` namespace. #[allow(clippy::too_many_arguments)] -pub async fn launch( +pub async fn launch( provider: Provider, pool: Pool, network: Network, @@ -43,6 +43,7 @@ pub async fn launch( engine_api: EngineApi, socket_addr: SocketAddr, secret: JwtSecret, + events: Events, ) -> Result where Provider: BlockReaderIdExt @@ -59,6 +60,7 @@ where Tasks: TaskSpawner + Clone + 'static, EngineT: EngineTypes, EngineApi: EngineApiServer, + Events: CanonStateSubscriptions + Clone + 'static, { // spawn a new cache task let eth_cache = @@ -71,7 +73,7 @@ where let eth_api = EthApi::with_spawner( provider.clone(), pool.clone(), - network, + network.clone(), eth_cache.clone(), gas_oracle, EthConfig::default().rpc_gas_cap, @@ -82,9 +84,29 @@ where let config = EthFilterConfig::default() .max_logs_per_response(DEFAULT_MAX_LOGS_PER_RESPONSE) .max_blocks_per_filter(DEFAULT_MAX_BLOCKS_PER_FILTER); - let eth_filter = - EthFilter::new(provider, pool, eth_cache.clone(), config, Box::new(executor.clone())); - launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await + let eth_pubsub = EthPubSub::with_spawner( + provider.clone(), + pool.clone(), + events.clone(), + network.clone(), + Box::new(executor.clone()), + ); + let eth_filter = EthFilter::new( + provider.clone(), + pool.clone(), + eth_cache.clone(), + config, + Box::new(executor.clone()), + eth_pubsub.clone(), + ); + launch_with_eth_api::( + eth_api, + eth_filter, + engine_api, + socket_addr, + secret, + ) + .await } /// Configure and launch a _standalone_ auth server with existing EthApi implementation. diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 92a187b42292c..32c511aae7245 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1236,20 +1236,21 @@ where blocking_task_pool.clone(), fee_history_cache, ); - let filter = EthFilter::new( + let pubsub = EthPubSub::with_spawner( self.provider.clone(), self.pool.clone(), - cache.clone(), - self.config.eth.filter_config(), + self.events.clone(), + self.network.clone(), executor.clone(), ); - let pubsub = EthPubSub::with_spawner( + let filter = EthFilter::new( self.provider.clone(), self.pool.clone(), - self.events.clone(), - self.network.clone(), - executor, + cache.clone(), + self.config.eth.filter_config(), + executor.clone(), + pubsub.clone(), ); let eth = EthHandlers { api, cache, filter, pubsub, blocking_task_pool }; diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 94aed3e807527..4a673c7cfafe1 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -1,18 +1,21 @@ use super::cache::EthStateCache; use crate::{ - eth::{ - error::EthApiError, - logs_utils::{self, append_matching_block_logs}, - }, + eth::{error::EthApiError, logs_utils, pubsub::EthPubSub}, result::{rpc_error_with_code, ToRpcResult}, EthSubscriptionIdProvider, }; use core::fmt; +use futures::{executor, StreamExt}; +use reth_network_api::NetworkInfo; +use tokio_stream::{wrappers::BroadcastStream, Stream}; +use alloy_primitives::B256; use async_trait::async_trait; use jsonrpsee::{core::RpcResult, server::IdProvider}; use reth_primitives::{IntoRecoveredTransaction, TxHash}; -use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError}; +use reth_provider::{ + BlockIdReader, BlockReader, CanonStateSubscriptions, EvmEnvProvider, ProviderError, +}; use reth_rpc_api::EthFilterApiServer; use reth_rpc_types::{ BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log, @@ -44,7 +47,7 @@ pub struct EthFilter { impl EthFilter where - Provider: Send + Sync + 'static, + Provider: BlockReader + EvmEnvProvider + Send + Sync + 'static, Pool: Send + Sync + 'static, { /// Creates a new, shareable instance. @@ -55,13 +58,18 @@ where /// See also [EthFilterConfig]. /// /// This also spawns a task that periodically clears stale filters. - pub fn new( + pub fn new( provider: Provider, pool: Pool, eth_cache: EthStateCache, config: EthFilterConfig, task_spawner: Box, - ) -> Self { + pubsub: EthPubSub, + ) -> Self + where + Events: CanonStateSubscriptions + 'static, + Network: NetworkInfo + 'static, + { let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } = config; let inner = EthFilterInner { @@ -88,6 +96,14 @@ where }), ); + let also_this = eth_filter.clone(); + eth_filter.inner.task_spawner.clone().spawn_critical( + "eth-filters_update_reorged_logs", + Box::pin(async move { + also_this.update_reorged_logs(pubsub).await; + }), + ); + eth_filter } @@ -120,6 +136,83 @@ where is_valid }) } + + /// Endless future that updates the reorged logs for impacted filters + async fn update_reorged_logs( + &self, + pubsub: EthPubSub, + ) where + Events: CanonStateSubscriptions + 'static, + Network: NetworkInfo + 'static, + { + let mut stream = self.reorged_logs_stream(pubsub).await; + while let Some((id, mut logs)) = stream.next().await { + let mut filters = executor::block_on(self.active_filters().inner.lock()); + let active_filter = + filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id)).unwrap(); + let mut guard = active_filter.reorged_logs.lock().await; + if let Some(reorged_logs_vec) = guard.as_mut() { + reorged_logs_vec.append(&mut logs); + } + } + } + + /// Reacts to reorged blocks, checks impacted log filters, stores reorged logs in the log filter + pub async fn reorged_logs_stream( + &self, + pubsub: EthPubSub, + ) -> impl Stream)> + '_ + where + Events: CanonStateSubscriptions + 'static, + Network: NetworkInfo + 'static, + { + let mut temp_reorged_blocks = Vec::new(); + + BroadcastStream::new(pubsub.get_chain_events().subscribe_to_canonical_state()) + .map(move |canon_state| { + canon_state.expect("new block subscription never ends; qed").block_receipts() + }) + .flat_map(futures::stream::iter) + .flat_map(move |(block_receipts, removed)| { + let mut reorged_logs: Vec<(FilterId, Vec)> = Vec::new(); + + if removed { + temp_reorged_blocks.push(block_receipts); + + let filters = executor::block_on(self.active_filters().inner.lock()); + + filters.iter().for_each(|(id, active_filter)| { + if let FilterKind::Log(ref filter) = active_filter.kind { + let mut reverted_logs: Vec = Vec::new(); + let filtered_params = FilteredParams::new(Some(*filter.clone())); + + for reorged_block in &mut temp_reorged_blocks { + let mut matching_logs = + logs_utils::matching_block_logs_with_tx_hashes( + &filtered_params, + reorged_block.block, + reorged_block + .tx_receipts + .clone() + .iter() + .map(|(tx, receipt)| (*tx, receipt)), + true, + ); + + reverted_logs.append(&mut matching_logs); + if Some(reorged_block.block.hash) == active_filter.block_hash { + reorged_logs.push((id.clone(), reverted_logs.clone())); + } + } + } + }); + } else { + temp_reorged_blocks.clear(); + } + + futures::stream::iter(reorged_logs) + }) + } } impl EthFilter @@ -390,14 +483,19 @@ where /// Installs a new filter and returns the new identifier. async fn install_filter(&self, kind: FilterKind) -> RpcResult { let last_poll_block_number = self.provider.best_block_number().to_rpc_result()?; + let last_poll_block_hash = + self.provider.block_hash(last_poll_block_number).to_rpc_result()?; let id = FilterId::from(self.id_provider.next_id()); let mut filters = self.active_filters.inner.lock().await; + let reorged_logs: Arc>>> = Arc::new(Mutex::new(Some(Vec::new()))); filters.insert( id.clone(), ActiveFilter { block: last_poll_block_number, + block_hash: last_poll_block_hash, last_poll_timestamp: Instant::now(), kind, + reorged_logs, }, ); Ok(id) @@ -450,7 +548,7 @@ where }; if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? { - append_matching_block_logs( + logs_utils::append_matching_block_logs( &mut all_logs, &self.provider, &filter_params, @@ -537,10 +635,14 @@ pub struct ActiveFilters { struct ActiveFilter { /// At which block the filter was polled last. block: u64, + /// Hash of the block at which the filter was polled last. + block_hash: Option, /// Last time this filter was polled. last_poll_timestamp: Instant, /// What kind of filter it is. kind: FilterKind, + /// Reorged logs + reorged_logs: Arc>>>, } /// A receiver for pending transactions that returns all new transactions since the last poll. diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index bf114688b39a9..b50c0b7edf66e 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -1,5 +1,4 @@ //! `eth_` PubSub RPC handler implementation - use crate::{eth::logs_utils, result::invalid_params_rpc_err}; use futures::StreamExt; use jsonrpsee::{server::SubscriptionMessage, PendingSubscriptionSink, SubscriptionSink}; @@ -22,7 +21,6 @@ use tokio_stream::{ wrappers::{BroadcastStream, ReceiverStream}, Stream, }; - /// `Eth` pubsub RPC implementation. /// /// This handles `eth_subscribe` RPC calls. @@ -33,9 +31,7 @@ pub struct EthPubSub { /// The type that's used to spawn subscription tasks. subscription_task_spawner: Box, } - // === impl EthPubSub === - impl EthPubSub { /// Creates a new, shareable instance. /// @@ -49,7 +45,6 @@ impl EthPubSub Box::::default(), ) } - /// Creates a new, shareable instance. pub fn with_spawner( provider: Provider, @@ -61,6 +56,11 @@ impl EthPubSub let inner = EthPubSubInner { provider, pool, chain_events, network }; Self { inner: Arc::new(inner), subscription_task_spawner } } + + /// Method to get chain events for canonical state subscription + pub fn get_chain_events(&self) -> &Events { + &self.inner.chain_events + } } #[async_trait::async_trait] @@ -84,11 +84,9 @@ where self.subscription_task_spawner.spawn(Box::pin(async move { let _ = handle_accepted(pubsub, sink, kind, params).await; })); - Ok(()) } } - /// The actual handler for an accepted [`EthPubSub::subscribe`] call. async fn handle_accepted( pubsub: Arc>, @@ -134,7 +132,7 @@ where ), )) }); - return pipe_from_stream(accepted_sink, stream).await + return pipe_from_stream(accepted_sink, stream).await; } Params::Bool(false) | Params::None => { // only hashes requested @@ -147,7 +145,6 @@ where } } } - let stream = pubsub .pending_transaction_hashes_stream() .map(EthSubscriptionResult::TransactionHash); @@ -160,34 +157,29 @@ where // get current sync status let mut initial_sync_status = pubsub.network.is_syncing(); let current_sub_res = pubsub.sync_status(initial_sync_status).await; - // send the current status immediately let msg = SubscriptionMessage::from_json(¤t_sub_res)?; if accepted_sink.send(msg).await.is_err() { - return Ok(()) + return Ok(()); } - while (canon_state.next().await).is_some() { let current_syncing = pubsub.network.is_syncing(); // Only send a new response if the sync status has changed if current_syncing != initial_sync_status { // Update the sync status on each new block initial_sync_status = current_syncing; - // send a new message now that the status changed let sync_status = pubsub.sync_status(current_syncing).await; let msg = SubscriptionMessage::from_json(&sync_status)?; if accepted_sink.send(msg).await.is_err() { - break + break; } } } - Ok(()) } } } - /// Pipes all stream items to the subscription sink. async fn pipe_from_stream( sink: SubscriptionSink, @@ -219,7 +211,6 @@ where } } } - impl std::fmt::Debug for EthPubSub { @@ -227,7 +218,6 @@ impl std::fmt::Debug f.debug_struct("EthPubSub").finish_non_exhaustive() } } - /// Container type `EthPubSub` #[derive(Clone)] struct EthPubSubInner { @@ -240,9 +230,7 @@ struct EthPubSubInner { /// The network. network: Network, } - // == impl EthPubSubInner === - impl EthPubSubInner where Provider: BlockReader + 'static, @@ -263,7 +251,6 @@ where } } } - impl EthPubSubInner where Pool: TransactionPool + 'static, @@ -272,7 +259,6 @@ where fn pending_transaction_hashes_stream(&self) -> impl Stream { ReceiverStream::new(self.pool.pending_transactions_listener()) } - /// Returns a stream that yields all transactions emitted by the txpool. fn full_pending_transaction_stream( &self, @@ -280,7 +266,6 @@ where self.pool.new_pending_pool_transactions_listener() } } - impl EthPubSubInner where Provider: BlockReader + EvmEnvProvider + 'static, @@ -300,7 +285,6 @@ where ) }) } - /// Returns a stream that yields all logs that match the given filter. fn log_stream(&self, filter: FilteredParams) -> impl Stream { BroadcastStream::new(self.chain_events.subscribe_to_canonical_state())