Skip to content

Commit

Permalink
End progress for the night, controller and p2p changes are in, need e…
Browse files Browse the repository at this point in the history
…ntrypoint and testing
  • Loading branch information
erikreppel authored and ligustah committed Apr 26, 2024
1 parent 3435ec1 commit 4de52f4
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 62 deletions.
2 changes: 2 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ mod test {
admin_api_secret: None,
rate_limit_rps: 1,
boot_nodes: BootNodes::Chain,
sync_lookback_hours: 0,
};

let names = config.premint_names();
Expand Down Expand Up @@ -259,6 +260,7 @@ mod test {
admin_api_secret: None,
rate_limit_rps: 1,
boot_nodes: BootNodes::None,
sync_lookback_hours: 0,
};

let names = config.premint_names();
Expand Down
35 changes: 8 additions & 27 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ pub struct Controller {
external_commands: mpsc::Receiver<ControllerCommands>,
store: PremintStorage,
rules: RulesEngine<PremintStorage>,
trusted_peers: Vec<PeerId>,
inclusion_mode: ChainInclusionMode,

config: Config,
}

Expand All @@ -101,8 +100,6 @@ impl Controller {
external_commands,
store,
rules,
trusted_peers: config.trusted_peers(),
inclusion_mode: config.chain_inclusion_mode,
config,
}
}
Expand Down Expand Up @@ -229,7 +226,7 @@ impl Controller {
tracing::debug!("Marked as seen onchain {:?}", claim.clone());
}

