Skip to content

Commit

Permalink
fix bench inject (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci authored Nov 1, 2023
1 parent d34cdd8 commit cd27653
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 29 deletions.
58 changes: 40 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ governor = { version = "0.6.0", path = "./vendor/governor/governor" }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio", "html_reports"] }
pprof = { version = "0.12.1", features = ["flamegraph", "criterion"] }
pprof = { version = "0.13.0", features = ["flamegraph", "criterion"] }
futures-util = "0.3.15"
jsonrpc-ws-server = { version = "18.0.0" }
jsonrpc-http-server = { version = "18.0.0" }
Expand All @@ -56,7 +56,7 @@ name = "bench"
harness = false

[target.'cfg(tokio_unstable)'.dependencies]
console-subscriber = "0.1.8"
console-subscriber = "0.2.0"

[profile.release]
codegen-units = 1
Expand Down
13 changes: 13 additions & 0 deletions benches/bench/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ pub async fn ws_server(handle: tokio::runtime::Handle, url: &str) -> (String, js
},
)
.unwrap();
module
.register_subscription(
"chain_subscribeFinalizedHeads",
"chain_finalizedHead",
"chain_unsubscribeFinalizedHeads",
|_params, pending, _ctx| async move {
let sink = pending.accept().await?;
let msg = SubscriptionMessage::from_json(&serde_json::json!({ "number": "0x4321" }))?;
sink.send(msg).await?;
Ok(())
},
)
.unwrap();

let addr = format!("ws://{}", server.local_addr().unwrap());
let handle = server.start(module);
Expand Down
19 changes: 10 additions & 9 deletions benches/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use subway::{
config::{Config, MergeStrategy, MethodParam, MiddlewaresConfig, RpcDefinitions, RpcMethod, RpcSubscription},
extensions::{client::ClientConfig, server::ServerConfig, ExtensionsConfig},
server::start_server,
utils::TypeRegistryRef,
};

mod helpers;
Expand Down Expand Up @@ -119,7 +120,7 @@ trait RequestBencher {
let rt = TokioRuntime::new().unwrap();
let (_url1, _server1) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (url, _server) = rt.block_on(server());
let (url, _server, _registry) = rt.block_on(server());
ws_custom_headers_handshake(&rt, crit, &url, "ws_custom_headers_handshake", Self::REQUEST_TYPE);
ws_concurrent_conn_calls(
&rt,
Expand All @@ -138,7 +139,7 @@ trait RequestBencher {
let rt = TokioRuntime::new().unwrap();
let (_url1, _server1) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (url, _server) = rt.block_on(server());
let (url, _server, _registry) = rt.block_on(server());
ws_concurrent_conn_calls(
&rt,
crit,
Expand All @@ -161,7 +162,7 @@ trait RequestBencher {
let rt = TokioRuntime::new().unwrap();
let (_url1, _server1) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (url, _server) = rt.block_on(server());
let (url, _server, _registry) = rt.block_on(server());
ws_concurrent_conn_calls(
&rt,
crit,
Expand All @@ -176,7 +177,7 @@ trait RequestBencher {
let rt = TokioRuntime::new().unwrap();
let (_url1, _server1) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (url, _server) = rt.block_on(server());
let (url, _server, _registry) = rt.block_on(server());
let client = Arc::new(rt.block_on(ws_client(&url)));
sub_round_trip(&rt, crit, client, "subscriptions");
}
Expand All @@ -185,7 +186,7 @@ trait RequestBencher {
let rt = TokioRuntime::new().unwrap();
let (_url1, _server1) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_ONE_ENDPOINT));
let (_url2, _server2) = rt.block_on(helpers::ws_server(rt.handle().clone(), SERVER_TWO_ENDPOINT));
let (url, _server) = rt.block_on(server());
let (url, _server, _registry) = rt.block_on(server());
let client = Arc::new(rt.block_on(ws_client(&url)));
ws_inject_calls(&rt, crit, client, "ws_inject_calls", Self::REQUEST_TYPE);
}
Expand Down Expand Up @@ -218,7 +219,7 @@ fn config() -> Config {
http_methods: Vec::new(),
}),
substrate_api: Some(SubstrateApiConfig {
stale_timeout_seconds: u64::MAX,
stale_timeout_seconds: 5_000,
}),
..Default::default()
},
Expand Down Expand Up @@ -302,10 +303,10 @@ fn config() -> Config {
}
}

async fn server() -> (String, jsonrpsee::server::ServerHandle) {
async fn server() -> (String, jsonrpsee::server::ServerHandle, TypeRegistryRef) {
let config = config();
let (addr, handle, _) = start_server(config).await.unwrap();
(format!("ws://{}", addr), handle)
let (addr, handle, registry) = start_server(config).await.unwrap();
(format!("ws://{}", addr), handle, registry)
}

fn ws_concurrent_conn_calls(
Expand Down

0 comments on commit cd27653

Please sign in to comment.