Skip to content

Commit

Permalink
fix: Block a db connection during a rollover as short as possible.
Browse files Browse the repository at this point in the history
Since the connections are limited it's better to not block a connection unnecessarily for a long time.

As we are using a connection pool (r2d2) getting a connection is very cheap.

This might help in the context of #2629
  • Loading branch information
holzeis committed Jun 17, 2024
1 parent f632f0e commit 5fa45d9
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 55 deletions.
91 changes: 53 additions & 38 deletions coordinator/src/node/rollover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use bitcoin::secp256k1::PublicKey;
use bitcoin::Network;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use diesel::r2d2::PooledConnection;
use diesel::PgConnection;
use dlc_manager::contract::contract_input::ContractInput;
use dlc_manager::contract::contract_input::ContractInputInfo;
Expand Down Expand Up @@ -92,37 +91,41 @@ impl Node {
trader_id: PublicKey,
network: Network,
) -> Result<()> {
let mut conn = spawn_blocking(move || pool.get())
.await
.expect("task to complete")?;

tracing::debug!(%trader_id, "Checking if the user's position is eligible for rollover");

if check_version(&mut conn, &trader_id).is_err() {
tracing::info!(
%trader_id,
"User is not on the latest version. \
Will not check if their position is eligible for rollover"
);
return Ok(());
}
let position = spawn_blocking({
move || {
let mut conn = pool.get()?;

if check_version(&mut conn, &trader_id).is_err() {
tracing::info!(
%trader_id,
"User is not on the latest version. \
Will not check if their position is eligible for rollover"
);
return anyhow::Ok(None);
}

let position = match positions::Position::get_position_by_trader(
&mut conn,
trader_id,
vec![PositionState::Open, PositionState::Rollover],
)? {
Some(position) => position,
None => return Ok(()),
};
let position = positions::Position::get_position_by_trader(
&mut conn,
trader_id,
vec![PositionState::Open, PositionState::Rollover],
)?;

anyhow::Ok(position)
}
})
.await??;

if let Some(position) = position {
tracing::debug!(%trader_id, "Checking if the user's position is eligible for rollover");
self.check_rollover(position, network, &notifier, None)
.await?;
}

self.check_rollover(&mut conn, position, network, &notifier, None)
.await
Ok(())
}

pub async fn check_rollover(
&self,
connection: &mut PooledConnection<ConnectionManager<PgConnection>>,
position: Position,
network: Network,
notifier: &mpsc::Sender<Notification>,
Expand Down Expand Up @@ -156,13 +159,8 @@ impl Node {

if self.is_connected(trader_id) {
tracing::info!(%trader_id, "Proposing to rollover DLC channel");
self.propose_rollover(
connection,
&signed_channel.channel_id,
position,
self.inner.network,
)
.await?;
self.propose_rollover(&signed_channel.channel_id, position, self.inner.network)
.await?;
} else {
tracing::warn!(%trader_id, "Skipping rollover, user is not connected.");
}
Expand All @@ -174,7 +172,6 @@ impl Node {
/// Initiates the rollover protocol with the app.
pub async fn propose_rollover(
&self,
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
dlc_channel_id: &DlcChannelId,
position: Position,
network: Network,
Expand Down Expand Up @@ -218,8 +215,17 @@ impl Node {
let maintenance_margin_rate =
Decimal::try_from(maintenance_margin_rate).expect("to fit into decimal");

let funding_fee_events =
get_outstanding_funding_fee_events(conn, trader_pubkey, position.id)?;
let funding_fee_events = spawn_blocking({
let pool = self.pool.clone();
move || {
let mut conn = pool.get()?;
let funding_fee_events =
get_outstanding_funding_fee_events(&mut conn, trader_pubkey, position.id)?;

anyhow::Ok(funding_fee_events)
}
})
.await??;

let funding_fee = funding_fee_from_funding_fee_events(&funding_fee_events);

Expand Down Expand Up @@ -328,8 +334,17 @@ impl Node {
)
.context("Failed to insert start of rollover protocol in dlc_protocols table")?;

db::positions::Position::rollover_position(conn, trader_pubkey, &next_expiry)
.context("Failed to set position state to rollover")?;
spawn_blocking({
let pool = self.pool.clone();
move || {
let mut conn = pool.get()?;
positions::Position::rollover_position(&mut conn, trader_pubkey, &next_expiry)
.context("Failed to set position state to rollover")?;

anyhow::Ok(())
}
})
.await??;

self.inner
.event_handler
Expand Down
7 changes: 1 addition & 6 deletions coordinator/src/routes/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,12 +428,7 @@ pub async fn rollover(

state
.node
.propose_rollover(
&mut connection,
&dlc_channel_id,
position,
state.node.inner.network,
)
.propose_rollover(&dlc_channel_id, position, state.node.inner.network)
.await
.map_err(|e| {
AppError::InternalServerError(format!("Failed to rollover DLC channel: {e:#}",))
Expand Down
26 changes: 15 additions & 11 deletions coordinator/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,6 @@ fn build_rollover_notification_job(
) -> Result<Job, JobSchedulerError> {
Job::new_async(schedule, move |_, _| {
let notifier = notifier.clone();
let mut conn = match pool.get() {
Ok(conn) => conn,
Err(e) => {
return Box::pin(async move {
tracing::error!("Failed to get connection. Error: {e:#}")
});
}
};

if !commons::is_eligible_for_rollover(OffsetDateTime::now_utc(), network) {
return Box::pin(async move {
Expand All @@ -226,8 +218,21 @@ fn build_rollover_notification_job(
// calculates the expiry of the next rollover window. positions which have an
// expiry before that haven't rolled over yet, and need to be reminded.
let expiry = commons::calculate_next_expiry(OffsetDateTime::now_utc(), network);
match db::positions::Position::get_all_open_positions_with_expiry_before(&mut conn, expiry)
{

let positions = {
let mut conn = match pool.get() {
Ok(conn) => conn,
Err(e) => {
return Box::pin(async move {
tracing::error!("Failed to get connection. Error: {e:#}")
});
}
};

db::positions::Position::get_all_open_positions_with_expiry_before(&mut conn, expiry)
};

match positions {
Ok(positions) => Box::pin({
tracing::debug!(
nr_of_positions = positions.len(),
Expand All @@ -239,7 +244,6 @@ fn build_rollover_notification_job(
for position in positions {
if let Err(e) = node
.check_rollover(
&mut conn,
position,
node.inner.network,
&notifier,
Expand Down

0 comments on commit 5fa45d9

Please sign in to comment.