Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tested changes #27

Merged
merged 1 commit into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion bin/reth/src/args/rpc_server_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,16 @@ impl RpcServerArgs {
}

/// Create Engine API server.
pub async fn start_auth_server<Provider, Pool, Network, Tasks, EngineT>(
#[allow(clippy::too_many_arguments)]
pub async fn start_auth_server<Provider, Pool, Network, Tasks, EngineT, Events>(
&self,
provider: Provider,
pool: Pool,
network: Network,
executor: Tasks,
engine_api: EngineApi<Provider, EngineT>,
jwt_secret: JwtSecret,
events: Events,
) -> Result<AuthServerHandle, RpcError>
where
Provider: BlockReaderIdExt
Expand All @@ -353,6 +355,7 @@ impl RpcServerArgs {
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
EngineT: EngineTypes + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
{
let socket_address = SocketAddr::new(self.auth_addr, self.auth_port);

Expand All @@ -364,6 +367,7 @@ impl RpcServerArgs {
engine_api,
socket_address,
jwt_secret,
events,
)
.await
}
Expand Down
38 changes: 30 additions & 8 deletions crates/rpc/rpc-builder/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ use jsonrpsee::{
use reth_network_api::{NetworkInfo, Peers};
use reth_node_api::EngineTypes;
use reth_provider::{
BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, HeaderProvider, ReceiptProviderIdExt,
StateProviderFactory,
BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, EvmEnvProvider, HeaderProvider,
ReceiptProviderIdExt, StateProviderFactory,
};
use reth_rpc::{
eth::{
cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig, FeeHistoryCache,
FeeHistoryCacheConfig,
},
AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter,
AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter, EthPubSub,
EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret,
};
use reth_rpc_api::{servers::*, EngineApiServer};
Expand All @@ -35,14 +35,15 @@ use std::{

/// Configure and launch a _standalone_ auth server with `engine` and a _new_ `eth` namespace.
#[allow(clippy::too_many_arguments)]
pub async fn launch<Provider, Pool, Network, Tasks, EngineApi, EngineT>(
pub async fn launch<Provider, Pool, Network, Tasks, EngineApi, EngineT, Events>(
provider: Provider,
pool: Pool,
network: Network,
executor: Tasks,
engine_api: EngineApi,
socket_addr: SocketAddr,
secret: JwtSecret,
events: Events,
) -> Result<AuthServerHandle, RpcError>
where
Provider: BlockReaderIdExt
Expand All @@ -59,6 +60,7 @@ where
Tasks: TaskSpawner + Clone + 'static,
EngineT: EngineTypes,
EngineApi: EngineApiServer<EngineT>,
Events: CanonStateSubscriptions + Clone + 'static,
{
// spawn a new cache task
let eth_cache =
Expand All @@ -71,7 +73,7 @@ where
let eth_api = EthApi::with_spawner(
provider.clone(),
pool.clone(),
network,
network.clone(),
eth_cache.clone(),
gas_oracle,
EthConfig::default().rpc_gas_cap,
Expand All @@ -82,9 +84,29 @@ where
let config = EthFilterConfig::default()
.max_logs_per_response(DEFAULT_MAX_LOGS_PER_RESPONSE)
.max_blocks_per_filter(DEFAULT_MAX_BLOCKS_PER_FILTER);
let eth_filter =
EthFilter::new(provider, pool, eth_cache.clone(), config, Box::new(executor.clone()));
launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await
let eth_pubsub = EthPubSub::with_spawner(
provider.clone(),
pool.clone(),
events.clone(),
network.clone(),
Box::new(executor.clone()),
);
let eth_filter = EthFilter::new(
provider.clone(),
pool.clone(),
eth_cache.clone(),
config,
Box::new(executor.clone()),
eth_pubsub.clone(),
);
launch_with_eth_api::<Provider, Pool, Network, EngineApi, EngineT>(
eth_api,
eth_filter,
engine_api,
socket_addr,
secret,
)
.await
}

/// Configure and launch a _standalone_ auth server with existing EthApi implementation.
Expand Down
15 changes: 8 additions & 7 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1236,20 +1236,21 @@ where
blocking_task_pool.clone(),
fee_history_cache,
);
let filter = EthFilter::new(
let pubsub = EthPubSub::with_spawner(
self.provider.clone(),
self.pool.clone(),
cache.clone(),
self.config.eth.filter_config(),
self.events.clone(),
self.network.clone(),
executor.clone(),
);

let pubsub = EthPubSub::with_spawner(
let filter = EthFilter::new(
self.provider.clone(),
self.pool.clone(),
self.events.clone(),
self.network.clone(),
executor,
cache.clone(),
self.config.eth.filter_config(),
executor.clone(),
pubsub.clone(),
);

let eth = EthHandlers { api, cache, filter, pubsub, blocking_task_pool };
Expand Down
120 changes: 111 additions & 9 deletions crates/rpc/rpc/src/eth/filter.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
use super::cache::EthStateCache;
use crate::{
eth::{
error::EthApiError,
logs_utils::{self, append_matching_block_logs},
},
eth::{error::EthApiError, logs_utils, pubsub::EthPubSub},
result::{rpc_error_with_code, ToRpcResult},
EthSubscriptionIdProvider,
};
use core::fmt;
use futures::{executor, StreamExt};
use reth_network_api::NetworkInfo;
use tokio_stream::{wrappers::BroadcastStream, Stream};

use alloy_primitives::B256;
use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_primitives::{IntoRecoveredTransaction, TxHash};
use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError};
use reth_provider::{
BlockIdReader, BlockReader, CanonStateSubscriptions, EvmEnvProvider, ProviderError,
};
use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{
BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
Expand Down Expand Up @@ -44,7 +47,7 @@ pub struct EthFilter<Provider, Pool> {

impl<Provider, Pool> EthFilter<Provider, Pool>
where
Provider: Send + Sync + 'static,
Provider: BlockReader + EvmEnvProvider + Send + Sync + 'static,
Pool: Send + Sync + 'static,
{
/// Creates a new, shareable instance.
Expand All @@ -55,13 +58,18 @@ where
/// See also [EthFilterConfig].
///
/// This also spawns a task that periodically clears stale filters.
pub fn new(
pub fn new<Events, Network>(
provider: Provider,
pool: Pool,
eth_cache: EthStateCache,
config: EthFilterConfig,
task_spawner: Box<dyn TaskSpawner>,
) -> Self {
pubsub: EthPubSub<Provider, Pool, Events, Network>,
) -> Self
where
Events: CanonStateSubscriptions + 'static,
Network: NetworkInfo + 'static,
{
let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
config;
let inner = EthFilterInner {
Expand All @@ -88,6 +96,14 @@ where
}),
);

let also_this = eth_filter.clone();
eth_filter.inner.task_spawner.clone().spawn_critical(
"eth-filters_update_reorged_logs",
Box::pin(async move {
also_this.update_reorged_logs(pubsub).await;
}),
);

eth_filter
}

Expand Down Expand Up @@ -120,6 +136,83 @@ where
is_valid
})
}

/// Endless future that updates the reorged logs for impacted filters
async fn update_reorged_logs<Events, Network>(
&self,
pubsub: EthPubSub<Provider, Pool, Events, Network>,
) where
Events: CanonStateSubscriptions + 'static,
Network: NetworkInfo + 'static,
{
let mut stream = self.reorged_logs_stream(pubsub).await;
while let Some((id, mut logs)) = stream.next().await {
let mut filters = executor::block_on(self.active_filters().inner.lock());
let active_filter =
filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id)).unwrap();
let mut guard = active_filter.reorged_logs.lock().await;
if let Some(reorged_logs_vec) = guard.as_mut() {
reorged_logs_vec.append(&mut logs);
}
}
}

/// Reacts to reorged blocks, checks impacted log filters, stores reorged logs in the log filter
pub async fn reorged_logs_stream<Events, Network>(
&self,
pubsub: EthPubSub<Provider, Pool, Events, Network>,
) -> impl Stream<Item = (FilterId, Vec<Log>)> + '_
where
Events: CanonStateSubscriptions + 'static,
Network: NetworkInfo + 'static,
{
let mut temp_reorged_blocks = Vec::new();

BroadcastStream::new(pubsub.get_chain_events().subscribe_to_canonical_state())
.map(move |canon_state| {
canon_state.expect("new block subscription never ends; qed").block_receipts()
})
.flat_map(futures::stream::iter)
.flat_map(move |(block_receipts, removed)| {
let mut reorged_logs: Vec<(FilterId, Vec<Log>)> = Vec::new();

if removed {
temp_reorged_blocks.push(block_receipts);

let filters = executor::block_on(self.active_filters().inner.lock());

filters.iter().for_each(|(id, active_filter)| {
if let FilterKind::Log(ref filter) = active_filter.kind {
let mut reverted_logs: Vec<Log> = Vec::new();
let filtered_params = FilteredParams::new(Some(*filter.clone()));

for reorged_block in &mut temp_reorged_blocks {
let mut matching_logs =
logs_utils::matching_block_logs_with_tx_hashes(
&filtered_params,
reorged_block.block,
reorged_block
.tx_receipts
.clone()
.iter()
.map(|(tx, receipt)| (*tx, receipt)),
true,
);

reverted_logs.append(&mut matching_logs);
if Some(reorged_block.block.hash) == active_filter.block_hash {
reorged_logs.push((id.clone(), reverted_logs.clone()));
}
}
}
});
} else {
temp_reorged_blocks.clear();
}

futures::stream::iter(reorged_logs)
})
}
}

impl<Provider, Pool> EthFilter<Provider, Pool>
Expand Down Expand Up @@ -390,14 +483,19 @@ where
/// Installs a new filter and returns the new identifier.
async fn install_filter(&self, kind: FilterKind) -> RpcResult<FilterId> {
let last_poll_block_number = self.provider.best_block_number().to_rpc_result()?;
let last_poll_block_hash =
self.provider.block_hash(last_poll_block_number).to_rpc_result()?;
let id = FilterId::from(self.id_provider.next_id());
let mut filters = self.active_filters.inner.lock().await;
let reorged_logs: Arc<Mutex<Option<Vec<Log>>>> = Arc::new(Mutex::new(Some(Vec::new())));
filters.insert(
id.clone(),
ActiveFilter {
block: last_poll_block_number,
block_hash: last_poll_block_hash,
last_poll_timestamp: Instant::now(),
kind,
reorged_logs,
},
);
Ok(id)
Expand Down Expand Up @@ -450,7 +548,7 @@ where
};

if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? {
append_matching_block_logs(
logs_utils::append_matching_block_logs(
&mut all_logs,
&self.provider,
&filter_params,
Expand Down Expand Up @@ -537,10 +635,14 @@ pub struct ActiveFilters {
struct ActiveFilter {
/// At which block the filter was polled last.
block: u64,
/// Hash of the block at which the filter was polled last.
block_hash: Option<B256>,
/// Last time this filter was polled.
last_poll_timestamp: Instant,
/// What kind of filter it is.
kind: FilterKind,
/// Reorged logs
reorged_logs: Arc<Mutex<Option<Vec<Log>>>>,
}

/// A receiver for pending transactions that returns all new transactions since the last poll.
Expand Down
Loading
Loading