Skip to content

Commit

Permalink
Merge branch 'proxologist' into health-endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
kostekIV authored Dec 14, 2023
2 parents caa1ced + c0ee78a commit 1d0715d
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 50 deletions.
43 changes: 0 additions & 43 deletions .github/workflows/docker.yml

This file was deleted.

4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ name: Test

on:
push:
branches: [ "master" ]
branches: [ "proxologist" ]
paths-ignore:
- '**/README.md'
pull_request:
branches: [ "master" ]
branches: [ "proxologist" ]

env:
CARGO_TERM_COLOR: always
Expand Down
5 changes: 2 additions & 3 deletions config.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
extensions:
client:
endpoints:
- wss://acala-rpc.dwellir.com
- wss://acala-rpc-0.aca-api.network
- ws://localhost:9944
event_bus:
substrate_api:
stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes
Expand All @@ -14,7 +13,7 @@ extensions:
merge_subscription:
keep_alive_seconds: 60
server:
port: 9944
port: 9934
listen_address: '0.0.0.0'
max_connections: 2000
http_methods:
Expand Down
4 changes: 4 additions & 0 deletions rpc_configs/substrate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ methods:
- name: accountId
ty: AccountId

- method: alephNode_ready
cache:
size: 0

subscriptions:
- subscribe: author_submitAndWatchExtrinsic
unsubscribe: author_unwatchExtrinsic
Expand Down
5 changes: 4 additions & 1 deletion src/extensions/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use super::{Extension, ExtensionRegistry};
use crate::extensions::rate_limit::{MethodWeights, RateLimitBuilder, XFF};

mod proxy_get_request;
mod ready_get_request;
use proxy_get_request::{ProxyGetRequestLayer, ProxyGetRequestMethod};
use ready_get_request::ReadyProxyLayer;

pub struct SubwayServerBuilder {
pub config: ServerConfig,
Expand Down Expand Up @@ -124,7 +126,8 @@ impl SubwayServerBuilder {
.collect(),
)
.expect("Invalid health config"),
);
)
.layer(ReadyProxyLayer);

let rpc_module = rpc_module.clone();
let stop_handle = stop_handle.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/extensions/server/proxy_get_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ where
}
}

mod response {
pub mod response {
use jsonrpsee::types::{error::ErrorCode, ErrorObjectOwned, Id, Response, ResponsePayload};

const JSON: &str = "application/json; charset=utf-8";
Expand Down
145 changes: 145 additions & 0 deletions src/extensions/server/ready_get_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use std::{
error::Error,
future::Future,
pin::Pin,
task::{Context, Poll},
};

use crate::extensions::server::proxy_get_request::response::internal_error;
use futures::TryFutureExt;
use hyper::{
header::{ACCEPT, CONTENT_TYPE},
http::HeaderValue,
Body, Method, Request, Response, Uri,
};
use jsonrpsee::types::{Id, RequestSer};
use tower::{Layer, Service};

type BoxedError = Box<dyn Error + Send + Sync + 'static>;
type PinnedFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, BoxedError>> + Send + 'static>>;

/// Layer that catches requests to '/ready' endpoint redirects them to rpc call `alephNode_ready`
/// and translate the response to plain get response 200 or 503.
#[derive(Debug, Clone)]
pub struct ReadyProxyLayer;

impl<S> Layer<S> for ReadyProxyLayer {
type Service = ReadyRequest<S>;

fn layer(&self, inner: S) -> Self::Service {
ReadyRequest::new(inner)
}
}

fn ok_response() -> hyper::Response<hyper::Body> {
hyper::Response::builder()
.status(hyper::StatusCode::OK)
.body(Body::empty())
.expect("Unable to parse response body for type conversion")
}

fn bad_response() -> hyper::Response<hyper::Body> {
hyper::Response::builder()
.status(hyper::StatusCode::BAD_GATEWAY)
.body(Body::empty())
.expect("Unable to parse response body for type conversion")
}

#[derive(Debug, Clone)]
pub struct ReadyRequest<S> {
inner: S,
}

impl<S> ReadyRequest<S>
where
S: Service<Request<Body>, Response = Response<Body>>,
S::Response: 'static,
S::Error: Into<BoxedError> + 'static,
S::Future: Send + 'static,
{
#[inline]
fn poll_ready_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxedError>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn prepare_request(req: &mut Request<Body>) {
const RPC_METHOD_NAME: &str = "alephNode_ready";

*req.method_mut() = Method::POST;
// Precautionary remove the URI.
*req.uri_mut() = Uri::from_static("/");

// Requests must have the following headers:
req.headers_mut()
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
req.headers_mut()
.insert(ACCEPT, HeaderValue::from_static("application/json"));

// Adjust the body to reflect the method call.
let body = Body::from(
serde_json::to_string(&RequestSer::borrowed(&Id::Number(0), &RPC_METHOD_NAME, None))
.expect("Valid request; qed"),
);
*req.body_mut() = body;
}

pub fn call_proxy(&mut self, mut req: Request<Body>) -> PinnedFuture {
const METHOD_NAME: &str = "/ready";

if req.uri().path() != METHOD_NAME {
return Box::pin(self.inner.call(req).map_err(Into::into));
}

Self::prepare_request(&mut req);

let fut = self.inner.call(req);

let res_fut = async move {
let res = fut.await.map_err(|err| err.into())?;

let body = res.into_body();
let bytes = hyper::body::to_bytes(body).await?;
#[derive(serde::Deserialize, Debug)]
struct RpcPayload {
result: bool,
}

let response = match serde_json::from_slice::<RpcPayload>(&bytes) {
Ok(RpcPayload { result }) if result => ok_response(),
Ok(_) => bad_response(),
_ => internal_error(),
};

Ok(response)
};

Box::pin(res_fut)
}
}

impl<S> ReadyRequest<S> {
pub fn new(inner: S) -> Self {
Self { inner }
}
}

impl<S> Service<Request<Body>> for ReadyRequest<S>
where
S: Service<Request<Body>, Response = Response<Body>>,
S::Response: 'static,
S::Error: Into<BoxedError> + 'static,
S::Future: Send + 'static,
{
type Response = Response<Body>;
type Error = BoxedError;
type Future = PinnedFuture;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_ready_inner(cx)
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
self.call_proxy(req)
}
}

0 comments on commit 1d0715d

Please sign in to comment.