Skip to content

Commit

Permalink
Merge #923
Browse files Browse the repository at this point in the history
923: Handle expired orders in the app r=klochowicz a=klochowicz

- prune expired orders in the app when maker or coordinator is offline
- add a test that ensures we detect lack of orders

TODO:
- [x] Disable open/close buttons when there's no 'best price' for a given direction
- [x] disable 'close position' button if there's no price in that direction

![image](https://github.com/get10101/10101/assets/8319440/5f49f547-1b3a-479c-925f-caf37c408025)


Co-authored-by: Mariusz Klochowicz <[email protected]>
  • Loading branch information
bors[bot] and klochowicz authored Jul 12, 2023
2 parents a046cc9 + a2f71b2 commit 71c8237
Show file tree
Hide file tree
Showing 17 changed files with 267 additions and 168 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ jobs:
- name: Generate FFI bindings
run: just gen
- name: Running cargo tests
run: cargo test
run: RUST_BACKTRACE=1 cargo test
- name: Running flutter tests
run: just flutter-test

Expand All @@ -117,7 +117,7 @@ jobs:
run: |
curl -d '{"address":"bcrt1qylgu6ffkp3p0m8tw8kp4tt2dmdh755f4r5dq7s", "amount":"0.1"}' -H "Content-Type: application/json" -X POST http://localhost:3000/faucet
- name: Run slow tests
run: cargo test -p ln-dlc-node -- --ignored --nocapture --test-threads=1
run: RUST_BACKTRACE=1 cargo test -p ln-dlc-node -- --ignored --nocapture --test-threads=1

e2e-tests:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions coordinator/src/orderbook/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod db;
pub mod routes;
pub mod trading;
pub mod websocket;

#[cfg(test)]
mod tests;
139 changes: 2 additions & 137 deletions coordinator/src/orderbook/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ use crate::orderbook::db;
use crate::orderbook::db::orders;
use crate::orderbook::trading::match_order;
use crate::orderbook::trading::notify_traders;
use crate::orderbook::websocket::websocket_connection;
use crate::routes::AppState;
use crate::AppError;
use anyhow::Context;
use anyhow::Result;
use autometrics::autometrics;
use axum::extract::ws::Message;
use axum::extract::ws::WebSocket;
use axum::extract::ws::WebSocketUpgrade;
use axum::extract::Path;
use axum::extract::Query;
Expand All @@ -20,16 +19,11 @@ use bitcoin::secp256k1::PublicKey;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::PooledConnection;
use diesel::PgConnection;
use futures::SinkExt;
use futures::StreamExt;
use orderbook_commons::create_sign_message;
use orderbook_commons::FilledWith;
use orderbook_commons::NewOrder;
use orderbook_commons::Order;
use orderbook_commons::OrderType;
use orderbook_commons::OrderbookMsg;
use orderbook_commons::OrderbookRequest;
use orderbook_commons::Signature;
use rust_decimal::Decimal;
use serde::de;
use serde::Deserialize;
Expand All @@ -38,14 +32,10 @@ use serde::Serialize;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc;
use tracing::instrument;
use uuid::Uuid;

const WEBSOCKET_SEND_TIMEOUT: Duration = Duration::from_secs(5);

#[derive(Debug, Deserialize)]
pub struct AllOrdersParams {
#[serde(default, deserialize_with = "empty_string_as_none")]
Expand Down Expand Up @@ -231,130 +221,5 @@ pub async fn websocket_handler(
ws: WebSocketUpgrade,
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
ws.on_upgrade(|socket| websocket(socket, state))
}

// This function deals with a single websocket connection, i.e., a single
// connected client / user, for which we will spawn two independent tasks (for
// receiving / sending messages).
async fn websocket(stream: WebSocket, state: Arc<AppState>) {
// By splitting, we can send and receive at the same time.
let (mut sender, mut receiver) = stream.split();

// We subscribe *before* sending the "joined" message, so that we will also
// display it to our client.
let mut rx = state.tx_pricefeed.subscribe();

let mut conn = match state.pool.clone().get() {
Ok(conn) => conn,
Err(err) => {
tracing::error!("Could not get connection to db pool {err:#}");
return;
}
};

let orders = match orderbook::db::orders::all(&mut conn, false) {
Ok(orders) => orders,
Err(error) => {
tracing::error!("Could not load all orders from db {error:#}");
return;
}
};

// Now send the "all orders" to the new client.
if let Ok(msg) = serde_json::to_string(&OrderbookMsg::AllOrders(orders)) {
let _ = sender.send(Message::Text(msg)).await;
}

let (local_sender, mut local_receiver) = mpsc::channel::<OrderbookMsg>(100);

let mut local_recv_task = tokio::spawn(async move {
while let Some(local_msg) = local_receiver.recv().await {
match serde_json::to_string(&local_msg) {
Ok(msg) => {
if let Err(err) = tokio::time::timeout(
WEBSOCKET_SEND_TIMEOUT,
sender.send(Message::Text(msg.clone())),
)
.await
{
tracing::error!("Could not forward message {msg} : {err:#}");
return;
}
}
Err(error) => {
tracing::warn!("Could not deserialize message {error:#}");
}
}
}
});

// Spawn the first task that will receive broadcast messages and send
// messages over the websocket to our client.
let mut send_task = {
let local_sender = local_sender.clone();
tokio::spawn(async move {
while let Ok(st) = rx.recv().await {
if let Err(error) = local_sender.send(st).await {
tracing::error!("Could not send message {error:#}");
return;
}
}
})
};

// Spawn a task that takes messages from the websocket
let local_sender = local_sender.clone();
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(Message::Text(text))) = receiver.next().await {
match serde_json::from_str(text.as_str()) {
Ok(OrderbookRequest::Authenticate(Signature { signature, pubkey })) => {
let msg = create_sign_message();
match signature.verify(&msg, &pubkey) {
Ok(_) => {
if let Err(e) = local_sender.send(OrderbookMsg::Authenticated).await {
tracing::error!("Could not respond to user {e:#}");
return;
}

let mut authenticated_users = state.authenticated_users.lock();
authenticated_users.insert(pubkey, local_sender.clone());
}
Err(err) => {
if let Err(er) = local_sender
.send(OrderbookMsg::InvalidAuthentication(format!(
"Could not authenticate {err:#}"
)))
.await
{
tracing::error!(
"Failed to notify user about invalid authentication: {er:#}"
);
return;
}
}
}
}
Err(err) => {
tracing::trace!("Could not derserialize msg: {text} {err:#}");
}
}
}
});

