Skip to content

Commit

Permalink
ensure incremental head (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci authored Sep 25, 2023
1 parent 1c5fa4e commit 5d1019c
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 29 deletions.
10 changes: 10 additions & 0 deletions src/extensions/api/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@ impl EthApi {
let number = super::get_number(&val)?;
let hash = super::get_hash(&val)?;

if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash)
{
tracing::error!("Error in background task: {e}");
client
.rotate_endpoint()
.await
.expect("Failed to rotate endpoint");
break;
}

tracing::debug!("New finalized head: {number} {hash}");
finalized_head_tx.send_replace(Some((hash, number)));
}
Expand Down
20 changes: 20 additions & 0 deletions src/extensions/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,23 @@ pub(crate) fn get_hash(val: &JsonValue) -> anyhow::Result<JsonValue> {
}
Err(anyhow::Error::msg("Hash not found"))
}

pub(crate) fn validate_new_head(
tx: &watch::Sender<Option<(JsonValue, u64)>>,
number: u64,
hash: &JsonValue,
) -> anyhow::Result<()> {
if let Some((current_hash, current_number)) = tx.borrow().as_ref() {
if *current_number > number {
return Err(anyhow::Error::msg("Head number is not increasing, current_number: {current_number} new_number: {number}"));
}

if *current_number == number && current_hash != hash {
return Err(anyhow::Error::msg(
"Head number is the same but hash is not matching, current_hash: {current_hash} new_hash: {hash}"
));
}
}

Ok(())
}
22 changes: 16 additions & 6 deletions src/extensions/api/substrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ impl SubstrateApi {

let number = super::get_number(&val)?;

let res = client
let hash = client
.request("chain_getBlockHash", vec![number.into()])
.await?;

tracing::debug!("New head: {number} {res}");
head_tx.send_replace(Some((res, number)));
tracing::debug!("New head: {number} {hash}");
head_tx.send_replace(Some((hash, number)));
} else {
break;
}
Expand Down Expand Up @@ -145,12 +145,22 @@ impl SubstrateApi {
while let Some(Ok(val)) = sub.next().await {
let number = super::get_number(&val)?;

let res = client
let hash = client
.request("chain_getBlockHash", vec![number.into()])
.await?;

tracing::debug!("New finalized head: {number} {res}");
finalized_head_tx.send_replace(Some((res, number)));
if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash)
{
tracing::error!("Error in background task: {e}");
client
.rotate_endpoint()
.await
.expect("Failed to rotate endpoint");
break;
}

tracing::debug!("New finalized head: {number} {hash}");
finalized_head_tx.send_replace(Some((hash, number)));
}