if self.inclusion_mode == ChainInclusionMode::Check {
if self.config.chain_inclusion_mode == ChainInclusionMode::Check {
if let Err(err) = self
.swarm_command_sender
.send(SwarmCommand::SendOnchainMintFound(claim))
Expand Down Expand Up @@ -263,7 +260,7 @@ impl Controller {
}

async fn handle_event_onchain_claim(&self, peer_claim: PeerInclusionClaim) -> eyre::Result<()> {
match self.inclusion_mode {
match self.config.chain_inclusion_mode {
ChainInclusionMode::Check | ChainInclusionMode::Verify => {
let claim = peer_claim.claim;
let premint = self
Expand All @@ -286,7 +283,11 @@ impl Controller {
}
}
ChainInclusionMode::Trust => {
if self.trusted_peers.contains(&peer_claim.from_peer_id) {
if self
.config
.trusted_peers()
.contains(&peer_claim.from_peer_id)
{
self.store
.mark_seen_on_chain(peer_claim.claim.clone())
.await?;
Expand All @@ -304,26 +305,6 @@ impl Controller {
}
}
}

/// temporary solution for full state from a known other node via their http api.
/// We should migrate to syncing based on peer_id & libp2p request_response
async fn api_sync(&self, api_url: String) -> eyre::Result<()> {
let seconds = self.config.sync_lookback_hours * 60 * 60;
let from_time = SystemTime::now().sub(Duration::from_secs(seconds));

let from_time = chrono::Utc::now().sub(chrono::Duration::hours(
self.config.sync_lookback_hours as i64,
));

let url = reqwest::Url::parse_with_params(
api_url.as_str(),
&[("from", serde_json::to_string(&from_time)?)],
)?;

// reqwest::get()

Ok(())
}
}

#[derive(Clone)]
Expand Down
99 changes: 76 additions & 23 deletions src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,24 @@ use libp2p::kad::store::MemoryStore;
use libp2p::kad::GetProvidersOk::FoundProviders;
use libp2p::kad::{Addresses, QueryResult, RecordKey};
use libp2p::multiaddr::Protocol;
use libp2p::request_response::{Message, ProtocolSupport};
use libp2p::request_response::{InboundRequestId, Message, ProtocolSupport, ResponseChannel};
use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent};
use libp2p::{
gossipsub, kad, noise, request_response, tcp, yamux, Multiaddr, PeerId, StreamProtocol,
};
use serde::{Deserialize, Serialize};
use sha256::digest;
use std::hash::Hasher;
use std::time::Duration;
use tokio::select;
use tower::load::pending_requests;
use tower_http::request_id;

#[derive(NetworkBehaviour)]
pub struct MintpoolBehaviour {
gossipsub: gossipsub::Behaviour,
kad: kad::Behaviour<MemoryStore>,
identify: libp2p::identify::Behaviour,
ping: libp2p::ping::Behaviour,
request_response: request_response::cbor::Behaviour<QueryOptions, Vec<PremintTypes>>,
request_response: request_response::cbor::Behaviour<QueryOptions, SyncResponse>,
}

pub struct SwarmController {
Expand Down Expand Up @@ -301,7 +300,12 @@ impl SwarmController {
}

SwarmEvent::Behaviour(MintpoolBehaviourEvent::RequestResponse(event)) => {
self.handle_request_response_event(event).await
match self.handle_request_response_event(event).await {
Ok(_) => {}
Err(err) => {
tracing::error!("Error handling request response event: {:?}", err);
}
}
}

SwarmEvent::Dialing { peer_id, .. } => {
Expand Down Expand Up @@ -570,7 +574,7 @@ impl SwarmController {

async fn handle_request_response_event(
&mut self,
event: request_response::Event<QueryOptions, Vec<PremintTypes>>,
event: request_response::Event<QueryOptions, SyncResponse>,
) -> eyre::Result<()> {
match event {
request_response::Event::Message { peer, message } => match message {
Expand All @@ -579,23 +583,12 @@ impl SwarmController {
request,
channel,
} => {
tracing::info!(
request_id = request_id.to_string(),
"processing request for sync"
);
let (snd, recv) = tokio::sync::oneshot::channel();
self.event_sender
.send(P2PEvent::SyncRequest {
query: request,
channel: snd,
})
.await?;
let result = recv.await??;
let resp = self.make_sync_response(request_id, request).await;
self.swarm
.behaviour_mut()
.request_response
.send_response(channel, result)
.map_err(|e| eyre::eyre!("Error sending response: {:?}", e))?
.send_response(channel, resp)
.map_err(|e| eyre::eyre!("Error sending response: {:?}", e))?;
}
Message::Response {
request_id,
Expand All @@ -605,15 +598,69 @@ impl SwarmController {
request_id = request_id.to_string(),
"received response for sync"
);
self.event_sender
.send(P2PEvent::SyncResponse { premints: response })
.await?;
match response {
SyncResponse::Premints(premints) => {
self.event_sender
.send(P2PEvent::SyncResponse { premints })
.await?;
}
SyncResponse::Error(err) => {
tracing::error!(
request_id = request_id.to_string(),
error = err,
"error received to our sync request"
);
}
}
}
},
other => tracing::info!("mintpool sync request/response event: {:?}", other),
}
Ok(())
}

// Makes a Response for a request to sync from another node
async fn make_sync_response(
&mut self,
request_id: InboundRequestId,
request: QueryOptions,
) -> SyncResponse {
tracing::info!(
request_id = request_id.to_string(),
"processing request for sync"
);
match self.make_sync_response_query(request).await {
Ok(premints) => SyncResponse::Premints(premints),
Err(err) => {
tracing::error!(
request_id = request_id.to_string(),
error = err.to_string(),
"error processing sync request"
);
SyncResponse::Error(err.to_string())
}
}
}

// inner function to make propagating errors that occur during query easier to work with
async fn make_sync_response_query(
&mut self,
request: QueryOptions,
) -> eyre::Result<Vec<PremintTypes>> {
let (snd, recv) = tokio::sync::oneshot::channel();
self.event_sender
.send(P2PEvent::SyncRequest {
query: request,
channel: snd,
})
.await
.map_err(|_| eyre::eyre!("Controller error"))?;
let result = recv
.await
.map_err(|_| eyre::eyre!("Channel error"))?
.map_err(|_| eyre::eyre!("Query error"))?;
Ok(result)
}
}

fn gossipsub_message_id(message: &gossipsub::Message) -> gossipsub::MessageId {
Expand Down Expand Up @@ -643,6 +690,12 @@ pub struct NetworkState {
pub all_external_addresses: Vec<Multiaddr>,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum SyncResponse {
Premints(Vec<PremintTypes>),
Error(String),
}

fn announce_topic() -> gossipsub::IdentTopic {
gossipsub::IdentTopic::new("mintpool::announce")
}
2 changes: 1 addition & 1 deletion tests/api_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod api_test {
async fn make_test_router(config: &Config) -> Router {
let mut rules = RulesEngine::new(config);
rules.add_default_rules();
let ctl = start_p2p_services(config, rules).await.unwrap();
let ctl = start_p2p_services(config.clone(), rules).await.unwrap();

let router = api::router_with_defaults(config);
let state = AppState::from(config, ctl.clone()).await;
Expand Down
2 changes: 1 addition & 1 deletion tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub mod mintpool_build {
let config = make_config(start_port + i, peer_limit);

let ctl = mintpool::run::start_p2p_services(
&config,
config.clone(),
RulesEngine::new_with_default_rules(&config),
)
.await
Expand Down
29 changes: 19 additions & 10 deletions tests/e2e_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn test_zora_premint_v2_e2e() {
// set this so CHAINS will use the anvil rpc rather than the one in chains.json
env::set_var("CHAIN_7777777_RPC_WSS", anvil.ws_endpoint());

let ctl = run::start_p2p_services(&config, RulesEngine::new_with_default_rules(&config))
let ctl = run::start_p2p_services(config.clone(), RulesEngine::new_with_default_rules(&config))
.await
.unwrap();
run::start_watch_chain::<ZoraPremintV2>(&config, ctl.clone()).await;
Expand Down Expand Up @@ -208,14 +208,20 @@ async fn test_verify_e2e() {
// Start 3 nodes, one in check, one in verify, one in trust (trusts node 1)
// ============================================================================================

let ctl1 = run::start_p2p_services(&config1, RulesEngine::new_with_default_rules(&config1))
.await
.unwrap();
let ctl1 = run::start_p2p_services(
config1.clone(),
RulesEngine::new_with_default_rules(&config1),
)
.await
.unwrap();
run::start_watch_chain::<ZoraPremintV2>(&config1, ctl1.clone()).await;

let ctl2 = run::start_p2p_services(&config2, RulesEngine::new_with_default_rules(&config2))
.await
.unwrap();
let ctl2 = run::start_p2p_services(
config2.clone(),
RulesEngine::new_with_default_rules(&config2),
)
.await
.unwrap();

let node_info = ctl1.get_node_info().await.unwrap();

Expand All @@ -225,9 +231,12 @@ async fn test_verify_e2e() {
config3.chain_inclusion_mode = ChainInclusionMode::Trust;
config3.trusted_peers = Some(node_info.peer_id.to_string());

let ctl3 = run::start_p2p_services(&config3, RulesEngine::new_with_default_rules(&config3))
.await
.unwrap();
let ctl3 = run::start_p2p_services(
config3.clone(),
RulesEngine::new_with_default_rules(&config3),
)
.await
.unwrap();

connect_all_to_first(vec![ctl1.clone(), ctl2.clone(), ctl3.clone()]).await;

Expand Down

0 comments on commit 4de52f4

Please sign in to comment.