// If any one of the tasks run to completion, we abort the other.
tokio::select! {
_ = (&mut send_task) => {
recv_task.abort();
local_recv_task.abort()
},
_ = (&mut recv_task) => {
send_task.abort();
local_recv_task.abort()
},
_ = (&mut local_recv_task) => {
recv_task.abort();
send_task.abort();
},
};
ws.on_upgrade(|socket| websocket_connection(socket, state))
}
140 changes: 140 additions & 0 deletions coordinator/src/orderbook/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use crate::orderbook;
use crate::routes::AppState;
use axum::extract::ws::Message;
use axum::extract::ws::WebSocket;
use futures::SinkExt;
use futures::StreamExt;
use orderbook_commons::create_sign_message;
use orderbook_commons::OrderbookMsg;
use orderbook_commons::OrderbookRequest;
use orderbook_commons::Signature;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;

const WEBSOCKET_SEND_TIMEOUT: Duration = Duration::from_secs(5);

// This function deals with a single websocket connection, i.e., a single
// connected client / user, for which we will spawn two independent tasks (for
// receiving / sending messages).
pub async fn websocket_connection(stream: WebSocket, state: Arc<AppState>) {
// By splitting, we can send and receive at the same time.
let (mut sender, mut receiver) = stream.split();

// We subscribe *before* sending the "joined" message, so that we will also
// display it to our client.
let mut rx = state.tx_pricefeed.subscribe();

let mut conn = match state.pool.clone().get() {
Ok(conn) => conn,
Err(err) => {
tracing::error!("Could not get connection to db pool {err:#}");
return;
}
};

let orders = match orderbook::db::orders::all(&mut conn, false) {
Ok(orders) => orders,
Err(error) => {
tracing::error!("Could not load all orders from db {error:#}");
return;
}
};

// Now send the "all orders" to the new client.
if let Ok(msg) = serde_json::to_string(&OrderbookMsg::AllOrders(orders)) {
let _ = sender.send(Message::Text(msg)).await;
}

let (local_sender, mut local_receiver) = mpsc::channel::<OrderbookMsg>(100);

let mut local_recv_task = tokio::spawn(async move {
while let Some(local_msg) = local_receiver.recv().await {
match serde_json::to_string(&local_msg) {
Ok(msg) => {
if let Err(err) = tokio::time::timeout(
WEBSOCKET_SEND_TIMEOUT,
sender.send(Message::Text(msg.clone())),
)
.await
{
tracing::error!("Could not forward message {msg} : {err:#}");
return;
}
}
Err(error) => {
tracing::warn!("Could not deserialize message {error:#}");
}
}
}
});

// Spawn the first task that will receive broadcast messages and send
// messages over the websocket to our client.
let mut send_task = {
let local_sender = local_sender.clone();
tokio::spawn(async move {
while let Ok(st) = rx.recv().await {
if let Err(error) = local_sender.send(st).await {
tracing::error!("Could not send message {error:#}");
return;
}
}
})
};

// Spawn a task that takes messages from the websocket
let local_sender = local_sender.clone();
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(Message::Text(text))) = receiver.next().await {
match serde_json::from_str(text.as_str()) {
Ok(OrderbookRequest::Authenticate(Signature { signature, pubkey })) => {
let msg = create_sign_message();
match signature.verify(&msg, &pubkey) {
Ok(_) => {
if let Err(e) = local_sender.send(OrderbookMsg::Authenticated).await {
tracing::error!("Could not respond to user {e:#}");
return;
}

let mut authenticated_users = state.authenticated_users.lock();
authenticated_users.insert(pubkey, local_sender.clone());
}
Err(err) => {
if let Err(er) = local_sender
.send(OrderbookMsg::InvalidAuthentication(format!(
"Could not authenticate {err:#}"
)))
.await
{
tracing::error!(
"Failed to notify user about invalid authentication: {er:#}"
);
return;
}
}
}
}
Err(err) => {
tracing::trace!("Could not derserialize msg: {text} {err:#}");
}
}
}
});

