Skip to content

Commit

Permalink
feat: Measure sync (#772)
Browse files Browse the repository at this point in the history
* chore: redesign collab sync protocol to enable injection of metric tracking

* chore: track metrics for apply update on the server side

* chore: close locks

* chore: post rebase fixes
  • Loading branch information
Horusiath authored Aug 30, 2024
1 parent 405d970 commit 6972f9c
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 153 deletions.
7 changes: 4 additions & 3 deletions libs/client-api/src/collab_sync/collab_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use yrs::ReadTxn;
use client_api_entity::{validate_data_for_folder, CollabType};
use collab_rt_entity::{AckCode, ClientCollabMessage, ServerCollabMessage, ServerInit, UpdateSync};
use collab_rt_protocol::{
handle_message_follow_protocol, ClientSyncProtocol, Message, MessageReader, SyncMessage,
ClientSyncProtocol, CollabSyncProtocol, Message, MessageReader, SyncMessage,
};

use crate::af_spawn;
Expand Down Expand Up @@ -344,8 +344,9 @@ where
.map_err(|err| SyncError::OverrideWithIncorrectData(err.to_string()))?;
}

if let Some(return_payload) =
handle_message_follow_protocol(&message_origin, &ClientSyncProtocol, &collab, msg).await?
if let Some(return_payload) = ClientSyncProtocol
.handle_message(&message_origin, &collab, msg)
.await?
{
let object_id = sync_object.object_id.clone();
sink.queue_msg(|msg_id| {
Expand Down
168 changes: 79 additions & 89 deletions libs/collab-rt-protocol/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::borrow::BorrowMut;
use std::sync::Arc;

use async_trait::async_trait;
use collab::core::awareness::{Awareness, AwarenessUpdate};
use collab::core::collab::{TransactionExt, TransactionMutExt};
use collab::core::origin::CollabOrigin;
Expand Down Expand Up @@ -34,6 +35,8 @@ use crate::message::{CustomMessage, Message, RTProtocolError, SyncMessage, SyncM
/// A implementation of [CollabSyncProtocol].
#[derive(Clone)]
pub struct ClientSyncProtocol;

#[async_trait]
impl CollabSyncProtocol for ClientSyncProtocol {
fn check<E: Encoder>(&self, encoder: &mut E, last_sync_at: i64) -> Result<(), RTProtocolError> {
let meta = SyncMeta { last_sync_at };
Expand All @@ -43,13 +46,17 @@ impl CollabSyncProtocol for ClientSyncProtocol {

/// Handle reply for a sync-step-1 send from this replica previously. By default just apply
/// an update to current `awareness` document instance.
fn handle_sync_step2(
async fn handle_sync_step2(
&self,
origin: &CollabOrigin,
awareness: &mut Awareness,
update: Update,
) -> Result<(), RTProtocolError> {
let mut txn = awareness
collab: &CollabRef,
update: Vec<u8>,
) -> Result<Option<Vec<u8>>, RTProtocolError> {
let update = decode_update(update).await?;
let mut lock = collab.write().await;
let collab = (*lock).borrow_mut();
let mut txn = collab
.get_awareness()
.doc()
.try_transact_mut_with(origin.clone())
.map_err(|err| {
Expand Down Expand Up @@ -77,12 +84,41 @@ impl CollabSyncProtocol for ClientSyncProtocol {
reason: "client miss updates".to_string(),
})
},
None => Ok(()),
None => Ok(None),
}
}
}

pub type CollabRef = Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>;

#[async_trait]
pub trait CollabSyncProtocol {
/// Handles incoming messages from the client/server
async fn handle_message(
&self,
message_origin: &CollabOrigin,
collab: &CollabRef,
msg: Message,
) -> Result<Option<Vec<u8>>, RTProtocolError> {
match msg {
Message::Sync(msg) => match msg {
SyncMessage::SyncStep1(sv) => self.handle_sync_step1(collab, sv).await,
SyncMessage::SyncStep2(update) => {
self.handle_sync_step2(message_origin, collab, update).await
},
SyncMessage::Update(update) => self.handle_update(message_origin, collab, update).await,
},
Message::Auth(reason) => self.handle_auth(collab, reason).await,
//FIXME: where is the QueryAwareness protocol?
Message::Awareness(update) => {
self
.handle_awareness_update(message_origin, collab, update)
.await
},
Message::Custom(msg) => self.handle_custom_message(collab, msg).await,
}
}

fn check<E: Encoder>(&self, _encoder: &mut E, _last_sync_at: i64) -> Result<(), RTProtocolError> {
Ok(())
}
Expand Down Expand Up @@ -121,50 +157,55 @@ pub trait CollabSyncProtocol {

/// Given a [StateVector] of a remote side, calculate missing
/// updates. Returns a sync-step-2 message containing a calculated update.
fn handle_sync_step1(
async fn handle_sync_step1(
&self,
awareness: &Awareness,
collab: &CollabRef,
sv: StateVector,
) -> Result<Option<Vec<u8>>, RTProtocolError> {
let txn = awareness.doc().try_transact().map_err(|err| {
RTProtocolError::YrsTransaction(format!("fail to handle sync step1. error: {}", err))
})?;
let update = txn.try_encode_state_as_update_v1(&sv).map_err(|err| {
RTProtocolError::YrsEncodeState(format!(
"fail to encode state as update. error: {}\ninit state vector: {:?}\ndocument state: {:#?}",
err,
sv,
txn.store()
))
})?;
// calculate missing updates base on the input state vector
let update = {
let lock = collab.read().await;
let collab = lock.borrow();
let txn = collab.get_awareness().doc().try_transact().map_err(|err| {
RTProtocolError::YrsTransaction(format!("fail to handle sync step1. error: {}", err))
})?;
txn.try_encode_state_as_update_v1(&sv).map_err(|err| {
RTProtocolError::YrsEncodeState(format!(
"fail to encode state as update. error: {}\ninit state vector: {:?}\ndocument state: {:#?}",
err,
sv,
txn.store()
))
})?
};
Ok(Some(
Message::Sync(SyncMessage::SyncStep2(update)).encode_v1(),
))
}

/// Handle reply for a sync-step-1 send from this replica previously. By default just apply
/// an update to current `awareness` document instance.
fn handle_sync_step2(
async fn handle_sync_step2(
&self,
origin: &CollabOrigin,
awareness: &mut Awareness,
update: Update,
) -> Result<(), RTProtocolError>;
collab: &CollabRef,
update: Vec<u8>,
) -> Result<Option<Vec<u8>>, RTProtocolError>;

/// Handle continuous update send from the client. By default just apply an update to a current
/// `awareness` document instance.
fn handle_update(
async fn handle_update(
&self,
origin: &CollabOrigin,
awareness: &mut Awareness,
update: Update,
) -> Result<(), RTProtocolError> {
self.handle_sync_step2(origin, awareness, update)
collab: &CollabRef,
update: Vec<u8>,
) -> Result<Option<Vec<u8>>, RTProtocolError> {
self.handle_sync_step2(origin, collab, update).await
}

fn handle_auth(
async fn handle_auth(
&self,
_awareness: &Awareness,
_collab: &CollabRef,
deny_reason: Option<String>,
) -> Result<Option<Vec<u8>>, RTProtocolError> {
if let Some(reason) = deny_reason {
Expand All @@ -176,19 +217,21 @@ pub trait CollabSyncProtocol {

/// Reply to awareness query or just incoming [AwarenessUpdate], where current `awareness`
/// instance is being updated with incoming data.
fn handle_awareness_update(
async fn handle_awareness_update(
&self,
_message_origin: &CollabOrigin,
awareness: &mut Awareness,
collab: &CollabRef,
update: AwarenessUpdate,
) -> Result<Option<Vec<u8>>, RTProtocolError> {
awareness.apply_update(update)?;
let mut lock = collab.write().await;
let collab = (*lock).borrow_mut();
collab.get_awareness().apply_update(update)?;
Ok(None)
}

fn handle_custom_message(
async fn handle_custom_message(
&self,
_awareness: &mut Awareness,
_collab: &CollabRef,
_msg: CustomMessage,
) -> Result<Option<Vec<u8>>, RTProtocolError> {
Ok(None)
Expand All @@ -198,7 +241,7 @@ pub trait CollabSyncProtocol {
const LARGE_UPDATE_THRESHOLD: usize = 1024 * 1024; // 1MB

#[inline]
async fn decode_update(update: Vec<u8>) -> Result<Update, RTProtocolError> {
pub async fn decode_update(update: Vec<u8>) -> Result<Update, RTProtocolError> {
let update = if update.len() > LARGE_UPDATE_THRESHOLD {
spawn_blocking(move || Update::decode_v1(&update))
.await
Expand All @@ -208,56 +251,3 @@ async fn decode_update(update: Vec<u8>) -> Result<Update, RTProtocolError> {
}?;
Ok(update)
}

/// Handles incoming messages from the client/server
pub async fn handle_message_follow_protocol<P>(
message_origin: &CollabOrigin,
protocol: &P,
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
msg: Message,
) -> Result<Option<Vec<u8>>, RTProtocolError>
where
P: CollabSyncProtocol,
{
match msg {
Message::Sync(msg) => match msg {
SyncMessage::SyncStep1(sv) => {
// calculate missing updates base on the input state vector
let lock = collab.read().await;
let collab = lock.borrow();
let update = protocol.handle_sync_step1(collab.get_awareness(), sv)?;
Ok(update)
},
SyncMessage::SyncStep2(update) => {
let update = decode_update(update).await?;
let mut lock = collab.write().await;
let collab = (*lock).borrow_mut();
protocol.handle_sync_step2(message_origin, collab.get_mut_awareness(), update)?;
Ok(None)
},
SyncMessage::Update(update) => {
let update = decode_update(update).await?;
let mut lock = collab.write().await;
let collab = (*lock).borrow_mut();
protocol.handle_update(message_origin, collab.get_mut_awareness(), update)?;
Ok(None)
},
},
Message::Auth(reason) => {
let lock = collab.read().await;
let collab = lock.borrow();
protocol.handle_auth(collab.get_awareness(), reason)
},
//FIXME: where is the QueryAwareness protocol?
Message::Awareness(update) => {
let mut lock = collab.write().await;
let collab = (*lock).borrow_mut();
protocol.handle_awareness_update(message_origin, collab.get_mut_awareness(), update)
},
Message::Custom(msg) => {
let mut lock = collab.write().await;
let collab = (*lock).borrow_mut();
protocol.handle_custom_message(collab.get_mut_awareness(), msg)
},
}
}
16 changes: 9 additions & 7 deletions services/appflowy-collaborate/src/group/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use collab_rt_entity::{AckCode, MsgId};
use collab_rt_entity::{
AwarenessSync, BroadcastSync, ClientCollabMessage, CollabAck, CollabMessage,
};
use collab_rt_protocol::{handle_message_follow_protocol, RTProtocolError, SyncMessage};
use collab_rt_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE};
use collab_rt_protocol::{CollabSyncProtocol, Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE};
use collab_rt_protocol::{RTProtocolError, SyncMessage};

use crate::error::RealtimeError;
use crate::group::group_init::EditState;
Expand Down Expand Up @@ -279,7 +279,7 @@ async fn handle_client_messages<Sink>(
message_map: MessageByObjectId,
sink: &mut Sink,
collab: Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
metrics_calculate: &CollabRealtimeMetrics,
metrics_calculate: &Arc<CollabRealtimeMetrics>,
edit_state: &Arc<EditState>,
) where
Sink: SinkExt<CollabMessage> + Unpin + 'static,
Expand Down Expand Up @@ -337,7 +337,7 @@ async fn handle_one_client_message(
object_id: &str,
collab_msg: &ClientCollabMessage,
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
metrics_calculate: &CollabRealtimeMetrics,
metrics_calculate: &Arc<CollabRealtimeMetrics>,
edit_state: &Arc<EditState>,
) -> Result<CollabAck, RealtimeError> {
let msg_id = collab_msg.msg_id();
Expand Down Expand Up @@ -382,7 +382,7 @@ async fn handle_one_message_payload(
msg_id: MsgId,
payload: &Bytes,
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
metrics_calculate: &CollabRealtimeMetrics,
metrics_calculate: &Arc<CollabRealtimeMetrics>,
edit_state: &Arc<EditState>,
) -> Result<CollabAck, RealtimeError> {
let payload = payload.clone();
Expand Down Expand Up @@ -416,7 +416,7 @@ async fn handle_message(
payload: &Bytes,
message_origin: &CollabOrigin,
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
metrics_calculate: &CollabRealtimeMetrics,
metrics_calculate: &Arc<CollabRealtimeMetrics>,
object_id: &str,
msg_id: MsgId,
edit_state: &Arc<EditState>,
Expand All @@ -430,7 +430,9 @@ async fn handle_message(
match msg {
Ok(msg) => {
is_sync_step2 = matches!(msg, Message::Sync(SyncMessage::SyncStep2(_)));
match handle_message_follow_protocol(message_origin, &ServerSyncProtocol, collab, msg).await
match ServerSyncProtocol::new(metrics_calculate.clone())
.handle_message(message_origin, collab, msg)
.await
{
Ok(payload) => {
metrics_calculate.apply_update_count.inc();
Expand Down
Loading

0 comments on commit 6972f9c

Please sign in to comment.