Skip to content

Commit

Permalink
Added improved implementation for reorged logs subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
root authored and root committed Feb 8, 2024
1 parent 1a8440a commit 9cb3ab5
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 26 deletions.
5 changes: 4 additions & 1 deletion crates/node-core/src/args/rpc_server_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ impl RpcServerArgs {

/// Create Engine API server.
#[allow(clippy::too_many_arguments)]
pub async fn start_auth_server<Provider, Pool, Network, Tasks, EngineT, EvmConfig>(
pub async fn start_auth_server<Provider, Pool, Network, Tasks, EngineT, EvmConfig, Events>(
&self,
provider: Provider,
pool: Pool,
Expand All @@ -394,6 +394,7 @@ impl RpcServerArgs {
engine_api: EngineApi<Provider, EngineT>,
jwt_secret: JwtSecret,
evm_config: EvmConfig,
events: Events,
) -> Result<AuthServerHandle, RpcError>
where
Provider: BlockReaderIdExt
Expand All @@ -409,6 +410,7 @@ impl RpcServerArgs {
Tasks: TaskSpawner + Clone + 'static,
EngineT: EngineTypes + 'static,
EvmConfig: EvmEnvConfig + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
{
let socket_address = SocketAddr::new(self.auth_addr, self.auth_port);

Expand All @@ -421,6 +423,7 @@ impl RpcServerArgs {
socket_address,
jwt_secret,
evm_config,
events,
)
.await
}
Expand Down
30 changes: 23 additions & 7 deletions crates/rpc/rpc-builder/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use jsonrpsee::{
use reth_network_api::{NetworkInfo, Peers};
use reth_node_api::{EngineTypes, EvmEnvConfig};
use reth_provider::{
BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, HeaderProvider, ReceiptProviderIdExt,
StateProviderFactory,
BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, EvmEnvProvider, HeaderProvider,
ReceiptProviderIdExt, StateProviderFactory,
};
use reth_rpc::{
eth::{
Expand All @@ -35,7 +35,7 @@ use std::{

/// Configure and launch a _standalone_ auth server with `engine` and a _new_ `eth` namespace.
#[allow(clippy::too_many_arguments)]
pub async fn launch<Provider, Pool, Network, Tasks, EngineApi, EngineT, EvmConfig>(
pub async fn launch<Provider, Pool, Network, Tasks, EngineApi, EngineT, EvmConfig, Events>(
provider: Provider,
pool: Pool,
network: Network,
Expand All @@ -44,6 +44,7 @@ pub async fn launch<Provider, Pool, Network, Tasks, EngineApi, EngineT, EvmConfi
socket_addr: SocketAddr,
secret: JwtSecret,
evm_config: EvmConfig,
events: Events,
) -> Result<AuthServerHandle, RpcError>
where
Provider: BlockReaderIdExt
Expand All @@ -61,6 +62,7 @@ where
EngineT: EngineTypes + 'static,
EngineApi: EngineApiServer<EngineT>,
EvmConfig: EvmEnvConfig + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
{
// spawn a new cache task
let eth_cache = EthStateCache::spawn_with(
Expand All @@ -77,7 +79,7 @@ where
let eth_api = EthApi::with_spawner(
provider.clone(),
pool.clone(),
network,
network.clone(),
eth_cache.clone(),
gas_oracle,
EthConfig::default().rpc_gas_cap,
Expand All @@ -89,9 +91,23 @@ where
let config = EthFilterConfig::default()
.max_logs_per_response(DEFAULT_MAX_LOGS_PER_RESPONSE)
.max_blocks_per_filter(DEFAULT_MAX_BLOCKS_PER_FILTER);
let eth_filter =
EthFilter::new(provider, pool, eth_cache.clone(), config, Box::new(executor.clone()));
launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await

let eth_filter = EthFilter::new::<Events, Network>(
provider.clone(),
pool.clone(),
eth_cache.clone(),
config,
Box::new(executor.clone()),
events.clone(),
);
launch_with_eth_api::<Provider, Pool, Network, EngineApi, EngineT, EvmConfig>(
eth_api,
eth_filter,
engine_api,
socket_addr,
secret,
)
.await
}

/// Configure and launch a _standalone_ auth server with existing EthApi implementation.
Expand Down
13 changes: 7 additions & 6 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1322,20 +1322,21 @@ where
fee_history_cache,
self.evm_config.clone(),
);
let filter = EthFilter::new(
let pubsub = EthPubSub::with_spawner(
self.provider.clone(),
self.pool.clone(),
cache.clone(),
self.config.eth.filter_config(),
self.events.clone(),
self.network.clone(),
executor.clone(),
);

let pubsub = EthPubSub::with_spawner(
let filter = EthFilter::new::<Events, Network>(
self.provider.clone(),
self.pool.clone(),
cache.clone(),
self.config.eth.filter_config(),
executor.clone(),
self.events.clone(),
self.network.clone(),
executor,
);

let eth = EthHandlers { api, cache, filter, pubsub, blocking_task_pool };
Expand Down
131 changes: 122 additions & 9 deletions crates/rpc/rpc/src/eth/filter.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
use super::cache::EthStateCache;
use crate::{
eth::{
error::EthApiError,
logs_utils::{self, append_matching_block_logs},
},
eth::{error::EthApiError, logs_utils},
result::{rpc_error_with_code, ToRpcResult},
EthSubscriptionIdProvider,
};
use core::fmt;
use futures::StreamExt;
use reth_network_api::NetworkInfo;
use tokio_stream::{wrappers::BroadcastStream, Stream};

use alloy_primitives::B256;
use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_primitives::{IntoRecoveredTransaction, TxHash};
use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError};
use reth_provider::{
chain::BlockReceipts, BlockIdReader, BlockReader, CanonStateSubscriptions, EvmEnvProvider,
ProviderError,
};
use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{
BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
Expand Down Expand Up @@ -44,7 +48,7 @@ pub struct EthFilter<Provider, Pool> {

impl<Provider, Pool> EthFilter<Provider, Pool>
where
Provider: Send + Sync + 'static,
Provider: BlockReader + EvmEnvProvider + Send + Sync + 'static,
Pool: Send + Sync + 'static,
{
/// Creates a new, shareable instance.
Expand All @@ -55,13 +59,18 @@ where
/// See also [EthFilterConfig].
///
/// This also spawns a task that periodically clears stale filters.
pub fn new(
pub fn new<Events, Network>(
provider: Provider,
pool: Pool,
eth_cache: EthStateCache,
config: EthFilterConfig,
task_spawner: Box<dyn TaskSpawner>,
) -> Self {
events: Events,
) -> Self
where
Events: CanonStateSubscriptions + 'static,
Network: NetworkInfo + 'static,
{
let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
config;
let inner = EthFilterInner {
Expand All @@ -88,6 +97,14 @@ where
}),
);

let also_this = eth_filter.clone();
eth_filter.inner.task_spawner.clone().spawn_critical(
"eth-filters_update_reorged_logs",
Box::pin(async move {
also_this.update_reorged_logs::<Events, Network>(events).await;
}),
);

eth_filter
}

Expand Down Expand Up @@ -120,6 +137,93 @@ where
is_valid
})
}

/// Endless future that updates the reorged logs for impacted filters
async fn update_reorged_logs<Events, Network>(&self, events: Events)
where
Events: CanonStateSubscriptions + 'static,
Network: NetworkInfo + 'static,
{
let mut stream = self.block_receipts_stream::<Events, Network>(events).await;

let mut temp_reorged_blocks = Vec::new();
let mut reverted_logs: Vec<Log> = Vec::new();
let mut checkpoint_block: u64 = 1;

while let Some((block_receipts, removed)) = stream.next().await {
let mut filters = self.active_filters().inner.lock().await;
let mut reorged_logs: Vec<(FilterId, Vec<Log>)> = Vec::new();

if removed {
let block_number = block_receipts.block.number;
temp_reorged_blocks.push(block_receipts);

for reorged_block in &mut temp_reorged_blocks {
if checkpoint_block < reorged_block.block.number {
filters.iter().for_each(|(id, active_filter)| {
if block_number <= active_filter.block {
if let FilterKind::Log(ref filter) = active_filter.kind {
let filtered_params =
FilteredParams::new(Some(*filter.clone()));

let mut matching_logs =
logs_utils::matching_block_logs_with_tx_hashes(
&filtered_params,
reorged_block.block,
reorged_block
.tx_receipts
.clone()
.iter()
.map(|(tx, receipt)| (*tx, receipt)),
true,
);

reverted_logs.append(&mut matching_logs);
if Some(reorged_block.block.hash) == active_filter.block_hash {
reorged_logs.push((id.clone(), reverted_logs.clone()));
}
}
}
});
}
}
checkpoint_block = block_number;
} else {
temp_reorged_blocks.clear();
reverted_logs.clear();
checkpoint_block = 1;
}

if let Some((id, mut logs)) = reorged_logs.pop() {
let active_filter =
filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id)).unwrap();
let guard = &mut active_filter.reorged_logs;
if let Some(reorged_logs_vec) = guard.as_mut() {
reorged_logs_vec.append(&mut logs);
}
}
}
}

/// Generates a stream of block receipts
pub async fn block_receipts_stream<Events, Network>(
&self,
events: Events,
) -> impl Stream<Item = (BlockReceipts, bool)> + '_
where
Events: CanonStateSubscriptions + 'static,
Network: NetworkInfo + 'static,
{
BroadcastStream::new(events.subscribe_to_canonical_state())
.map(move |canon_state| {
canon_state.expect("new block subscription never ends; qed").block_receipts()
})
.flat_map(futures::stream::iter)
.flat_map(move |(block_receipts, removed)| {
let block_stream: Vec<(BlockReceipts, bool)> = vec![(block_receipts, removed)];
futures::stream::iter(block_stream)
})
}
}

impl<Provider, Pool> EthFilter<Provider, Pool>
Expand Down Expand Up @@ -390,14 +494,19 @@ where
/// Installs a new filter and returns the new identifier.
async fn install_filter(&self, kind: FilterKind) -> RpcResult<FilterId> {
let last_poll_block_number = self.provider.best_block_number().to_rpc_result()?;
let last_poll_block_hash =
self.provider.block_hash(last_poll_block_number).to_rpc_result()?;
let id = FilterId::from(self.id_provider.next_id());
let mut filters = self.active_filters.inner.lock().await;
let reorged_logs: Option<Vec<Log>> = Some(Vec::new());
filters.insert(
id.clone(),
ActiveFilter {
block: last_poll_block_number,
block_hash: last_poll_block_hash,
last_poll_timestamp: Instant::now(),
kind,
reorged_logs,
},
);
Ok(id)
Expand Down Expand Up @@ -450,7 +559,7 @@ where
};

if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? {
append_matching_block_logs(
logs_utils::append_matching_block_logs(
&mut all_logs,
&self.provider,
&filter_params,
Expand Down Expand Up @@ -537,10 +646,14 @@ pub struct ActiveFilters {
struct ActiveFilter {
/// At which block the filter was polled last.
block: u64,
/// Hash of the block at which the filter was polled last.
block_hash: Option<B256>,
/// Last time this filter was polled.
last_poll_timestamp: Instant,
/// What kind of filter it is.
kind: FilterKind,
/// Reorged logs
reorged_logs: Option<Vec<Log>>,
}

/// A receiver for pending transactions that returns all new transactions since the last poll.
Expand Down
11 changes: 8 additions & 3 deletions crates/rpc/rpc/src/eth/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ impl<Provider, Pool, Events, Network> EthPubSub<Provider, Pool, Events, Network>
let inner = EthPubSubInner { provider, pool, chain_events, network };
Self { inner: Arc::new(inner), subscription_task_spawner }
}

/// Method to get chain events for canonical state subscription
pub fn get_chain_events(&self) -> &Events {
&self.inner.chain_events
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -134,7 +139,7 @@ where
),
))
});
return pipe_from_stream(accepted_sink, stream).await
return pipe_from_stream(accepted_sink, stream).await;
}
Params::Bool(false) | Params::None => {
// only hashes requested
Expand Down Expand Up @@ -164,7 +169,7 @@ where
// send the current status immediately
let msg = SubscriptionMessage::from_json(&current_sub_res)?;
if accepted_sink.send(msg).await.is_err() {
return Ok(())
return Ok(());
}

while (canon_state.next().await).is_some() {
Expand All @@ -178,7 +183,7 @@ where
let sync_status = pubsub.sync_status(current_syncing).await;
let msg = SubscriptionMessage::from_json(&sync_status)?;
if accepted_sink.send(msg).await.is_err() {
break
break;
}
}
}
Expand Down

0 comments on commit 9cb3ab5

Please sign in to comment.