Ok::<(), anyhow::Error>(())
Expand Down
170 changes: 147 additions & 23 deletions src/extensions/api/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,89 +75,89 @@ async fn get_head_finalized_head() {
// access value before subscription is established

let h1 = tokio::spawn(async move {
assert_eq!(head.read().await, (json!("0xabcd"), 0x1234));
assert_eq!(head.read().await, (json!("0xaa"), 0x01));
// should be able to read it multiple times
assert_eq!(head.read().await, (json!("0xabcd"), 0x1234));
assert_eq!(head.read().await, (json!("0xaa"), 0x01));
});

let (_, head_sink) = head_rx.recv().await.unwrap();
head_sink
.send(SubscriptionMessage::from_json(&json!({ "number": "0x1234" })).unwrap())
.send(SubscriptionMessage::from_json(&json!({ "number": "0x01" })).unwrap())
.await
.unwrap();

{
let (params, tx) = block_rx.recv().await.unwrap();
assert_eq!(params, json!([0x1234]));
tx.send(json!("0xabcd")).unwrap();
assert_eq!(params, json!([0x01]));
tx.send(json!("0xaa")).unwrap();
}

let (_, finalized_head_sink) = finalized_head_rx.recv().await.unwrap();
finalized_head_sink
.send(SubscriptionMessage::from_json(&json!({ "number": "0x4321" })).unwrap())
.send(SubscriptionMessage::from_json(&json!({ "number": "0x01" })).unwrap())
.await
.unwrap();

{
let (params, tx) = block_rx.recv().await.unwrap();
assert_eq!(params, json!([0x4321]));
tx.send(json!("0xdcba")).unwrap();
assert_eq!(params, json!([0x01]));
tx.send(json!("0xaa")).unwrap();
}

// read after subscription is established

let h2 = tokio::spawn(async move {
let val = finalized_head.read().await;
assert_eq!(val, (json!("0xdcba"), 0x4321));
assert_eq!(val, (json!("0xaa"), 0x01));
});

// new head

head_sink
.send(SubscriptionMessage::from_json(&json!({ "number": "0x1122" })).unwrap())
.send(SubscriptionMessage::from_json(&json!({ "number": "0x02" })).unwrap())
.await
.unwrap();

{
let (params, tx) = block_rx.recv().await.unwrap();
assert_eq!(params, json!([0x1122]));
tx.send(json!("0xaabb")).unwrap();
assert_eq!(params, json!([0x02]));
tx.send(json!("0xbb")).unwrap();
}

let finalized_head = api.get_finalized_head();
// still old value
assert_eq!(finalized_head.read().await, (json!("0xdcba"), 0x4321));
assert_eq!(finalized_head.read().await, (json!("0xaa"), 0x01));

// wait a bit for the value to be updated
tokio::time::sleep(std::time::Duration::from_millis(1)).await;

let head = api.get_head();
assert_eq!(head.read().await, (json!("0xaabb"), 0x1122));
assert_eq!(head.read().await, (json!("0xbb"), 0x02));

// new finalized head
finalized_head_sink
.send(SubscriptionMessage::from_json(&json!({ "number": "0x2233" })).unwrap())
.send(SubscriptionMessage::from_json(&json!({ "number": "0x03" })).unwrap())
.await
.unwrap();
finalized_head_sink
.send(SubscriptionMessage::from_json(&json!({ "number": "0x3344" })).unwrap())
.send(SubscriptionMessage::from_json(&json!({ "number": "0x04" })).unwrap())
.await
.unwrap();

{
let (params, tx) = block_rx.recv().await.unwrap();
assert_eq!(params, json!([0x2233]));
tx.send(json!("0xbbcc")).unwrap();
assert_eq!(params, json!([0x03]));
tx.send(json!("0xcc")).unwrap();

let (params, tx) = block_rx.recv().await.unwrap();
assert_eq!(params, json!([0x3344]));
tx.send(json!("0xccdd")).unwrap();
assert_eq!(params, json!([0x04]));
tx.send(json!("0xdd")).unwrap();
}

// wait a bit for the value to be updated
tokio::time::sleep(std::time::Duration::from_millis(1)).await;

assert_eq!(finalized_head.read().await, (json!("0xccdd"), 0x3344));
assert_eq!(finalized_head.read().await, (json!("0xdd"), 0x04));

h1.await.unwrap();
h2.await.unwrap();
Expand All @@ -170,7 +170,7 @@ async fn rotate_endpoint_on_stale() {
let (addr2, server2, mut head_rx2, _, mut block_rx2) = create_server().await;

let client = Client::new([format!("ws://{addr}"), format!("ws://{addr2}")]).unwrap();
let api = SubstrateApi::new(Arc::new(client), std::time::Duration::from_millis(10));
let api = SubstrateApi::new(Arc::new(client), std::time::Duration::from_millis(100));

let head = api.get_head();
let h1 = tokio::spawn(async move {
Expand Down Expand Up @@ -209,7 +209,7 @@ async fn rotate_endpoint_on_stale() {
assert_eq!(api.get_head().read().await, (json!("0xbcde"), 0x2345));

// wait for timeout
tokio::time::sleep(std::time::Duration::from_millis(15)).await;
tokio::time::sleep(std::time::Duration::from_millis(150)).await;

// stale
assert!(head_sink.is_closed());
Expand All @@ -235,3 +235,127 @@ async fn rotate_endpoint_on_stale() {
server.stop().unwrap();
server2.stop().unwrap();
}

#[tokio::test]
async fn rotate_endpoint_on_head_mismatch() {
let (addr1, server1, mut head_rx1, mut finalized_head_rx1, mut block_rx1) =
create_server().await;
let (addr2, server2, mut head_rx2, mut finalized_head_rx2, mut block_rx2) =
create_server().await;

let client = Client::new([format!("ws://{addr1}"), format!("ws://{addr2}")]).unwrap();

let client = Arc::new(client);
// TODO: investigate why it takes a while to connect to another endpoint
let api = SubstrateApi::new(client.clone(), std::time::Duration::from_millis(5_000));

let head = api.get_head();
let finalized_head = api.get_finalized_head();
let h1 = tokio::spawn(async move {
assert_eq!(head.read().await, (json!("0xaa"), 1));
assert_eq!(finalized_head.read().await, (json!("0xaa"), 1));
});

// initial connection
let (_, head_sink) = head_rx1.recv().await.unwrap();
head_sink
.send(SubscriptionMessage::from_json(&json!({ "number": "0x01" })).unwrap())
.await
.unwrap();
{
let (params, tx) = block_rx1.recv().await.unwrap();
assert_eq!(params, json!([0x01]));
tx.send(json!("0xaa")).unwrap();
}

let (_, finalized_head_sink) = finalized_head_rx1.recv().await.unwrap();
finalized_head_sink
.send(SubscriptionMessage::from_json(&json!({ "number": "0x01" })).unwrap())
.await
.unwrap();
{
let (params, tx) = block_rx1.recv().await.unwrap();
assert_eq!(params, json!([0x01]));
tx.send(json!("0xaa")).unwrap();
}

h1.await.unwrap();

// not stale
head_sink
.send(SubscriptionMessage::from_json(&json!({ "number": "0x02" })).unwrap())
.await
.unwrap();
{
let (params, tx) = block_rx1.recv().await.unwrap();
assert_eq!(params, json!([0x02]));
tx.send(json!("0xbb")).unwrap();
}
finalized_head_sink
.send(SubscriptionMessage::from_json(&json!({ "number": "0x02" })).unwrap())
.await
.unwrap();
{
let (params, tx) = block_rx1.recv().await.unwrap();
assert_eq!(params, json!([0x02]));
tx.send(json!("0xbb")).unwrap();
}

// wait a bit to process tasks
tokio::time::sleep(std::time::Duration::from_millis(1)).await;

assert_eq!(api.get_finalized_head().read().await, (json!("0xbb"), 0x02));

// stale server finalized head 1, trigger rotate endpoint
finalized_head_sink
.send(SubscriptionMessage::from_json(&json!({ "number": "0x01" })).unwrap())
.await
.unwrap();
{
let (params, tx) = block_rx1.recv().await.unwrap();
assert_eq!(params, json!([0x01]));
tx.send(json!("0xaa")).unwrap();
}

// current finalized head is still 2
assert_eq!(api.get_finalized_head().read().await, (json!("0xbb"), 0x02));

let (_, head_sink2) = head_rx2.recv().await.unwrap();
head_sink2
.send(SubscriptionMessage::from_json(&json!({ "number": "0x03" })).unwrap())
.await
.unwrap();

let (params, tx) = block_rx2.recv().await.unwrap();
assert_eq!(params, json!([0x03]));
tx.send(json!("0xcc")).unwrap();

let (_, finalized_head_sink2) = finalized_head_rx2.recv().await.unwrap();
finalized_head_sink2
.send(SubscriptionMessage::from_json(&json!({ "number": "0x03" })).unwrap())
.await
.unwrap();

let (params, tx) = block_rx2.recv().await.unwrap();
assert_eq!(params, json!([0x03]));
tx.send(json!("0xcc")).unwrap();

head_sink2
.send(SubscriptionMessage::from_json(&json!({ "number": "0x04" })).unwrap())
.await
.unwrap();

let (params, tx) = block_rx2.recv().await.unwrap();
assert_eq!(params, json!([0x04]));
tx.send(json!("0xdd")).unwrap();

// wait a bit to process tasks
tokio::time::sleep(std::time::Duration::from_millis(1)).await;

// current head=4 and finalized_head=3
assert_eq!(api.get_head().read().await, (json!("0xdd"), 0x04));
assert_eq!(api.get_finalized_head().read().await, (json!("0xcc"), 0x03));

server1.stop().unwrap();
server2.stop().unwrap();
}

0 comments on commit 5d1019c

Please sign in to comment.