Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deal with expired position the right way #1246

Merged
merged 14 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Find match on an expired position
- Show loading screen when app starts with an expired position

## [1.2.6] - 2023-09-06

## [1.2.5] - 2023-09-06
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions coordinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ prometheus = "0.13.3"
rand = "0.8.5"
serde = "1.0.147"
serde_json = "1"
thiserror = "1.0"
tokio-metrics = "0.2.2"
toml = "0.7.3"
tracing = "0.1.37"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- This file should undo anything in `up.sql`
ALTER TABLE
orders DROP COLUMN "order_state";

DROP TYPE "OrderState_Type";
12 changes: 12 additions & 0 deletions coordinator/migrations/2023-09-08-090152_add_order_state/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Your SQL goes here
CREATE TYPE "OrderState_Type" AS ENUM (
'Open',
'Matched',
'Taken',
'Failed'
);

ALTER TABLE "orders" ADD COLUMN "order_state" "OrderState_Type" NOT NULL DEFAULT 'Open';

UPDATE orders SET order_state = 'Taken' WHERE taken = true;
holzeis marked this conversation as resolved.
Show resolved Hide resolved
UPDATE orders SET order_state = 'Failed' WHERE taken = false and order_type = 'market';
holzeis marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 3 additions & 0 deletions coordinator/migrations/2023-09-08-090153_add_matches/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
DROP TABLE "matches";
DROP TYPE "MatchState_Type";
21 changes: 21 additions & 0 deletions coordinator/migrations/2023-09-08-090153_add_matches/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Your SQL goes here
CREATE TYPE "MatchState_Type" AS ENUM (
'Pending',
'Filled',
'Failed'
);

CREATE TABLE "matches" (
id UUID PRIMARY KEY,
match_state "MatchState_Type" NOT NULL,
order_id UUID REFERENCES orders (trader_order_id) NOT NULL,
trader_id TEXT NOT NULL,
-- The order id of the counter party to that match
match_order_id UUID REFERENCES orders (trader_order_id) NOT NULL,
-- The trader id of the counter party to that match
match_trader_id TEXT NOT NULL,
execution_price REAL NOT NULL,
quantity REAL NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This file should undo anything in `up.sql`
ALTER TABLE orders
DROP COLUMN "contract_symbol",
DROP COLUMN "leverage";
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Your SQL goes here
ALTER TABLE "orders"
ADD COLUMN "contract_symbol" "ContractSymbol_Type" NOT NULL DEFAULT 'BtcUsd',
ADD COLUMN "leverage" REAL NOT NULL DEFAULT 1.0;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- This file should undo anything in `up.sql`
ALTER TABLE orders
DROP COLUMN "order_reason";

DROP TYPE "OrderReason_Type";
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Your SQL goes here
CREATE TYPE "OrderReason_Type" AS ENUM (
'Manual',
'Expired'
);

ALTER TABLE "orders"
ADD COLUMN "order_reason" "OrderReason_Type" NOT NULL DEFAULT 'Manual';
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- This file should undo anything in `up.sql`
ALTER TABLE "orders"
ADD COLUMN "taken" BOOLEAN NOT NULL DEFAULT FALSE;

UPDATE orders SET taken = true WHERE order_state = 'Taken';
3 changes: 3 additions & 0 deletions coordinator/migrations/2023-09-08-090156_remove_taken/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
ALTER TABLE "orders"
DROP COLUMN "taken";
13 changes: 12 additions & 1 deletion coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use coordinator::node::storage::NodeStorage;
use coordinator::node::unrealized_pnl;
use coordinator::node::Node;
use coordinator::notification_service::NotificationService;
use coordinator::orderbook::trading;
use coordinator::routes::router;
use coordinator::run_migration;
use coordinator::settings::Settings;
Expand All @@ -30,6 +31,7 @@ use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::watch;
use tokio::task::spawn_blocking;
use tracing::metadata::LevelFilter;
Expand Down Expand Up @@ -190,12 +192,19 @@ async fn main() -> Result<()> {
}
});

let (tx_price_feed, _rx) = broadcast::channel(100);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 Let's not do this if no one is actually receiving?!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the idea is that eventually someone will read the price feed from the coordinator? I would just leave that out until we actually need it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I noticed that before this PR we already had an apparently useless update_pricefeed function which would just log updates on Trace. Let's just remove that altogether imo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I am mistaken, but the tx_price_feed is received in the websocket.rs and is useful because with it I can broadcast the updated price to all connected users instead of having to send it to everyone individually.

// We subscribe *before* sending the "joined" message, so that we will also
// display it to our client.
let mut rx = state.tx_price_feed.subscribe();

cc: @bonomat

let (_handle, trading_sender) = trading::start(pool.clone(), tx_price_feed.clone());

tokio::spawn({
let node = node.clone();
let trading_sender = trading_sender.clone();
async move {
loop {
tokio::time::sleep(EXPIRED_POSITION_SYNC_INTERVAL).await;
expired_positions::close(node.clone()).await;
if let Err(e) = expired_positions::close(node.clone(), trading_sender.clone()).await
{
tracing::error!("Failed to close expired positions! Error: {e:#}");
}
}
}
});
Expand Down Expand Up @@ -224,6 +233,8 @@ async fn main() -> Result<()> {
exporter,
opts.p2p_announcement_addresses(),
NODE_ALIAS,
trading_sender,
tx_price_feed,
);

