diff --git a/Cargo.lock b/Cargo.lock index 2721c182415d..b3fc50c262f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,21 +195,20 @@ dependencies = [ [[package]] name = "alloy-rlp" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc0fac0fc16baf1f63f78b47c3d24718f3619b0714076f6a02957d808d52cbef" +checksum = "8d58d9f5da7b40e9bfff0b7e7816700be4019db97d4b6359fe7f94a9e22e42ac" dependencies = [ "alloy-rlp-derive", "arrayvec", "bytes", - "smol_str", ] [[package]] name = "alloy-rlp-derive" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0391754c09fab4eae3404d19d0d297aa1c670c1775ab51d8a5312afeca23157" +checksum = "1a047897373be4bbb0224c1afdabca92648dc57a9c9ef6e7b0be3aff7a859c83" dependencies = [ "proc-macro2", "quote", @@ -326,9 +325,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.75" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +checksum = "59d2a3357dde987206219e78ecfbbb6e8dad06cbb65292758d3270e6254f7355" [[package]] name = "aquamarine" @@ -563,9 +562,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.74" +version = "0.1.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +checksum = "fdf6721fb0140e4f897002dd086c06f6c27775df19cfe1fccb21181a48fd2c98" dependencies = [ "proc-macro2", "quote", @@ -3282,9 +3281,9 @@ dependencies = [ [[package]] name = "hyper" -version = "0.14.27" +version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes", "futures-channel", @@ -3297,7 +3296,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.5", "tokio", "tower-service", "tracing", @@ -3351,7 +3350,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.51.1", ] [[package]] @@ -4123,9 +4122,9 @@ checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8" [[package]] name = "mach2" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +checksum = "19b955cdeb2a02b9117f121ce63aa52d08ade45de53e48fe6a38b39c10f6f709" dependencies = [ "libc", ] @@ -4242,9 +4241,9 @@ dependencies = [ [[package]] name = "metrics-process" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2674a02f6ad51326c2106d9aa5a07d1f759695b655c06df0bba5d5fb338ac0a4" +checksum = "6aa2a67e2580fbeba4d5a96e659945981e700a383b4cea1432e0cfc18f58c5da" dependencies = [ "libproc", "mach2", @@ -4954,9 +4953,9 @@ dependencies = [ [[package]] name = "pkg-config" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" [[package]] name = "plain_hasher" @@ -5181,9 +5180,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.70" +version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b" +checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8" dependencies = [ "unicode-ident", ] @@ -5516,9 +5515,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.22" +version = "0.11.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" dependencies = [ "base64 0.21.5", "bytes", @@ -6504,6 +6503,8 @@ dependencies = [ "serde_with", "similar-asserts", "thiserror", + "tokio", + "tokio-util", "url", ] @@ -7242,9 +7243,9 @@ dependencies = [ [[package]] name = "serde_spanned" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12022b835073e5b11e90a14f86838ceb1c8fb0325b72416845c487ac0fa95e80" +checksum = "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1" dependencies = [ "serde", ] @@ -7498,15 +7499,6 @@ dependencies = [ "serde", ] -[[package]] -name = "smol_str" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74212e6bbe9a4352329b2f68ba3130c15a3f26fe88ff22dbdc6cdd58fa85e99c" -dependencies = [ - "serde", -] - [[package]] name = "snap" version = "1.1.1" @@ -7735,7 +7727,6 @@ checksum = "285ba80e733fac80aa4270fbcdf83772a79b80aa35c97075320abfee4a915b06" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", "unicode-xid", ] @@ -7893,9 +7884,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" +checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" dependencies = [ "deranged", "itoa", @@ -7915,9 +7906,9 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" +checksum = "26197e33420244aeb70c3e8c78376ca46571bc4e701e4791c2cd9f57dcb3a43f" dependencies = [ "time-core", ] @@ -7969,9 +7960,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.0" +version = "1.35.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "841d45b238a16291a4e1584e61820b8ae57d696cc5015c459c229ccc6990cc1c" +checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" dependencies = [ "backtrace", "bytes", @@ -8382,9 +8373,9 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "trybuild" -version = "1.0.85" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "196a58260a906cedb9bf6d8034b6379d0c11f552416960452f267402ceeddff1" +checksum = "8419ecd263363827c5730386f418715766f584e2f874d32c23c5b00bd9727e7e" dependencies = [ "basic-toml", "glob", @@ -8804,12 +8795,12 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows" -version = "0.51.1" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca229916c5ee38c2f2bc1e9d8f04df975b4bd93f9955dc69fabb5d91270045c9" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ - "windows-core", - "windows-targets 0.48.5", + "windows-core 0.52.0", + "windows-targets 0.52.0", ] [[package]] @@ -8821,6 +8812,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.45.0" diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 54f0f040fda0..93a0a0cb5a57 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -283,7 +283,8 @@ impl RpcServerArgs { } /// Create Engine API server. - pub async fn start_auth_server( + #[allow(clippy::too_many_arguments)] + pub async fn start_auth_server( &self, provider: Provider, pool: Pool, @@ -291,6 +292,7 @@ impl RpcServerArgs { executor: Tasks, engine_api: EngineApi, jwt_secret: JwtSecret, + events: Events, ) -> Result where Provider: BlockReaderIdExt @@ -304,6 +306,7 @@ impl RpcServerArgs { Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, + Events: CanonStateSubscriptions + Clone + 'static, { let socket_address = SocketAddr::new(self.auth_addr, self.auth_port); @@ -315,6 +318,7 @@ impl RpcServerArgs { engine_api, socket_address, jwt_secret, + events, ) .await } diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 7f1158e1cf5c..112143c5aade 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -12,15 +12,15 @@ use jsonrpsee::{ }; use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{ - BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, HeaderProvider, ReceiptProviderIdExt, - StateProviderFactory, + BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, EvmEnvProvider, HeaderProvider, + ReceiptProviderIdExt, StateProviderFactory, }; use reth_rpc::{ eth::{ cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig, FeeHistoryCache, FeeHistoryCacheConfig, }, - AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter, + AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, }; use reth_rpc_api::{servers::*, EngineApiServer}; @@ -33,7 +33,7 @@ use std::{ /// Configure and launch a _standalone_ auth server with `engine` and a _new_ `eth` namespace. #[allow(clippy::too_many_arguments)] -pub async fn launch( +pub async fn launch( provider: Provider, pool: Pool, network: Network, @@ -41,6 +41,7 @@ pub async fn launch( engine_api: EngineApi, socket_addr: SocketAddr, secret: JwtSecret, + events: Events, ) -> Result where Provider: BlockReaderIdExt @@ -56,6 +57,7 @@ where Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, EngineApi: EngineApiServer, + Events: CanonStateSubscriptions + Clone + 'static, { // spawn a new cache task let eth_cache = @@ -68,7 +70,7 @@ where let eth_api = EthApi::with_spawner( provider.clone(), pool.clone(), - network, + network.clone(), eth_cache.clone(), gas_oracle, EthConfig::default().rpc_gas_cap, @@ -79,15 +81,28 @@ where let config = EthFilterConfig::default() .max_logs_per_response(DEFAULT_MAX_LOGS_PER_RESPONSE) .max_blocks_per_filter(DEFAULT_MAX_BLOCKS_PER_FILTER); - let eth_filter = - EthFilter::new(provider, pool, eth_cache.clone(), config, Box::new(executor.clone())); + let eth_pubsub = EthPubSub::with_spawner( + provider.clone(), + pool.clone(), + events.clone(), + network.clone(), + Box::new(executor.clone()), + ); + let eth_filter = EthFilter::new( + provider.clone(), + pool.clone(), + eth_cache.clone(), + config, + Box::new(executor.clone()), + eth_pubsub.clone(), + ); launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await } /// Configure and launch a _standalone_ auth server with existing EthApi implementation. -pub async fn launch_with_eth_api( +pub async fn launch_with_eth_api( eth_api: EthApi, - eth_filter: EthFilter, + eth_filter: EthFilter, engine_api: EngineApi, socket_addr: SocketAddr, secret: JwtSecret, @@ -104,6 +119,7 @@ where Pool: TransactionPool + Clone + 'static, Network: NetworkInfo + Peers + Clone + 'static, EngineApi: EngineApiServer, + Events: CanonStateSubscriptions + Clone + 'static, { // Configure the module and start the server. let mut module = RpcModule::new(()); diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index 8da3405368fd..1e5b5fb3c627 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -19,7 +19,7 @@ pub struct EthHandlers { /// The async caching layer used by the eth handlers pub cache: EthStateCache, /// Polling based filter handler available on all transports - pub filter: EthFilter, + pub filter: EthFilter, /// Handler for subscriptions only available for transports that support it (ws, ipc) pub pubsub: EthPubSub, /// The configured tracing call pool diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index b2c7ff3af966..33404ee44a35 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1210,20 +1210,20 @@ where blocking_task_pool.clone(), fee_history_cache, ); - let filter = EthFilter::new( + let pubsub = EthPubSub::with_spawner( self.provider.clone(), self.pool.clone(), - cache.clone(), - self.config.eth.filter_config(), + self.events.clone(), + self.network.clone(), executor.clone(), ); - - let pubsub = EthPubSub::with_spawner( + let filter = EthFilter::new( self.provider.clone(), self.pool.clone(), - self.events.clone(), - self.network.clone(), - executor, + cache.clone(), + self.config.eth.filter_config(), + executor.clone(), + pubsub.clone(), ); let eth = EthHandlers { api, cache, filter, pubsub, blocking_task_pool }; diff --git a/crates/rpc/rpc-types/Cargo.toml b/crates/rpc/rpc-types/Cargo.toml index 7074686e39ec..cc654b7deabd 100644 --- a/crates/rpc/rpc-types/Cargo.toml +++ b/crates/rpc/rpc-types/Cargo.toml @@ -15,6 +15,10 @@ alloy-primitives = { workspace = true, features = ["rand", "rlp", "serde"] } ethereum_ssz_derive = { version = "0.5", optional = true } ethereum_ssz = { version = "0.5", optional = true } +#async +tokio = { workspace = true, features = ["sync"] } +tokio-util = "0.7" + # misc thiserror.workspace = true itertools.workspace = true diff --git a/crates/rpc/rpc-types/src/eth/filter.rs b/crates/rpc/rpc-types/src/eth/filter.rs index b6c65bd19139..5297ddc21cb8 100644 --- a/crates/rpc/rpc-types/src/eth/filter.rs +++ b/crates/rpc/rpc-types/src/eth/filter.rs @@ -10,7 +10,9 @@ use std::{ collections::HashSet, hash::Hash, ops::{Range, RangeFrom, RangeTo}, + sync::Arc, }; +use tokio::sync::Mutex; /// Helper type to represent a bloom filter used for matching logs. #[derive(Default, Debug)] @@ -245,7 +247,7 @@ impl FilterBlockOption { } /// Filter for -#[derive(Default, Debug, PartialEq, Eq, Clone)] +#[derive(Default, Debug, Clone)] pub struct Filter { /// Filter block options, specifying on which blocks the filter should /// match. @@ -253,10 +255,21 @@ pub struct Filter { pub block_option: FilterBlockOption, /// Address pub address: FilterSet
, - /// Topics (maxmimum of 4) + /// Topics (maximum of 4) pub topics: [Topic; 4], + /// Reorged logs + pub reorged_logs: Arc>>>, } +impl PartialEq for Filter { + fn eq(&self, other: &Self) -> bool { + (&self.block_option, &self.address, &self.topics) == + (&other.block_option, &other.address, &other.topics) + } +} + +impl Eq for Filter {} + impl Filter { /// Creates a new, empty filter pub fn new() -> Self { @@ -619,7 +632,10 @@ impl<'de> Deserialize<'de> for Filter { FilterBlockOption::Range { from_block, to_block } }; - Ok(Filter { block_option, address, topics }) + let reorged_logs: Arc>>> = + Arc::new(Mutex::new(Some(Vec::new()))); + + Ok(Filter { block_option, address, topics, reorged_logs }) } } @@ -1123,6 +1139,7 @@ mod tests { Default::default(), Default::default(), ], + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), } } @@ -1162,6 +1179,7 @@ mod tests { block_option: Default::default(), address: Default::default(), topics: Default::default(), + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), }; let topics = filter.topics; @@ -1188,6 +1206,7 @@ mod tests { Default::default(), Default::default(), ], + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), }; let topics = filter.topics; @@ -1219,6 +1238,7 @@ mod tests { Default::default(), Default::default(), ], + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), }; let topics = filter.topics; @@ -1240,6 +1260,7 @@ mod tests { Default::default(), Default::default(), ], + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), }; let topics_input = filter.topics; @@ -1257,6 +1278,7 @@ mod tests { block_option: Default::default(), address: rng_address.into(), topics: Default::default(), + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), }; let address_bloom = FilteredParams::address_filter(&filter.address); assert!(FilteredParams::matches_address( @@ -1273,6 +1295,7 @@ mod tests { block_option: Default::default(), address: rng_address.into(), topics: Default::default(), + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))), }; let address_bloom = FilteredParams::address_filter(&filter.address); assert!(!FilteredParams::matches_address( @@ -1323,6 +1346,7 @@ mod tests { .into(), Default::default(), ], + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))) } ); } @@ -1348,6 +1372,7 @@ mod tests { }, address: Default::default(), topics: Default::default(), + reorged_logs: Arc::new(Mutex::new(Some(Vec::new()))) } ); } diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 94aed3e80752..c9056d7aec7d 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -1,18 +1,25 @@ use super::cache::EthStateCache; use crate::{ eth::{ - error::EthApiError, - logs_utils::{self, append_matching_block_logs}, + error::{EthApiError, EthResult}, + logs_utils, + pubsub::EthPubSub, }, result::{rpc_error_with_code, ToRpcResult}, EthSubscriptionIdProvider, }; use core::fmt; +use futures::{executor, StreamExt}; +use reth_network_api::NetworkInfo; +use tokio_stream::{wrappers::BroadcastStream, Stream}; +use alloy_primitives::B256; use async_trait::async_trait; use jsonrpsee::{core::RpcResult, server::IdProvider}; -use reth_primitives::{IntoRecoveredTransaction, TxHash}; -use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError}; +use reth_primitives::{BlockHashOrNumber, IntoRecoveredTransaction, Receipt, SealedBlock, TxHash}; +use reth_provider::{ + BlockIdReader, BlockReader, CanonStateSubscriptions, EvmEnvProvider, ProviderError, +}; use reth_rpc_api::EthFilterApiServer; use reth_rpc_types::{ BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log, @@ -23,6 +30,7 @@ use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, Transa use std::{ collections::HashMap, iter::StepBy, + marker::PhantomData, ops::RangeInclusive, sync::Arc, time::{Duration, Instant}, @@ -37,15 +45,19 @@ use tracing::trace; const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb /// `Eth` filter RPC implementation. -pub struct EthFilter { +pub struct EthFilter { /// All nested fields bundled together inner: Arc>, + phantom_events: PhantomData, + phantom_network: PhantomData, } -impl EthFilter +impl EthFilter where - Provider: Send + Sync + 'static, + Provider: BlockReader + EvmEnvProvider + Send + Sync + 'static, Pool: Send + Sync + 'static, + Events: CanonStateSubscriptions + 'static, + Network: NetworkInfo + 'static, { /// Creates a new, shareable instance. /// @@ -61,6 +73,7 @@ where eth_cache: EthStateCache, config: EthFilterConfig, task_spawner: Box, + pubsub: EthPubSub, ) -> Self { let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } = config; @@ -78,7 +91,11 @@ where max_logs_per_response: max_logs_per_response.unwrap_or(usize::MAX), }; - let eth_filter = Self { inner: Arc::new(inner) }; + let eth_filter = Self { + inner: Arc::new(inner), + phantom_events: PhantomData::, + phantom_network: PhantomData::, + }; let this = eth_filter.clone(); eth_filter.inner.task_spawner.clone().spawn_critical( @@ -88,6 +105,14 @@ where }), ); + let also_this = eth_filter.clone(); + eth_filter.inner.task_spawner.clone().spawn_critical( + "eth-filters_update_reorged_logs", + Box::pin(async move { + also_this.update_reorged_logs(pubsub).await; + }), + ); + eth_filter } @@ -120,9 +145,69 @@ where is_valid }) } + + async fn update_reorged_logs(&self, pubsub: EthPubSub) { + let mut stream = self.reorged_logs_stream(pubsub).await; + while let Some((id, logs)) = stream.next().await { + let mut filters = executor::block_on(self.active_filters().inner.lock()); + let active_filter = + filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id)).unwrap(); + if let FilterKind::Log(ref mut filter) = active_filter.kind { + filter.reorged_logs = Arc::new(Mutex::new(Some(logs))); + } + } + } + + /// Reacts to reorged blocks, checks impacted log filters, stores reorged logs in the log filter + pub async fn reorged_logs_stream( + &self, + pubsub: EthPubSub, + ) -> impl Stream)> + '_ { + let mut temp_reorged_blocks = Vec::new(); + + BroadcastStream::new(pubsub.get_chain_events().subscribe_to_canonical_state()) + .map(move |canon_state| { + canon_state.expect("new block subscription never ends; qed").block_receipts() + }) + .flat_map(futures::stream::iter) + .flat_map(move |(block_receipts, removed)| { + let mut reorged_logs: Vec<(FilterId, Vec)> = Vec::new(); + + if removed { + temp_reorged_blocks.push(block_receipts); + + let filters = executor::block_on(self.active_filters().inner.lock()); + + filters.iter().for_each(|(id, active_filter)| { + if let FilterKind::Log(ref filter) = active_filter.kind { + let mut reverted_logs: Vec = Vec::new(); + let filtered_params = FilteredParams::new(Some(*filter.clone())); + + for reorged_block in &mut temp_reorged_blocks { + let mut matching_logs = logs_utils::matching_block_logs( + &filtered_params, + reorged_block.block, + reorged_block.tx_receipts.clone(), + true, + ); + + reverted_logs.append(&mut matching_logs); + if Some(reorged_block.block.hash) == active_filter.block_hash { + reorged_logs.push((id.clone(), reverted_logs.clone())); + } + } + } + }); + } else { + temp_reorged_blocks.clear(); + } + + futures::stream::iter(reorged_logs) + }) + } } -impl EthFilter +impl EthFilter where Provider: BlockReader + BlockIdReader + EvmEnvProvider + 'static, Pool: TransactionPool + 'static, @@ -221,10 +306,13 @@ where } #[async_trait] -impl EthFilterApiServer for EthFilter +impl EthFilterApiServer + for EthFilter where Provider: BlockReader + BlockIdReader + EvmEnvProvider + 'static, Pool: TransactionPool + 'static, + Events: CanonStateSubscriptions + 'static, + Network: NetworkInfo + 'static, { /// Handler for `eth_newFilter` async fn new_filter(&self, filter: Filter) -> RpcResult { @@ -303,15 +391,21 @@ where } } -impl std::fmt::Debug for EthFilter { +impl std::fmt::Debug + for EthFilter +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("EthFilter").finish_non_exhaustive() } } -impl Clone for EthFilter { +impl Clone for EthFilter { fn clone(&self) -> Self { - Self { inner: Arc::clone(&self.inner) } + Self { + inner: Arc::clone(&self.inner), + phantom_events: PhantomData::, + phantom_network: PhantomData::, + } } } @@ -390,12 +484,15 @@ where /// Installs a new filter and returns the new identifier. async fn install_filter(&self, kind: FilterKind) -> RpcResult { let last_poll_block_number = self.provider.best_block_number().to_rpc_result()?; + let last_poll_block_hash = + self.provider.block_hash(last_poll_block_number).to_rpc_result()?; let id = FilterId::from(self.id_provider.next_id()); let mut filters = self.active_filters.inner.lock().await; filters.insert( id.clone(), ActiveFilter { block: last_poll_block_number, + block_hash: last_poll_block_hash, last_poll_timestamp: Instant::now(), kind, }, @@ -537,6 +634,8 @@ pub struct ActiveFilters { struct ActiveFilter { /// At which block the filter was polled last. block: u64, + /// Hash of the block at which the filter was polled last. + block_hash: Option, /// Last time this filter was polled. last_poll_timestamp: Instant, /// What kind of filter it is. diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index ded49d2a1bde..34fcfaab5855 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -60,6 +60,11 @@ impl EthPubSub let inner = EthPubSubInner { provider, pool, chain_events, network }; Self { inner: Arc::new(inner), subscription_task_spawner } } + + /// Method to get chain events for canonical state subscription + pub fn get_chain_events(&self) -> &Events { + &self.inner.chain_events + } } #[async_trait::async_trait]