Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subgraph composition: Use block cache to get blocks for subgraph triggers #5606

Draft
wants to merge 3 commits into
base: krishna/subgraph-comp-entity-ops-wasm
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ impl Chain {
// caller can spawn.
pub async fn cheapest_adapter(&self) -> Arc<EthereumAdapter> {
let adapters = match self.client.as_ref() {
ChainClient::Firehose(_) => panic!("no adapter with firehose"),
ChainClient::Firehose(_, _) => panic!("no adapter with firehose"),
ChainClient::Rpc(adapter) => adapter,
};
adapters.cheapest().await.unwrap()
Expand Down Expand Up @@ -472,7 +472,7 @@ impl Blockchain for Chain {
)
.await
}
ChainClient::Firehose(_) => {
ChainClient::Firehose(_, _) => {
self.block_stream_builder
.build_firehose(
self,
Expand All @@ -498,7 +498,7 @@ impl Blockchain for Chain {
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
match self.client.as_ref() {
ChainClient::Firehose(endpoints) => endpoints
ChainClient::Firehose(endpoints, _) => endpoints
.endpoint()
.await?
.block_ptr_for_number::<HeaderOnlyBlock>(logger, number)
Expand Down Expand Up @@ -557,7 +557,7 @@ impl Blockchain for Chain {

async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
let ingestor: Box<dyn BlockIngestor> = match self.chain_client().as_ref() {
ChainClient::Firehose(_) => {
ChainClient::Firehose(_, _) => {
let ingestor = FirehoseBlockIngestor::<HeaderOnlyBlock, Self>::new(
self.chain_store.cheap_clone(),
self.chain_client(),
Expand Down Expand Up @@ -852,7 +852,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
use graph::prelude::LightEthereumBlockExt;

let block = match self.chain_client.as_ref() {
ChainClient::Firehose(_) => Some(BlockPtr {
ChainClient::Firehose(_, _) => Some(BlockPtr {
hash: BlockHash::from(vec![0xff; 32]),
number: block.number.saturating_sub(1),
}),
Expand Down
103 changes: 92 additions & 11 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,45 @@ impl EthereumAdapter {
.buffered(ENV_VARS.block_batch_size)
}

/// Request blocks by number through JSON-RPC.
fn load_blocks_by_numbers_rpc(
&self,
logger: Logger,
numbers: Vec<BlockNumber>,
) -> impl Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send {
let web3 = self.web3.clone();

stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| {
let web3 = web3.clone();
retry(format!("load block {}", number), &logger)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Box::pin(
web3.eth()
.block_with_txs(BlockId::Number(Web3BlockNumber::Number(
number.into(),
))),
)
.compat()
.from_err::<Error>()
.and_then(move |block| {
block.map(Arc::new).ok_or_else(|| {
anyhow::anyhow!(
"Ethereum node did not find block with number {:?}",
number
)
})
})
.compat()
})
.boxed()
.compat()
.from_err()
}))
.buffered(ENV_VARS.block_batch_size)
}

/// Request blocks ptrs for numbers through JSON-RPC.
///
/// Reorg safety: If ids are numbers, they must be a final blocks.
Expand Down Expand Up @@ -1650,26 +1689,68 @@ impl EthereumAdapterTrait for EthereumAdapter {
Ok(decoded)
}

// This is a ugly temporary implementation to get the block ptrs for a range of blocks
/// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream.
async fn load_blocks_by_numbers(
&self,
logger: Logger,
chain_store: Arc<dyn ChainStore>,
block_numbers: HashSet<BlockNumber>,
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send> {
let block_hashes = block_numbers
let blocks_map: BTreeMap<i32, Vec<json::Value>> = chain_store
.cheap_clone()
.blocks_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
.await
.map_err(|e| {
error!(&logger, "Error accessing block cache {}", e);
e
})
.unwrap_or_default();

let mut blocks: Vec<Arc<LightEthereumBlock>> = blocks_map
.into_iter()
.map(|number| {
chain_store
.block_hashes_by_block_number(number)
.unwrap()
.first()
.unwrap()
.as_h256()
.filter_map(|(_number, values)| {
if values.len() == 1 {
json::from_value(values[0].clone()).ok()
} else {
None
}
})
.collect::<HashSet<_>>();
.collect::<Vec<_>>();

self.load_blocks(logger, chain_store, block_hashes).await
let missing_blocks: Vec<i32> = block_numbers
.into_iter()
.filter(|&number| !blocks.iter().any(|block| block.number() == number))
.collect();

if !missing_blocks.is_empty() {
debug!(
logger,
"Loading {} block(s) not in the block cache",
missing_blocks.len()
);
}

Box::new(
self.load_blocks_by_numbers_rpc(logger.clone(), missing_blocks)
.collect()
.map(move |new_blocks| {
let upsert_blocks: Vec<_> = new_blocks
.iter()
.map(|block| BlockFinality::Final(block.clone()))
.collect();
let block_refs: Vec<_> = upsert_blocks
.iter()
.map(|block| block as &dyn graph::blockchain::Block)
.collect();
if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) {
error!(logger, "Error writing to block cache {}", e);
}
blocks.extend(new_blocks);
blocks.sort_by_key(|block| block.number);
stream::iter_ok(blocks)
})
.flatten_stream(),
)
}

/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
Expand Down
13 changes: 9 additions & 4 deletions graph/src/blockchain/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ use anyhow::anyhow;
// Substreams only requires the FirehoseEndpoints.
#[derive(Debug)]
pub enum ChainClient<C: Blockchain> {
Firehose(FirehoseEndpoints),
Firehose(FirehoseEndpoints, Option<C::Client>),
Rpc(C::Client),
}

impl<C: Blockchain> ChainClient<C> {
pub fn new_firehose(firehose_endpoints: FirehoseEndpoints) -> Self {
Self::Firehose(firehose_endpoints)
Self::Firehose(firehose_endpoints, None)
}

pub fn new_firehose_with_rpc(firehose_endpoints: FirehoseEndpoints, rpc: C::Client) -> Self {
Self::Firehose(firehose_endpoints, Some(rpc))
}

pub fn new_rpc(rpc: C::Client) -> Self {
Expand All @@ -26,21 +30,22 @@ impl<C: Blockchain> ChainClient<C> {

pub fn is_firehose(&self) -> bool {
match self {
ChainClient::Firehose(_) => true,
ChainClient::Firehose(_, _) => true,
ChainClient::Rpc(_) => false,
}
}

pub async fn firehose_endpoint(&self) -> anyhow::Result<Arc<FirehoseEndpoint>> {
match self {
ChainClient::Firehose(endpoints) => endpoints.endpoint().await,
ChainClient::Firehose(endpoints, _) => endpoints.endpoint().await,
_ => Err(anyhow!("firehose endpoint requested on rpc chain client")),
}
}

pub fn rpc(&self) -> anyhow::Result<&C::Client> {
match self {
Self::Rpc(rpc) => Ok(rpc),
Self::Firehose(_, Some(rpc)) => Ok(rpc),
_ => Err(anyhow!("rpc endpoint requested on firehose chain client")),
}
}
Expand Down
6 changes: 6 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,12 @@ pub trait ChainStore: Send + Sync + 'static {
hashes: Vec<BlockHash>,
) -> Result<Vec<serde_json::Value>, Error>;

/// Returns the blocks present in the store for the given block numbers.
async fn blocks_by_numbers(
self: Arc<Self>,
numbers: Vec<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<serde_json::Value>>, Error>;

/// Get the `offset`th ancestor of `block_hash`, where offset=0 means the block matching
/// `block_hash` and offset=1 means its parent. If `root` is passed, short-circuit upon finding
/// a child of `root`. Returns None if unable to complete due to missing blocks in the chain
Expand Down
5 changes: 4 additions & 1 deletion node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,10 @@ pub async fn networks_as_chains(
let eth_adapters = networks.ethereum_rpcs(chain_id.clone());

let cc = if firehose_endpoints.len() > 0 {
ChainClient::<graph_chain_ethereum::Chain>::new_firehose(firehose_endpoints)
ChainClient::<graph_chain_ethereum::Chain>::new_firehose_with_rpc(
firehose_endpoints,
eth_adapters.clone(),
)
} else {
ChainClient::<graph_chain_ethereum::Chain>::new_rpc(eth_adapters.clone())
};
Expand Down
Loading
Loading