Skip to content

Commit

Permalink
merge subscription propagate error (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci authored Nov 13, 2023
1 parent e7e5111 commit a2d7e57
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 41 deletions.
43 changes: 35 additions & 8 deletions src/integration_tests/upstream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{
config::{Config, MiddlewaresConfig, RpcDefinitions, RpcSubscription},
config::{Config, MergeStrategy, MiddlewaresConfig, RpcDefinitions, RpcSubscription},
extensions::{
client::{mock::TestServerBuilder, Client, ClientConfig},
merge_subscription::MergeSubscriptionConfig,
server::ServerConfig,
ExtensionsConfig,
},
Expand All @@ -14,9 +15,14 @@ async fn upstream_error_propagate() {
let unsubscribe_mock = "mock_unsub";
let update_mock = "mock";

let subscribe_merge_mock = "mock_merge_sub";
let unsubscribe_merge_mock = "mock_merge_unsub";
let update_merge_mock = "mock_merge";

let mut builder = TestServerBuilder::new();

builder.register_error_subscription(subscribe_mock, update_mock, unsubscribe_mock);
builder.register_error_subscription(subscribe_merge_mock, unsubscribe_merge_mock, update_merge_mock);

let (addr, _upstream_handle) = builder.build().await;

Expand All @@ -33,20 +39,31 @@ async fn upstream_error_propagate() {
request_timeout_seconds: 120,
http_methods: Vec::new(),
}),
merge_subscription: Some(MergeSubscriptionConfig {
keep_alive_seconds: Some(1),
}),
..Default::default()
},
middlewares: MiddlewaresConfig {
methods: vec![],
subscriptions: vec!["upstream".to_string()],
subscriptions: vec!["merge_subscription".to_string(), "upstream".to_string()],
},
rpcs: RpcDefinitions {
methods: vec![],
subscriptions: vec![RpcSubscription {
subscribe: subscribe_mock.to_string(),
unsubscribe: unsubscribe_mock.to_string(),
name: update_mock.to_string(),
merge_strategy: None,
}],
subscriptions: vec![
RpcSubscription {
subscribe: subscribe_mock.to_string(),
unsubscribe: unsubscribe_mock.to_string(),
name: update_mock.to_string(),
merge_strategy: None,
},
RpcSubscription {
subscribe: subscribe_merge_mock.to_string(),
unsubscribe: unsubscribe_merge_mock.to_string(),
name: update_merge_mock.to_string(),
merge_strategy: Some(MergeStrategy::Replace),
},
],
aliases: vec![],
},
};
Expand All @@ -57,6 +74,16 @@ async fn upstream_error_propagate() {
let client = Client::with_endpoints([format!("ws://{addr}")]).unwrap();
let result = client.subscribe(subscribe_mock, vec![], unsubscribe_mock).await;

assert!(result
.err()
.unwrap()
.to_string()
.contains("Inability to pay some fees (e.g. account balance too low)"));

let result = client
.subscribe(subscribe_merge_mock, vec![], unsubscribe_merge_mock)
.await;

assert!(result
.err()
.unwrap()
Expand Down
4 changes: 2 additions & 2 deletions src/middlewares/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct SubscriptionRequest {
pub subscribe: String,
pub params: Vec<JsonValue>,
pub unsubscribe: String,
pub sink: PendingSubscriptionSink,
pub pending_sink: PendingSubscriptionSink,
}

impl Debug for SubscriptionRequest {
Expand All @@ -64,7 +64,7 @@ impl Debug for SubscriptionRequest {
.field("subscribe", &self.subscribe)
.field("params", &self.params)
.field("unsubscribe", &self.unsubscribe)
.field("sink_id", &self.sink.connection_id())
.field("sink_id", &self.pending_sink.connection_id())
.finish()
}
}
Expand Down
66 changes: 43 additions & 23 deletions src/middlewares/subscriptions/merge_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
extensions::{client::Client, merge_subscription::MergeSubscription},
middleware::{Middleware, MiddlewareBuilder, NextFn, RpcSubscription},
middlewares::{SubscriptionRequest, SubscriptionResult},
utils::{CacheKey, TypeRegistry, TypeRegistryRef},
utils::{errors, CacheKey, TypeRegistry, TypeRegistryRef},
};

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -87,8 +87,10 @@ impl MergeSubscriptionMiddleware {
subscribe: String,
params: Vec<JsonValue>,
unsubscribe: String,
) -> Result<Box<dyn FnOnce() -> broadcast::Receiver<SubscriptionMessage> + Sync + Send + 'static>, StringError>
{
) -> Result<
Box<dyn FnOnce() -> broadcast::Receiver<SubscriptionMessage> + Sync + Send + 'static>,
jsonrpsee::core::Error,
> {
if let Some(tx) = self.upstream_subs.read().await.get(&key).cloned() {
tracing::trace!("Found existing upstream subscription for {}", &subscribe);
return Ok(Box::new(move || tx.subscribe()));
Expand Down Expand Up @@ -204,36 +206,54 @@ impl Middleware<SubscriptionRequest, Result<(), StringError>> for MergeSubscript
) -> Result<(), StringError> {
let key = CacheKey::new(&request.subscribe, &request.params);

let sink = request.sink.accept().await?;
let SubscriptionRequest {
subscribe,
params,
unsubscribe,
pending_sink,
} = request;

if let Some(current_value) = self
.current_values
.read()
let subscribe = match self
.get_upstream_subscription(key.clone(), subscribe, params.to_owned(), unsubscribe)
.await
.get(&key)
.map(|x| SubscriptionMessage::from_json(&x).ok())
.unwrap_or(None)
{
if let Err(e) = sink.send(current_value).await {
tracing::trace!("subscription sink closed {e:?}");
Ok(subscribe) => subscribe,
Err(err) => {
pending_sink.reject(errors::map_error(err)).await;
return Ok(());
}
}
};

// accept pending subscription
let sink = match pending_sink.accept().await {
Ok(sink) => sink,
Err(e) => {
tracing::trace!("Failed to accept pending subscription {e:?}");
return Ok(());
}
};

let subscribe = self
.get_upstream_subscription(
key,
request.subscribe.to_owned(),
request.params.to_owned(),
request.unsubscribe,
)
.await?;
let current_values = self.current_values.clone();

// broadcast new values
// send any current value and broadcast new values
tokio::spawn(async move {
// create receiver inside task to avoid msg been broadcast before stream.recv() is hit
// read lock before subscribing to make sure we don't miss any value
let read_lock = current_values.read().await;
let mut stream = subscribe();

// send current value if any
if let Some(current_value) = read_lock
.get(&key)
.map(|x| SubscriptionMessage::from_json(&x).ok())
.unwrap_or(None)
{
if let Err(e) = sink.send(current_value).await {
tracing::trace!("subscription sink closed {e:?}");
return;
}
}
drop(read_lock);

loop {
tokio::select! {
resp = stream.recv() => {
Expand Down
16 changes: 9 additions & 7 deletions src/middlewares/subscriptions/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,18 @@ impl Middleware<SubscriptionRequest, SubscriptionResult> for UpstreamMiddleware
_context: TypeRegistry,
_next: NextFn<SubscriptionRequest, SubscriptionResult>,
) -> SubscriptionResult {
let sink = request.sink;
let SubscriptionRequest {
subscribe,
params,
unsubscribe,
pending_sink,
} = request;

let result = self
.client
.subscribe(&request.subscribe, request.params, &request.unsubscribe)
.await;
let result = self.client.subscribe(&subscribe, params, &unsubscribe).await;

let (mut subscription, sink) = match result {
// subscription was successful, accept the sink
Ok(sub) => match sink.accept().await {
Ok(sub) => match pending_sink.accept().await {
Ok(sink) => (sub, sink),
Err(e) => {
tracing::trace!("Failed to accept pending subscription {:?}", e);
Expand All @@ -65,7 +67,7 @@ impl Middleware<SubscriptionRequest, SubscriptionResult> for UpstreamMiddleware
},
// subscription failed, reject the sink
Err(e) => {
sink.reject(errors::map_error(e)).await;
pending_sink.reject(errors::map_error(e)).await;
return Ok(());
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub async fn start_server(config: Config) -> anyhow::Result<SubwayServerHandle>
subscribe: subscribe_name.into(),
params,
unsubscribe: unsubscribe_name.into(),
sink,
pending_sink: sink,
},
result_tx,
timeout,
Expand Down

0 comments on commit a2d7e57

Please sign in to comment.