From cb0ef3dca08f3e9cc164ed904b732558a9148ebf Mon Sep 17 00:00:00 2001 From: esraa Date: Tue, 6 Aug 2024 18:35:25 +0300 Subject: [PATCH] Refactor `TranslatorSv2` --- roles/translator/Cargo.toml | 4 + roles/translator/src/lib/mod.rs | 265 +++++++++++++++++++++++ roles/translator/src/lib/proxy_config.rs | 67 ++++++ roles/translator/src/main.rs | 256 +--------------------- 4 files changed, 339 insertions(+), 253 deletions(-) diff --git a/roles/translator/Cargo.toml b/roles/translator/Cargo.toml index c7abf70481..a7c762a058 100644 --- a/roles/translator/Cargo.toml +++ b/roles/translator/Cargo.toml @@ -10,6 +10,10 @@ repository = "https://github.com/stratum-mining/stratum" name = "translator_sv2" path = "src/lib/mod.rs" +[[bin]] +name = "translator_sv2" +path = "src/main.rs" + [dependencies] stratum-common = { version = "1.0.0", path = "../../common" } async-channel = "1.5.1" diff --git a/roles/translator/src/lib/mod.rs b/roles/translator/src/lib/mod.rs index 2075e4569d..98dbc829e6 100644 --- a/roles/translator/src/lib/mod.rs +++ b/roles/translator/src/lib/mod.rs @@ -1,3 +1,26 @@ +use async_channel::{bounded, unbounded}; +use futures::FutureExt; +use rand::Rng; +pub use roles_logic_sv2::utils::Mutex; +use status::Status; +use std::{ + net::{IpAddr, SocketAddr}, + str::FromStr, + sync::Arc, + time::Duration, +}; + +use tokio::{ + sync::broadcast, + task::{self, AbortHandle}, +}; +use tracing::{debug, error, info, warn}; +pub use v1::server_to_client; + +use proxy_config::ProxyConfig; + +use crate::status::State; + pub mod downstream_sv1; pub mod error; pub mod proxy; @@ -5,3 +28,245 @@ pub mod proxy_config; pub mod status; pub mod upstream_sv2; pub mod utils; + +#[derive(Clone, Debug)] +pub struct TranslatorSv2 { + config: ProxyConfig, +} + +impl TranslatorSv2 { + pub fn new(config: ProxyConfig) -> Self { + Self { config } + } + + pub async fn start(self) { + let (tx_status, rx_status) = unbounded(); + + let target = Arc::new(Mutex::new(vec![0; 32])); + + // Sender/Receiver to send SV1 `mining.notify` message from the `Bridge` to the `Downstream` + let (tx_sv1_notify, _rx_sv1_notify): ( + broadcast::Sender, + broadcast::Receiver, + ) = broadcast::channel(10); + + let task_collector: Arc>> = + Arc::new(Mutex::new(Vec::new())); + + self.internal_start( + tx_sv1_notify.clone(), + target.clone(), + tx_status.clone(), + task_collector.clone(), + ) + .await; + + debug!("Starting up signal listener"); + let task_collector_ = task_collector.clone(); + + debug!("Starting up status listener"); + // Check all tasks if is_finished() is true, if so exit + loop { + let task_status = tokio::select! { + task_status = rx_status.recv().fuse() => task_status, + }; + let task_status: Status = task_status.unwrap(); + + match task_status.state { + // Should only be sent by the downstream listener + State::DownstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + break; + } + State::BridgeShutdown(err) => { + error!("SHUTDOWN from: {}", err); + break; + } + State::UpstreamShutdown(err) => { + error!("SHUTDOWN from: {}", err); + break; + } + State::UpstreamTryReconnect(err) => { + error!("SHUTDOWN from: {}", err); + + // wait a random amount of time between 0 and 3000ms + // if all the downstreams try to reconnect at the same time, the upstream may fail + let mut rng = rand::thread_rng(); + let wait_time = rng.gen_range(0..=3000); + tokio::time::sleep(Duration::from_millis(wait_time)).await; + + // kill al the tasks + let task_collector_aborting = task_collector_.clone(); + kill_tasks(task_collector_aborting.clone()); + + warn!("Trying reconnecting to upstream"); + self.internal_start( + tx_sv1_notify.clone(), + target.clone(), + tx_status.clone(), + task_collector_.clone(), + ) + .await; + } + State::Healthy(msg) => { + info!("HEALTHY message: {}", msg); + } + } + } + } + + async fn internal_start( + &self, + tx_sv1_notify: broadcast::Sender>, + target: Arc>>, + tx_status: async_channel::Sender>, + task_collector: Arc>>, + ) { + let proxy_config = self.config.clone(); + // Sender/Receiver to send a SV2 `SubmitSharesExtended` from the `Bridge` to the `Upstream` + // (Sender>, Receiver>) + let (tx_sv2_submit_shares_ext, rx_sv2_submit_shares_ext) = bounded(10); + + // `tx_sv1_bridge` sender is used by `Downstream` to send a `DownstreamMessages` message to + // `Bridge` via the `rx_sv1_downstream` receiver + // (Sender, Receiver) + let (tx_sv1_bridge, rx_sv1_downstream) = unbounded(); + + // Sender/Receiver to send a SV2 `NewExtendedMiningJob` message from the `Upstream` to the + // `Bridge` + // (Sender>, Receiver>) + let (tx_sv2_new_ext_mining_job, rx_sv2_new_ext_mining_job) = bounded(10); + + // Sender/Receiver to send a new extranonce from the `Upstream` to this `main` function to be + // passed to the `Downstream` upon a Downstream role connection + // (Sender, Receiver) + let (tx_sv2_extranonce, rx_sv2_extranonce) = bounded(1); + + // Sender/Receiver to send a SV2 `SetNewPrevHash` message from the `Upstream` to the `Bridge` + // (Sender>, Receiver>) + let (tx_sv2_set_new_prev_hash, rx_sv2_set_new_prev_hash) = bounded(10); + + // Format `Upstream` connection address + let upstream_addr = SocketAddr::new( + IpAddr::from_str(&proxy_config.upstream_address) + .expect("Failed to parse upstream address!"), + proxy_config.upstream_port, + ); + + let diff_config = Arc::new(Mutex::new(proxy_config.upstream_difficulty_config.clone())); + let task_collector_upstream = task_collector.clone(); + // Instantiate a new `Upstream` (SV2 Pool) + let upstream = match upstream_sv2::Upstream::new( + upstream_addr, + proxy_config.upstream_authority_pubkey, + rx_sv2_submit_shares_ext, + tx_sv2_set_new_prev_hash, + tx_sv2_new_ext_mining_job, + proxy_config.min_extranonce2_size, + tx_sv2_extranonce, + status::Sender::Upstream(tx_status.clone()), + target.clone(), + diff_config.clone(), + task_collector_upstream, + ) + .await + { + Ok(upstream) => upstream, + Err(e) => { + error!("Failed to create upstream: {}", e); + return; + } + }; + let task_collector_init_task = task_collector.clone(); + // Spawn a task to do all of this init work so that the main thread + // can listen for signals and failures on the status channel. This + // allows for the tproxy to fail gracefully if any of these init tasks + //fail + let task = task::spawn(async move { + // Connect to the SV2 Upstream role + match upstream_sv2::Upstream::connect( + upstream.clone(), + proxy_config.min_supported_version, + proxy_config.max_supported_version, + ) + .await + { + Ok(_) => info!("Connected to Upstream!"), + Err(e) => { + error!("Failed to connect to Upstream EXITING! : {}", e); + return; + } + } + + // Start receiving messages from the SV2 Upstream role + if let Err(e) = upstream_sv2::Upstream::parse_incoming(upstream.clone()) { + error!("failed to create sv2 parser: {}", e); + return; + } + + debug!("Finished starting upstream listener"); + // Start task handler to receive submits from the SV1 Downstream role once it connects + if let Err(e) = upstream_sv2::Upstream::handle_submit(upstream.clone()) { + error!("Failed to create submit handler: {}", e); + return; + } + + // Receive the extranonce information from the Upstream role to send to the Downstream role + // once it connects also used to initialize the bridge + let (extended_extranonce, up_id) = rx_sv2_extranonce.recv().await.unwrap(); + loop { + let target: [u8; 32] = target.safe_lock(|t| t.clone()).unwrap().try_into().unwrap(); + if target != [0; 32] { + break; + }; + async_std::task::sleep(std::time::Duration::from_millis(100)).await; + } + + let task_collector_bridge = task_collector_init_task.clone(); + // Instantiate a new `Bridge` and begins handling incoming messages + let b = proxy::Bridge::new( + rx_sv1_downstream, + tx_sv2_submit_shares_ext, + rx_sv2_set_new_prev_hash, + rx_sv2_new_ext_mining_job, + tx_sv1_notify.clone(), + status::Sender::Bridge(tx_status.clone()), + extended_extranonce, + target, + up_id, + task_collector_bridge, + ); + proxy::Bridge::start(b.clone()); + + // Format `Downstream` connection address + let downstream_addr = SocketAddr::new( + IpAddr::from_str(&proxy_config.downstream_address).unwrap(), + proxy_config.downstream_port, + ); + + let task_collector_downstream = task_collector_init_task.clone(); + // Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices) + downstream_sv1::Downstream::accept_connections( + downstream_addr, + tx_sv1_bridge, + tx_sv1_notify, + status::Sender::DownstreamListener(tx_status.clone()), + b, + proxy_config.downstream_difficulty_config, + diff_config, + task_collector_downstream, + ); + }); // End of init task + let _ = + task_collector.safe_lock(|t| t.push((task.abort_handle(), "init task".to_string()))); + } +} + +fn kill_tasks(task_collector: Arc>>) { + let _ = task_collector.safe_lock(|t| { + while let Some(handle) = t.pop() { + handle.0.abort(); + warn!("Killed task: {:?}", handle.1); + } + }); +} diff --git a/roles/translator/src/lib/proxy_config.rs b/roles/translator/src/lib/proxy_config.rs index d0a8357261..17f4d977d0 100644 --- a/roles/translator/src/lib/proxy_config.rs +++ b/roles/translator/src/lib/proxy_config.rs @@ -15,6 +15,42 @@ pub struct ProxyConfig { pub upstream_difficulty_config: UpstreamDifficultyConfig, } +pub struct TranslatorProxyUpstream { + address: String, + port: u16, + authority_pubkey: Secp256k1PublicKey, + difficulty_config: UpstreamDifficultyConfig, +} + +pub struct TranslatorProxyDownstream { + address: String, + port: u16, + difficulty_config: DownstreamDifficultyConfig, +} + +impl ProxyConfig { + pub fn new( + upstream: TranslatorProxyUpstream, + downstream: TranslatorProxyDownstream, + max_supported_version: u16, + min_supported_version: u16, + min_extranonce2_size: u16, + ) -> Self { + Self { + upstream_address: upstream.address, + upstream_port: upstream.port, + upstream_authority_pubkey: upstream.authority_pubkey, + downstream_address: downstream.address, + downstream_port: downstream.port, + max_supported_version, + min_supported_version, + min_extranonce2_size, + downstream_difficulty_config: downstream.difficulty_config, + upstream_difficulty_config: upstream.difficulty_config, + } + } +} + #[derive(Debug, Deserialize, Clone)] pub struct DownstreamDifficultyConfig { pub min_individual_miner_hashrate: f32, @@ -25,6 +61,21 @@ pub struct DownstreamDifficultyConfig { pub timestamp_of_last_update: u64, } +impl DownstreamDifficultyConfig { + pub fn new( + min_individual_miner_hashrate: f32, + shares_per_minute: f32, + submits_since_last_update: u32, + timestamp_of_last_update: u64, + ) -> Self { + Self { + min_individual_miner_hashrate, + shares_per_minute, + submits_since_last_update, + timestamp_of_last_update, + } + } +} impl PartialEq for DownstreamDifficultyConfig { fn eq(&self, other: &Self) -> bool { other.min_individual_miner_hashrate.round() as u32 @@ -41,3 +92,19 @@ pub struct UpstreamDifficultyConfig { #[serde(default = "bool::default")] pub should_aggregate: bool, } + +impl UpstreamDifficultyConfig { + pub fn new( + channel_diff_update_interval: u32, + channel_nominal_hashrate: f32, + timestamp_of_last_update: u64, + should_aggregate: bool, + ) -> Self { + Self { + channel_diff_update_interval, + channel_nominal_hashrate, + timestamp_of_last_update, + should_aggregate, + } + } +} diff --git a/roles/translator/src/main.rs b/roles/translator/src/main.rs index fc8e91de94..2e969716bc 100644 --- a/roles/translator/src/main.rs +++ b/roles/translator/src/main.rs @@ -3,25 +3,14 @@ mod args; mod lib; use args::Args; -use async_channel::{bounded, unbounded}; use error::{Error, ProxyResult}; -use futures::{select, FutureExt}; use lib::{downstream_sv1, error, proxy, proxy_config, status, upstream_sv2}; use proxy_config::ProxyConfig; -use rand::Rng; -use roles_logic_sv2::utils::Mutex; -use std::{ - net::{IpAddr, SocketAddr}, - str::FromStr, - sync::Arc, -}; use ext_config::{Config, File, FileFormat}; -use tokio::{sync::broadcast, task, task::AbortHandle, time::Duration}; -use v1::server_to_client; -use crate::status::{State, Status}; -use tracing::{debug, error, info, warn}; +use tracing::{error, info}; + /// Process CLI args, if any. #[allow(clippy::result_large_err)] fn process_cli_args<'a>() -> ProxyResult<'a, ProxyConfig> { @@ -56,244 +45,5 @@ async fn main() { }; info!("Proxy Config: {:?}", &proxy_config); - let (tx_status, rx_status) = unbounded(); - - let target = Arc::new(Mutex::new(vec![0; 32])); - - // Sender/Receiver to send SV1 `mining.notify` message from the `Bridge` to the `Downstream` - let (tx_sv1_notify, _rx_sv1_notify): ( - broadcast::Sender, - broadcast::Receiver, - ) = broadcast::channel(10); - - let task_collector: Arc>> = Arc::new(Mutex::new(Vec::new())); - - start( - tx_sv1_notify.clone(), - target.clone(), - tx_status.clone(), - task_collector.clone(), - proxy_config.clone(), - ) - .await; - - debug!("Starting up signal listener"); - let task_collector_ = task_collector.clone(); - - let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse()); - debug!("Starting up status listener"); - // Check all tasks if is_finished() is true, if so exit - loop { - let task_status = select! { - task_status = rx_status.recv().fuse() => task_status, - interrupt_signal = interrupt_signal_future => { - match interrupt_signal { - Ok(()) => { - info!("Interrupt received"); - }, - Err(err) => { - error!("Unable to listen for interrupt signal: {}", err); - // we also shut down in case of error - }, - } - break; - } - }; - let task_status: Status = task_status.unwrap(); - - match task_status.state { - // Should only be sent by the downstream listener - State::DownstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; - } - State::BridgeShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; - } - State::UpstreamShutdown(err) => { - error!("SHUTDOWN from: {}", err); - break; - } - State::UpstreamTryReconnect(err) => { - error!("SHUTDOWN from: {}", err); - - // wait a random amount of time between 0 and 3000ms - // if all the downstreams try to reconnect at the same time, the upstream may fail - let mut rng = rand::thread_rng(); - let wait_time = rng.gen_range(0..=3000); - tokio::time::sleep(Duration::from_millis(wait_time)).await; - - // kill al the tasks - let task_collector_aborting = task_collector_.clone(); - kill_tasks(task_collector_aborting.clone()); - - warn!("Trying reconnecting to upstream"); - start( - tx_sv1_notify.clone(), - target.clone(), - tx_status.clone(), - task_collector_.clone(), - proxy_config.clone(), - ) - .await; - } - State::Healthy(msg) => { - info!("HEALTHY message: {}", msg); - } - } - } -} - -fn kill_tasks(task_collector: Arc>>) { - let _ = task_collector.safe_lock(|t| { - while let Some(handle) = t.pop() { - handle.0.abort(); - warn!("Killed task: {:?}", handle.1); - } - }); -} - -async fn start<'a>( - tx_sv1_notify: broadcast::Sender>, - target: Arc>>, - tx_status: async_channel::Sender>, - task_collector: Arc>>, - proxy_config: ProxyConfig, -) { - // Sender/Receiver to send a SV2 `SubmitSharesExtended` from the `Bridge` to the `Upstream` - // (Sender>, Receiver>) - let (tx_sv2_submit_shares_ext, rx_sv2_submit_shares_ext) = bounded(10); - - // `tx_sv1_bridge` sender is used by `Downstream` to send a `DownstreamMessages` message to - // `Bridge` via the `rx_sv1_downstream` receiver - // (Sender, Receiver) - let (tx_sv1_bridge, rx_sv1_downstream) = unbounded(); - - // Sender/Receiver to send a SV2 `NewExtendedMiningJob` message from the `Upstream` to the - // `Bridge` - // (Sender>, Receiver>) - let (tx_sv2_new_ext_mining_job, rx_sv2_new_ext_mining_job) = bounded(10); - - // Sender/Receiver to send a new extranonce from the `Upstream` to this `main` function to be - // passed to the `Downstream` upon a Downstream role connection - // (Sender, Receiver) - let (tx_sv2_extranonce, rx_sv2_extranonce) = bounded(1); - - // Sender/Receiver to send a SV2 `SetNewPrevHash` message from the `Upstream` to the `Bridge` - // (Sender>, Receiver>) - let (tx_sv2_set_new_prev_hash, rx_sv2_set_new_prev_hash) = bounded(10); - - // Format `Upstream` connection address - let upstream_addr = SocketAddr::new( - IpAddr::from_str(&proxy_config.upstream_address) - .expect("Failed to parse upstream address!"), - proxy_config.upstream_port, - ); - - let diff_config = Arc::new(Mutex::new(proxy_config.upstream_difficulty_config.clone())); - let task_collector_upstream = task_collector.clone(); - // Instantiate a new `Upstream` (SV2 Pool) - let upstream = match upstream_sv2::Upstream::new( - upstream_addr, - proxy_config.upstream_authority_pubkey, - rx_sv2_submit_shares_ext, - tx_sv2_set_new_prev_hash, - tx_sv2_new_ext_mining_job, - proxy_config.min_extranonce2_size, - tx_sv2_extranonce, - status::Sender::Upstream(tx_status.clone()), - target.clone(), - diff_config.clone(), - task_collector_upstream, - ) - .await - { - Ok(upstream) => upstream, - Err(e) => { - error!("Failed to create upstream: {}", e); - return; - } - }; - let task_collector_init_task = task_collector.clone(); - // Spawn a task to do all of this init work so that the main thread - // can listen for signals and failures on the status channel. This - // allows for the tproxy to fail gracefully if any of these init tasks - //fail - let task = task::spawn(async move { - // Connect to the SV2 Upstream role - match upstream_sv2::Upstream::connect( - upstream.clone(), - proxy_config.min_supported_version, - proxy_config.max_supported_version, - ) - .await - { - Ok(_) => info!("Connected to Upstream!"), - Err(e) => { - error!("Failed to connect to Upstream EXITING! : {}", e); - return; - } - } - - // Start receiving messages from the SV2 Upstream role - if let Err(e) = upstream_sv2::Upstream::parse_incoming(upstream.clone()) { - error!("failed to create sv2 parser: {}", e); - return; - } - - debug!("Finished starting upstream listener"); - // Start task handler to receive submits from the SV1 Downstream role once it connects - if let Err(e) = upstream_sv2::Upstream::handle_submit(upstream.clone()) { - error!("Failed to create submit handler: {}", e); - return; - } - - // Receive the extranonce information from the Upstream role to send to the Downstream role - // once it connects also used to initialize the bridge - let (extended_extranonce, up_id) = rx_sv2_extranonce.recv().await.unwrap(); - loop { - let target: [u8; 32] = target.safe_lock(|t| t.clone()).unwrap().try_into().unwrap(); - if target != [0; 32] { - break; - }; - async_std::task::sleep(std::time::Duration::from_millis(100)).await; - } - - let task_collector_bridge = task_collector_init_task.clone(); - // Instantiate a new `Bridge` and begins handling incoming messages - let b = proxy::Bridge::new( - rx_sv1_downstream, - tx_sv2_submit_shares_ext, - rx_sv2_set_new_prev_hash, - rx_sv2_new_ext_mining_job, - tx_sv1_notify.clone(), - status::Sender::Bridge(tx_status.clone()), - extended_extranonce, - target, - up_id, - task_collector_bridge, - ); - proxy::Bridge::start(b.clone()); - - // Format `Downstream` connection address - let downstream_addr = SocketAddr::new( - IpAddr::from_str(&proxy_config.downstream_address).unwrap(), - proxy_config.downstream_port, - ); - - let task_collector_downstream = task_collector_init_task.clone(); - // Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices) - downstream_sv1::Downstream::accept_connections( - downstream_addr, - tx_sv1_bridge, - tx_sv1_notify, - status::Sender::DownstreamListener(tx_status.clone()), - b, - proxy_config.downstream_difficulty_config, - diff_config, - task_collector_downstream, - ); - }); // End of init task - let _ = task_collector.safe_lock(|t| t.push((task.abort_handle(), "init task".to_string()))); + lib::TranslatorSv2::new(proxy_config).start().await; }