Skip to content

Commit 110fcc5

Browse files
committed
add Firehose/RPC fallback for stale block cache in ancestor_block
1 parent 85f942b commit 110fcc5

File tree

1 file changed

+103
-59
lines changed

1 file changed

+103
-59
lines changed

chain/ethereum/src/chain.rs

Lines changed: 103 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use graph::blockchain::{
1010
use graph::components::network_provider::ChainName;
1111
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1212
use graph::data::subgraph::UnifiedMappingApiVersion;
13-
use graph::firehose::{FirehoseEndpoint, ForkStep};
13+
use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints, ForkStep};
1414
use graph::futures03::TryStreamExt;
1515
use graph::prelude::{
1616
retry, BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock,
@@ -1037,32 +1037,62 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
10371037
root: Option<BlockHash>,
10381038
) -> Result<Option<BlockFinality>, Error> {
10391039
let ptr_for_log = ptr.clone();
1040-
let block: Option<EthereumBlock> = self
1040+
let cached = self
10411041
.chain_store
10421042
.cheap_clone()
10431043
.ancestor_block(ptr, offset, root)
1044-
.await?
1045-
.map(|(json_value, block_ptr)| {
1046-
json::from_value(json_value.clone()).map_err(|e| {
1047-
warn!(
1048-
self.logger,
1049-
"Failed to deserialize cached ancestor block {} (offset {} from {}): {}. \
1050-
This may indicate stale cache data from a previous version.",
1051-
block_ptr.hash_hex(),
1052-
offset,
1053-
ptr_for_log.hash_hex(),
1054-
e
1055-
);
1056-
e
1057-
})
1058-
})
1059-
.transpose()?;
1060-
Ok(block.map(|block| {
1061-
BlockFinality::NonFinal(EthereumBlockWithCalls {
1044+
.await?;
1045+
1046+
let Some((json_value, block_ptr)) = cached else {
1047+
return Ok(None);
1048+
};
1049+
1050+
match json::from_value::<EthereumBlock>(json_value.clone()) {
1051+
Ok(block) => Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
10621052
ethereum_block: block,
10631053
calls: None,
1064-
})
1065-
}))
1054+
}))),
1055+
Err(e) => {
1056+
warn!(
1057+
self.logger,
1058+
"Failed to deserialize cached ancestor block {} (offset {} from {}): {}. \
1059+
This may indicate stale cache data from a previous version. \
1060+
Falling back to Firehose/RPC.",
1061+
block_ptr.hash_hex(),
1062+
offset,
1063+
ptr_for_log.hash_hex(),
1064+
e
1065+
);
1066+
1067+
match self.chain_client.as_ref() {
1068+
ChainClient::Firehose(endpoints) => {
1069+
let block = self
1070+
.fetch_block_with_firehose(endpoints, &block_ptr)
1071+
.await?;
1072+
let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?;
1073+
Ok(Some(BlockFinality::NonFinal(ethereum_block)))
1074+
}
1075+
ChainClient::Rpc(adapters) => {
1076+
match self
1077+
.fetch_light_block_with_rpc(adapters, &block_ptr)
1078+
.await?
1079+
{
1080+
Some(light_block) => {
1081+
let ethereum_block = EthereumBlock {
1082+
block: light_block,
1083+
transaction_receipts: vec![],
1084+
};
1085+
Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
1086+
ethereum_block,
1087+
calls: None,
1088+
})))
1089+
}
1090+
None => Ok(None),
1091+
}
1092+
}
1093+
}
1094+
}
1095+
}
10661096
}
10671097

10681098
async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error> {
@@ -1093,50 +1123,64 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
10931123
}
10941124

10951125
// If not in store, fetch from Firehose
1096-
let endpoint = endpoints.endpoint().await?;
1097-
let logger = self.logger.clone();
1098-
let retry_log_message =
1099-
format!("get_block_by_ptr for block {} with firehose", block);
1100-
let block = block.clone();
1101-
1102-
retry(retry_log_message, &logger)
1103-
.limit(ENV_VARS.request_retries)
1104-
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
1105-
.run(move || {
1106-
let endpoint = endpoint.cheap_clone();
1107-
let logger = logger.cheap_clone();
1108-
let block = block.clone();
1109-
async move {
1110-
endpoint
1111-
.get_block_by_ptr::<codec::Block>(&block, &logger)
1112-
.await
1113-
.context(format!(
1114-
"Failed to fetch block by ptr {} from firehose",
1115-
block
1116-
))
1117-
}
1118-
})
1126+
self.fetch_block_with_firehose(endpoints, block)
11191127
.await?
11201128
.parent_ptr()
11211129
}
1122-
ChainClient::Rpc(adapters) => {
1123-
let blocks = adapters
1124-
.cheapest_with(&self.capabilities)
1125-
.await?
1126-
.load_blocks(
1127-
self.logger.cheap_clone(),
1128-
self.chain_store.cheap_clone(),
1129-
HashSet::from_iter(Some(block.hash.as_b256())),
1130-
)
1131-
.await?;
1132-
assert_eq!(blocks.len(), 1);
1133-
1134-
blocks[0].parent_ptr()
1135-
}
1130+
ChainClient::Rpc(adapters) => self
1131+
.fetch_light_block_with_rpc(adapters, block)
1132+
.await?
1133+
.expect("block must exist for parent_ptr")
1134+
.parent_ptr(),
11361135
};
11371136

11381137
Ok(block)
11391138
}
1139+
1140+
async fn fetch_block_with_firehose(
1141+
&self,
1142+
endpoints: &FirehoseEndpoints,
1143+
block_ptr: &BlockPtr,
1144+
) -> Result<codec::Block, Error> {
1145+
let endpoint = endpoints.endpoint().await?;
1146+
let logger = self.logger.clone();
1147+
let retry_log_message = format!("fetch_block_with_firehose {}", block_ptr);
1148+
let block_ptr = block_ptr.clone();
1149+
1150+
retry(retry_log_message, &logger)
1151+
.limit(ENV_VARS.request_retries)
1152+
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
1153+
.run(move || {
1154+
let endpoint = endpoint.cheap_clone();
1155+
let logger = logger.cheap_clone();
1156+
let block_ptr = block_ptr.clone();
1157+
async move {
1158+
endpoint
1159+
.get_block_by_ptr::<codec::Block>(&block_ptr, &logger)
1160+
.await
1161+
.context(format!("Failed to fetch block {} from firehose", block_ptr))
1162+
}
1163+
})
1164+
.await
1165+
}
1166+
1167+
async fn fetch_light_block_with_rpc(
1168+
&self,
1169+
adapters: &EthereumNetworkAdapters,
1170+
block_ptr: &BlockPtr,
1171+
) -> Result<Option<Arc<LightEthereumBlock>>, Error> {
1172+
let blocks = adapters
1173+
.cheapest_with(&self.capabilities)
1174+
.await?
1175+
.load_blocks(
1176+
self.logger.cheap_clone(),
1177+
self.chain_store.cheap_clone(),
1178+
HashSet::from_iter(Some(block_ptr.hash.as_b256())),
1179+
)
1180+
.await?;
1181+
1182+
Ok(blocks.into_iter().next())
1183+
}
11401184
}
11411185

11421186
pub struct FirehoseMapper {

0 commit comments

Comments
 (0)