Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: merge service crate #506

Merged
merged 3 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 82 additions & 106 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ members = [
"vault",
"bitcoin",
"faucet",
"service",
"runner"
]

Expand Down
2 changes: 1 addition & 1 deletion faucet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ async-trait = "0.1.40"
futures = "0.3.5"
git-version = "0.3.4"
lazy_static = "1.4.0"
tracing = { version = "0.1", features = ["log"] }

reqwest = { version = "0.11.11", features = ["json"] }
url = "2.2.2"

# Workspace dependencies
runtime = { path = "../runtime" }
service = { path = "../service" }

[dev-dependencies]
serial_test = "0.9.0"
Expand Down
1 change: 1 addition & 0 deletions faucet/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod error;
mod http;
mod service;

use clap::Parser;
use error::Error;
Expand Down
48 changes: 48 additions & 0 deletions faucet/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use futures::{future::Either, Future, FutureExt};
pub use runtime::{ShutdownReceiver, ShutdownSender};

pub enum TerminationStatus<Res> {
Cancelled,
Completed(Res),
}

pub async fn on_shutdown(shutdown_tx: ShutdownSender, future2: impl Future) {
let mut shutdown_rx = shutdown_tx.subscribe();
let future1 = shutdown_rx.recv().fuse();

let _ = future1.await;
future2.await;
}

async fn run_cancelable<F, Res>(mut shutdown_rx: ShutdownReceiver, future2: F) -> TerminationStatus<Res>
where
F: Future<Output = Res>,
{
let future1 = shutdown_rx.recv().fuse();
let future2 = future2.fuse();

futures::pin_mut!(future1);
futures::pin_mut!(future2);

match futures::future::select(future1, future2).await {
Either::Left((_, _)) => TerminationStatus::Cancelled,
Either::Right((res, _)) => TerminationStatus::Completed(res),
}
}

