Merge pull request #27 from Arindam2407/reorg_new
Tested changes
Arindam2407 authored Jan 15, 2024
2 parents 1cc3576 + 771ee59 commit a476bd1
Showing 5 changed files with 162 additions and 49 deletions.
6 changes: 5 additions & 1 deletion bin/reth/src/args/rpc_server_args.rs
@@ -331,14 +331,16 @@ impl RpcServerArgs {
}

/// Create Engine API server.
pub async fn start_auth_server<Provider, Pool, Network, Tasks, EngineT>(
#[allow(clippy::too_many_arguments)]
pub async fn start_auth_server<Provider, Pool, Network, Tasks, EngineT, Events>(
&self,
provider: Provider,
pool: Pool,
network: Network,
executor: Tasks,
engine_api: EngineApi<Provider, EngineT>,
jwt_secret: JwtSecret,
events: Events,
) -> Result<AuthServerHandle, RpcError>
where
Provider: BlockReaderIdExt
@@ -353,6 +355,7 @@ impl RpcServerArgs {
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
EngineT: EngineTypes + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
{
let socket_address = SocketAddr::new(self.auth_addr, self.auth_port);

@@ -364,6 +367,7 @@ impl RpcServerArgs {
engine_api,
socket_address,
jwt_secret,
events,
)
.await
}
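For callers of `start_auth_server`, the only change is the new trailing `events` argument, which can be any type implementing `CanonStateSubscriptions + Clone + 'static`. A minimal call-site sketch; the surrounding variables (provider, pool, network, executor, engine_api, jwt_secret, canon_state_events) are placeholders for the node's existing components, not APIs introduced by this commit:

let auth_handle = rpc_server_args
    .start_auth_server(
        provider.clone(),
        pool.clone(),
        network.clone(),
        executor.clone(),
        engine_api,
        jwt_secret,
        canon_state_events.clone(), // new: any CanonStateSubscriptions + Clone + 'static impl
    )
    .await?;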
38 changes: 30 additions & 8 deletions crates/rpc/rpc-builder/src/auth.rs
@@ -14,15 +14,15 @@ use jsonrpsee::{
use reth_network_api::{NetworkInfo, Peers};
use reth_node_api::EngineTypes;
use reth_provider::{
BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, HeaderProvider, ReceiptProviderIdExt,
StateProviderFactory,
BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, EvmEnvProvider, HeaderProvider,
ReceiptProviderIdExt, StateProviderFactory,
};
use reth_rpc::{
eth::{
cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig, FeeHistoryCache,
FeeHistoryCacheConfig,
},
AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter,
AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter, EthPubSub,
EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret,
};
use reth_rpc_api::{servers::*, EngineApiServer};
@@ -35,14 +35,15 @@ use std::{

/// Configure and launch a _standalone_ auth server with `engine` and a _new_ `eth` namespace.
#[allow(clippy::too_many_arguments)]
pub async fn launch<Provider, Pool, Network, Tasks, EngineApi, EngineT>(
pub async fn launch<Provider, Pool, Network, Tasks, EngineApi, EngineT, Events>(
provider: Provider,
pool: Pool,
network: Network,
executor: Tasks,
engine_api: EngineApi,
socket_addr: SocketAddr,
secret: JwtSecret,
events: Events,
) -> Result<AuthServerHandle, RpcError>
where
Provider: BlockReaderIdExt
@@ -59,6 +60,7 @@ where
Tasks: TaskSpawner + Clone + 'static,
EngineT: EngineTypes,
EngineApi: EngineApiServer<EngineT>,
Events: CanonStateSubscriptions + Clone + 'static,
{
// spawn a new cache task
let eth_cache =
@@ -71,7 +73,7 @@ where
let eth_api = EthApi::with_spawner(
provider.clone(),
pool.clone(),
network,
network.clone(),
eth_cache.clone(),
gas_oracle,
EthConfig::default().rpc_gas_cap,
@@ -82,9 +84,29 @@ where
let config = EthFilterConfig::default()
.max_logs_per_response(DEFAULT_MAX_LOGS_PER_RESPONSE)
.max_blocks_per_filter(DEFAULT_MAX_BLOCKS_PER_FILTER);
let eth_filter =
EthFilter::new(provider, pool, eth_cache.clone(), config, Box::new(executor.clone()));
launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await
let eth_pubsub = EthPubSub::with_spawner(
provider.clone(),
pool.clone(),
events.clone(),
network.clone(),
Box::new(executor.clone()),
);
let eth_filter = EthFilter::new(
provider.clone(),
pool.clone(),
eth_cache.clone(),
config,
Box::new(executor.clone()),
eth_pubsub.clone(),
);
launch_with_eth_api::<Provider, Pool, Network, EngineApi, EngineT>(
eth_api,
eth_filter,
engine_api,
socket_addr,
secret,
)
.await
}

/// Configure and launch a _standalone_ auth server with existing EthApi implementation.
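The standalone `launch` function in this file gains the same trailing `events` parameter, passed after the JWT secret. A hedged call-site sketch, with every variable standing in for an existing node component rather than anything defined in this diff:

let auth_handle = launch(
    provider,
    pool,
    network,
    executor,
    engine_api,
    socket_addr,
    jwt_secret,
    canon_state_events,
)
.await?;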
15 changes: 8 additions & 7 deletions crates/rpc/rpc-builder/src/lib.rs
@@ -1236,20 +1236,21 @@ where
blocking_task_pool.clone(),
fee_history_cache,
);
let filter = EthFilter::new(
let pubsub = EthPubSub::with_spawner(
self.provider.clone(),
self.pool.clone(),
cache.clone(),
self.config.eth.filter_config(),
self.events.clone(),
self.network.clone(),
executor.clone(),
);

let pubsub = EthPubSub::with_spawner(
let filter = EthFilter::new(
self.provider.clone(),
self.pool.clone(),
self.events.clone(),
self.network.clone(),
executor,
cache.clone(),
self.config.eth.filter_config(),
executor.clone(),
pubsub.clone(),
);

let eth = EthHandlers { api, cache, filter, pubsub, blocking_task_pool };
120 changes: 111 additions & 9 deletions crates/rpc/rpc/src/eth/filter.rs
@@ -1,18 +1,21 @@
use super::cache::EthStateCache;
use crate::{
eth::{
error::EthApiError,
logs_utils::{self, append_matching_block_logs},
},
eth::{error::EthApiError, logs_utils, pubsub::EthPubSub},
result::{rpc_error_with_code, ToRpcResult},
EthSubscriptionIdProvider,
};
use core::fmt;
use futures::{executor, StreamExt};
use reth_network_api::NetworkInfo;
use tokio_stream::{wrappers::BroadcastStream, Stream};

use alloy_primitives::B256;
use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_primitives::{IntoRecoveredTransaction, TxHash};
use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError};
use reth_provider::{
BlockIdReader, BlockReader, CanonStateSubscriptions, EvmEnvProvider, ProviderError,
};
use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{
BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
@@ -44,7 +47,7 @@ pub struct EthFilter<Provider, Pool> {

impl<Provider, Pool> EthFilter<Provider, Pool>
where
Provider: Send + Sync + 'static,
Provider: BlockReader + EvmEnvProvider + Send + Sync + 'static,
Pool: Send + Sync + 'static,
{
/// Creates a new, shareable instance.
@@ -55,13 +58,18 @@ where
/// See also [EthFilterConfig].
///
/// This also spawns a task that periodically clears stale filters.
pub fn new(
pub fn new<Events, Network>(
provider: Provider,
pool: Pool,
eth_cache: EthStateCache,
config: EthFilterConfig,
task_spawner: Box<dyn TaskSpawner>,
) -> Self {
pubsub: EthPubSub<Provider, Pool, Events, Network>,
) -> Self
where
Events: CanonStateSubscriptions + 'static,
Network: NetworkInfo + 'static,
{
let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
config;
let inner = EthFilterInner {
@@ -88,6 +96,14 @@ where
}),
);

let also_this = eth_filter.clone();
eth_filter.inner.task_spawner.clone().spawn_critical(
"eth-filters_update_reorged_logs",
Box::pin(async move {
also_this.update_reorged_logs(pubsub).await;
}),
);

eth_filter
}

@@ -120,6 +136,83 @@ where
is_valid
})
}

/// Endless future that updates the reorged logs for impacted filters
async fn update_reorged_logs<Events, Network>(
&self,
pubsub: EthPubSub<Provider, Pool, Events, Network>,
) where
Events: CanonStateSubscriptions + 'static,
Network: NetworkInfo + 'static,
{
let mut stream = self.reorged_logs_stream(pubsub).await;
while let Some((id, mut logs)) = stream.next().await {
let mut filters = executor::block_on(self.active_filters().inner.lock());
let active_filter =
filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id)).unwrap();
let mut guard = active_filter.reorged_logs.lock().await;
if let Some(reorged_logs_vec) = guard.as_mut() {
reorged_logs_vec.append(&mut logs);
}
}
}

/// Reacts to reorged blocks, checks impacted log filters, stores reorged logs in the log filter
pub async fn reorged_logs_stream<Events, Network>(
&self,
pubsub: EthPubSub<Provider, Pool, Events, Network>,
) -> impl Stream<Item = (FilterId, Vec<Log>)> + '_
where
Events: CanonStateSubscriptions + 'static,
Network: NetworkInfo + 'static,
{
let mut temp_reorged_blocks = Vec::new();

BroadcastStream::new(pubsub.get_chain_events().subscribe_to_canonical_state())
.map(move |canon_state| {
canon_state.expect("new block subscription never ends; qed").block_receipts()
})
.flat_map(futures::stream::iter)
.flat_map(move |(block_receipts, removed)| {
let mut reorged_logs: Vec<(FilterId, Vec<Log>)> = Vec::new();

if removed {
temp_reorged_blocks.push(block_receipts);

let filters = executor::block_on(self.active_filters().inner.lock());

filters.iter().for_each(|(id, active_filter)| {
if let FilterKind::Log(ref filter) = active_filter.kind {
let mut reverted_logs: Vec<Log> = Vec::new();
let filtered_params = FilteredParams::new(Some(*filter.clone()));

for reorged_block in &mut temp_reorged_blocks {
let mut matching_logs =
logs_utils::matching_block_logs_with_tx_hashes(
&filtered_params,
reorged_block.block,
reorged_block
.tx_receipts
.clone()
.iter()
.map(|(tx, receipt)| (*tx, receipt)),
true,
);

reverted_logs.append(&mut matching_logs);
if Some(reorged_block.block.hash) == active_filter.block_hash {
reorged_logs.push((id.clone(), reverted_logs.clone()));
}
}
}
});
} else {
temp_reorged_blocks.clear();
}

futures::stream::iter(reorged_logs)
})
}
}

impl<Provider, Pool> EthFilter<Provider, Pool>
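The reorg stream above follows a common shape: wrap the canonical-state broadcast receiver in a `BroadcastStream`, `map` each notification, and `flat_map` it into zero or more per-filter items. A self-contained toy sketch of that shape (not reth code; it assumes only the `tokio`, `tokio-stream`, and `futures` crates):

use futures::StreamExt;
use tokio_stream::wrappers::BroadcastStream;

#[tokio::main]
async fn main() {
    // Toy "canonical state" events: (block_number, removed_from_canonical_chain).
    let (tx, rx) = tokio::sync::broadcast::channel::<(u64, bool)>(16);
    tx.send((100, true)).unwrap(); // block 100 was reorged out
    tx.send((101, false)).unwrap(); // block 101 is the new canonical tip
    drop(tx); // close the channel so the stream ends

    let mut stream = BroadcastStream::new(rx)
        .map(|event| event.expect("broadcast receiver lagged"))
        .flat_map(|(block, removed)| {
            // Emit one item per affected filter; nothing for canonical blocks.
            let items: Vec<(u32, u64)> = if removed { vec![(1, block)] } else { Vec::new() };
            futures::stream::iter(items)
        });

    while let Some((filter_id, block)) = stream.next().await {
        println!("filter {filter_id}: logs from block {block} were reverted");
    }
}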
@@ -390,14 +483,19 @@ where
/// Installs a new filter and returns the new identifier.
async fn install_filter(&self, kind: FilterKind) -> RpcResult<FilterId> {
let last_poll_block_number = self.provider.best_block_number().to_rpc_result()?;
let last_poll_block_hash =
self.provider.block_hash(last_poll_block_number).to_rpc_result()?;
let id = FilterId::from(self.id_provider.next_id());
let mut filters = self.active_filters.inner.lock().await;
let reorged_logs: Arc<Mutex<Option<Vec<Log>>>> = Arc::new(Mutex::new(Some(Vec::new())));
filters.insert(
id.clone(),
ActiveFilter {
block: last_poll_block_number,
block_hash: last_poll_block_hash,
last_poll_timestamp: Instant::now(),
kind,
reorged_logs,
},
);
Ok(id)
@@ -450,7 +548,7 @@ where
};

if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? {
append_matching_block_logs(
logs_utils::append_matching_block_logs(
&mut all_logs,
&self.provider,
&filter_params,
@@ -537,10 +635,14 @@ pub struct ActiveFilters {
struct ActiveFilter {
/// At which block the filter was polled last.
block: u64,
/// Hash of the block at which the filter was polled last.
block_hash: Option<B256>,
/// Last time this filter was polled.
last_poll_timestamp: Instant,
/// What kind of filter it is.
kind: FilterKind,
/// Reorged logs
reorged_logs: Arc<Mutex<Option<Vec<Log>>>>,
}

/// A receiver for pending transactions that returns all new transactions since the last poll.
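How the buffered `reorged_logs` are eventually returned to clients is not part of this commit. One plausible shape, purely a sketch and not code from this diff, is to drain the per-filter buffer into the response the next time the filter is polled:

// Hypothetical poll-side sketch; `active_filter` and `all_logs` are placeholders.
let mut guard = active_filter.reorged_logs.lock().await;
if let Some(reverted) = guard.as_mut() {
    // The logs were collected with the `removed` flag set, so appending them
    // tells the client which earlier results are no longer canonical.
    all_logs.append(reverted); // leaves the buffer empty for the next reorg
}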