let notification_service = NotificationService::new(opts.fcm_api_key);
Expand Down
86 changes: 81 additions & 5 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::db;
use crate::node::storage::NodeStorage;
use crate::orderbook::db::matches;
use crate::orderbook::db::orders;
use crate::position::models::leverage_long;
use crate::position::models::leverage_short;
use crate::position::models::NewPosition;
use crate::position::models::Position;
use crate::trade::models::NewTrade;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::ensure;
use anyhow::Context;
Expand All @@ -14,6 +17,7 @@ use bitcoin::secp256k1::PublicKey;
use coordinator_commons::TradeParams;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use diesel::Connection;
use diesel::PgConnection;
use dlc_manager::contract::contract_input::ContractInput;
use dlc_manager::contract::contract_input::ContractInputInfo;
Expand All @@ -39,15 +43,19 @@ use ln_dlc_node::node::dlc_message_name;
use ln_dlc_node::node::sub_channel_message_name;
use ln_dlc_node::node::RunningNode;
use ln_dlc_node::WalletSettings;
use orderbook_commons::MatchState;
use orderbook_commons::OrderState;
use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::RwLock;
use trade::cfd::calculate_long_liquidation_price;
use trade::cfd::calculate_margin;
use trade::cfd::calculate_short_liquidation_price;
use trade::cfd::BTCUSD_MAX_PRICE;
use trade::Direction;
use uuid::Uuid;

pub mod channel_opening_fee;
pub mod closed_positions;
Expand Down Expand Up @@ -135,22 +143,49 @@ impl Node {

pub async fn trade(&self, trade_params: &TradeParams) -> Result<Invoice> {
let (fee_payment_hash, invoice) = self.fee_invoice_taker(trade_params).await?;
let mut connection = self.pool.get()?;

let order_id = trade_params.filled_with.order_id;
let order = orders::get_with_id(&mut connection, order_id)?
.with_context(|| format!("Could not find order with id {order_id}"))?;

ensure!(
order.expiry > OffsetDateTime::now_utc(),
"Can't execute a trade on an expired order"
);
ensure!(
order.order_state == OrderState::Matched,
"Can't execute trade with in invalid state {:?}",
order.order_state
);

let trader_id = trade_params.pubkey.to_string();
let order_id = trade_params.filled_with.order_id.to_string();
tracing::info!(trader_id, order_id, "Executing match");

match self.decide_trade_action(&trade_params.pubkey)? {
TradeAction::Open => {
ensure!(
self.settings.read().await.allow_opening_positions,
"Opening positions is disabled"
);
self.open_position(trade_params, fee_payment_hash).await?
if let Err(e) = self.open_position(trade_params, fee_payment_hash).await {
update_order_and_match(
&mut connection,
trade_params.filled_with.order_id,
MatchState::Failed,
OrderState::Failed,
)?;

bail!("Failed to initiate open position: trader_id={trader_id}, order_id={order_id}. Error: {e:#}")
}
}
TradeAction::Close(channel_id) => {
let peer_id = trade_params.pubkey;
tracing::info!(?trade_params, channel_id = %hex::encode(channel_id), %peer_id, "Closing position");

let closing_price = trade_params.average_execution_price();

let mut connection = self.pool.get()?;
let position = match db::positions::Position::get_open_position_by_trader(
&mut connection,
trade_params.pubkey.to_string(),
Expand All @@ -159,11 +194,35 @@ impl Node {
None => bail!("Failed to find open position : {}", trade_params.pubkey),
};

self.close_position(&position, closing_price, channel_id, fee_payment_hash)
.await?
if let Err(e) = self
.close_position(&position, closing_price, channel_id, fee_payment_hash)
.await
{
update_order_and_match(
&mut connection,
trade_params.filled_with.order_id,
MatchState::Failed,
OrderState::Failed,
)?;
holzeis marked this conversation as resolved.
Show resolved Hide resolved

bail!("trader_id={trader_id}, order_id={order_id}, Failed to initiate close position. Error: {e:#}")
}
}
};

tracing::info!(
trader_id,
order_id,
"Successfully processed match, setting match to Filled"
);

update_order_and_match(
&mut connection,
trade_params.filled_with.order_id,
MatchState::Filled,
OrderState::Taken,
)?;

Ok(invoice)
}

Expand Down Expand Up @@ -437,7 +496,7 @@ impl Node {
// todo(holzeis): It would be nice if dlc messages are also propagated via events, so the
// receiver can decide what events to process and we can skip this component specific logic
// here.
if let Message::Channel(ChannelMessage::RenewFinalize(r)) = msg {
if let Message::Channel(ChannelMessage::RenewFinalize(r)) = &msg {
self.finalize_rollover(r.channel_id)?;
}

Expand All @@ -455,6 +514,23 @@ impl Node {
}
}

fn update_order_and_match(
connection: &mut PgConnection,
order_id: Uuid,
match_state: MatchState,
order_state: OrderState,
) -> Result<()> {
connection
.transaction(|connection| {
matches::set_match_state(connection, order_id, match_state)?;

orders::set_order_state(connection, order_id, order_state)?;

diesel::result::QueryResult::Ok(())
})
.map_err(|e| anyhow!("Failed to update order and match. Error: {e:#}"))
}

pub enum TradeAction {
Open,
Close(ChannelId),
Expand Down
Loading