// If any one of the tasks run to completion, we abort the other.
tokio::select! {
_ = (&mut send_task) => {
recv_task.abort();
local_recv_task.abort()
},
_ = (&mut recv_task) => {
send_task.abort();
local_recv_task.abort()
},
_ = (&mut local_recv_task) => {
recv_task.abort();
send_task.abort();
},
};
}
1 change: 1 addition & 0 deletions crates/tests-e2e/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ coordinator-commons = { path = "../coordinator-commons" }
flutter_rust_bridge = "1.78.0"
ln-dlc-node = { path = "../ln-dlc-node" }
native = { path = "../../mobile/native" }
orderbook-commons = { path = "../orderbook-commons" }
quote = "1.0.28"
reqwest = { version = "0.11", default-features = false, features = ["json"] }
serde = { version = "1.0.152", features = ["serde_derive"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/tests-e2e/src/fund.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub async fn fund_app_with_faucet(client: &Client, funding_amount: u64) -> Resul
}

async fn pay_with_faucet(client: &Client, invoice: String) -> Result<()> {
tracing::info!("Paying invoice with faucet: {}", invoice);

#[derive(serde::Serialize)]
struct PayInvoice {
payment_request: String,
Expand Down
Loading

0 comments on commit 71c8237

Please sign in to comment.