pub async fn wait_or_shutdown<F, E>(shutdown_tx: ShutdownSender, future2: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
{
match run_cancelable(shutdown_tx.subscribe(), future2).await {
TerminationStatus::Cancelled => {
tracing::trace!("Received shutdown signal");
Ok(())
}
TerminationStatus::Completed(res) => {
tracing::trace!("Sending shutdown signal");
let _ = shutdown_tx.send(());
res
}
}
}
30 changes: 0 additions & 30 deletions service/Cargo.toml

This file was deleted.

38 changes: 0 additions & 38 deletions service/src/error.rs

This file was deleted.

7 changes: 5 additions & 2 deletions vault/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ uses-bitcoind = [] # run tests relying on bitcoind regtest node

[dependencies]
thiserror = "1.0"
backoff = { version = "0.3.0", features = ["tokio"] }
clap = { version = "4.0.17", features = ["derive"]}
tokio = { version = "1.0", features = ["full"] }
tokio-stream = { version = "0.1.9", features = ["sync"] }
tokio-metrics = { version = "0.1.0", default-features = false }
serde = "1.0.136"
hyper = { version = "0.14.27" }
hyper-tls = "0.5.0"
warp = "0.3.2"
serde = { version = "1.0.136", features = ["derive"] }
parity-scale-codec = "3.0.0"
hex = "0.4.2"
futures = "0.3.5"
Expand Down Expand Up @@ -45,7 +49,6 @@ jsonrpc-core-client = { version = "18.0.0", features = ["http", "tls"] }
# Workspace dependencies
bitcoin = { path = "../bitcoin", features = ["cli"] }
runtime = { path = "../runtime" }
service = { path = "../service" }
faucet-rpc = { package = "faucet", path = "../faucet" }

# Substrate dependencies
Expand Down
5 changes: 3 additions & 2 deletions service/src/cli.rs → vault/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::trace;
use clap::Parser;
use std::str::FromStr;

Expand Down Expand Up @@ -44,8 +45,8 @@ impl FromStr for LoggingFormat {
impl LoggingFormat {
pub fn init_subscriber(&self) {
match *self {
Self::Full => crate::trace::init_subscriber(),
Self::Json => crate::trace::init_json_subscriber(),
Self::Full => trace::init_subscriber(),
Self::Json => trace::init_json_subscriber(),
}
}
}
Expand Down
40 changes: 15 additions & 25 deletions service/src/lib.rs → vault/src/connection_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
pub use crate::{
cli::{LoggingFormat, MonitoringConfig, RestartPolicy, ServiceConfig},
trace::init_subscriber,
Error,
};
use async_trait::async_trait;
use backoff::Error as BackoffError;
use bitcoin::{cli::BitcoinOpts as BitcoinConfig, BitcoinCoreApi, Error as BitcoinError};
use futures::{future::Either, Future, FutureExt};
use governor::{Quota, RateLimiter};
Expand All @@ -7,22 +13,14 @@ use runtime::{
cli::ConnectionOpts as ParachainConfig, CurrencyId, InterBtcParachain as BtcParachain, InterBtcSigner, PrettyPrint,
RuntimeCurrencyInfo, VaultId,
};
use std::{fmt, sync::Arc, time::Duration};

mod cli;
mod error;
mod trace;

pub use cli::{LoggingFormat, MonitoringConfig, RestartPolicy, ServiceConfig};
pub use error::Error;
pub use runtime::{ShutdownReceiver, ShutdownSender};
pub use trace::init_subscriber;
use std::{sync::Arc, time::Duration};
pub use warp;

pub type DynBitcoinCoreApi = Arc<dyn BitcoinCoreApi + Send + Sync>;

#[async_trait]
pub trait Service<Config, InnerError> {
pub trait Service<Config> {
const NAME: &'static str;
const VERSION: &'static str;

Expand All @@ -36,7 +34,7 @@ pub trait Service<Config, InnerError> {
constructor: Box<dyn Fn(VaultId) -> Result<DynBitcoinCoreApi, BitcoinError> + Send + Sync>,
keyname: String,
) -> Self;
async fn start(&self) -> Result<(), Error<InnerError>>;
async fn start(&self) -> Result<(), BackoffError<Error>>;
}

pub struct ConnectionManager<Config: Clone, F: Fn()> {
Expand Down Expand Up @@ -77,9 +75,7 @@ impl<Config: Clone + Send + 'static, F: Fn()> ConnectionManager<Config, F> {
}
}

pub async fn start<S: Service<Config, InnerError>, InnerError: fmt::Display>(
&self,
) -> Result<(), Error<InnerError>> {
pub async fn start<S: Service<Config>>(&self) -> Result<(), Error> {
loop {
tracing::info!("Version: {}", S::VERSION);
tracing::info!("AccountId: {}", self.signer.account_id.pretty_print());
Expand Down Expand Up @@ -138,18 +134,20 @@ impl<Config: Clone + Send + 'static, F: Fn()> ConnectionManager<Config, F> {
Box::new(constructor),
self.db_path.clone(),
);

match service.start().await {
Err(err @ Error::Abort(_)) => {
Err(err @ backoff::Error::Permanent(_)) => {
tracing::warn!("Disconnected: {}", err);
return Err(err);
return Err(err.into());
}
Err(err) => {
tracing::warn!("Disconnected: {}", err);
}
_ => {
tracing::warn!("Disconnected");
}
}
};

// propagate shutdown signal from main tasks
let _ = shutdown_tx.send(());

Expand Down Expand Up @@ -223,11 +221,3 @@ where
{
tokio::spawn(run_cancelable(shutdown_rx, future));
}

pub async fn on_shutdown(shutdown_tx: ShutdownSender, future2: impl Future) {
let mut shutdown_rx = shutdown_tx.subscribe();
let future1 = shutdown_rx.recv().fuse();

let _ = future1.await;
future2.await;
}
29 changes: 24 additions & 5 deletions vault/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::string::FromUtf8Error;

use bitcoin::Error as BitcoinError;
use jsonrpc_core_client::RpcError;
use parity_scale_codec::Error as CodecError;
use rocksdb::Error as RocksDbError;
use runtime::Error as RuntimeError;
use serde_json::Error as SerdeJsonError;
use std::{io::Error as IoError, num::ParseIntError, string::FromUtf8Error};
use thiserror::Error;
use tokio::task::JoinError as TokioJoinError;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -44,10 +44,29 @@ pub enum Error {
FromUtf8Error(#[from] FromUtf8Error),
#[error("BroadcastStreamRecvError: {0}")]
BroadcastStreamRecvError(#[from] BroadcastStreamRecvError),
#[error("Client has shutdown")]
ClientShutdown,
#[error("OsString parsing error")]
OsStringError,
#[error("File already exists")]
FileAlreadyExists,
#[error("There is a services already running on the system, with pid {0}")]
ServiceAlreadyRunning(u32),
#[error("Process with pid {0} not found")]
ProcessNotFound(String),
#[error("ParseIntError: {0}")]
ParseIntError(#[from] ParseIntError),
#[error("TokioError: {0}")]
TokioError(#[from] TokioJoinError),
#[error("System I/O error: {0}")]
IoError(#[from] IoError),
}

impl From<Error> for service::Error<Error> {
fn from(err: Error) -> Self {
Self::Retry(err)
nakul1010 marked this conversation as resolved.
Show resolved Hide resolved
impl From<backoff::Error<Error>> for Error {
fn from(err: backoff::Error<Error>) -> Self {
match err {
backoff::Error::Permanent(err) => err,
backoff::Error::Transient(err) => err,
}
}
}
11 changes: 8 additions & 3 deletions vault/src/execution.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use crate::{error::Error, metrics::update_bitcoin_metrics, system::VaultData, VaultIdManager, YIELD_RATE};
use crate::{
error::Error,
metrics::update_bitcoin_metrics,
service::{spawn_cancelable, DynBitcoinCoreApi, ShutdownSender},
system::VaultData,
VaultIdManager, YIELD_RATE,
};
use bitcoin::{
Error as BitcoinError, Hash, SatPerVbyte, Transaction, TransactionExt, TransactionMetadata, Txid,
BLOCK_INTERVAL as BITCOIN_BLOCK_INTERVAL,
Expand All @@ -11,7 +17,6 @@ use runtime::{
RedeemRequestStatus, ReplacePallet, ReplaceRequestStatus, SecurityPallet, UtilFuncs, VaultId, VaultRegistryPallet,
H256,
};
use service::{spawn_cancelable, DynBitcoinCoreApi, Error as ServiceError, ShutdownSender};
use std::{collections::HashMap, convert::TryInto, time::Duration};
use tokio::time::sleep;
use tokio_stream::wrappers::BroadcastStream;
Expand Down Expand Up @@ -475,7 +480,7 @@ pub async fn execute_open_requests(
num_confirmations: u32,
payment_margin: Duration,
auto_rbf: bool,
) -> Result<(), ServiceError<Error>> {
) -> Result<(), Error> {
let parachain_rpc = &parachain_rpc;
let vault_id = parachain_rpc.get_account_id().clone();

Expand Down
Loading