Skip to content

Commit

Permalink
chore: use colla storage instead of select from db directly (#782)
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy authored Sep 2, 2024
1 parent 299680c commit 365c64c
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 61 deletions.
15 changes: 10 additions & 5 deletions libs/database/src/collab/collab_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ pub trait CollabStorageAccessControl: Send + Sync + 'static {
) -> Result<bool, AppError>;
}

pub enum GetCollabOrigin {
User { uid: i64 },
Server,
}

/// Represents a storage mechanism for collaborations.
///
/// This trait provides asynchronous methods for CRUD operations related to collaborations.
Expand Down Expand Up @@ -113,9 +118,9 @@ pub trait CollabStorage: Send + Sync + 'static {
/// * `Result<RawData>` - Returns the data of the collaboration if found, `Err` otherwise.
async fn get_encode_collab(
&self,
uid: &i64,
origin: GetCollabOrigin,
params: QueryCollabParams,
is_collab_init: bool,
from_editing_collab: bool,
) -> AppResult<EncodedCollab>;

async fn batch_get_collab(
Expand Down Expand Up @@ -208,13 +213,13 @@ where

async fn get_encode_collab(
&self,
uid: &i64,
origin: GetCollabOrigin,
params: QueryCollabParams,
is_collab_init: bool,
from_editing_collab: bool,
) -> AppResult<EncodedCollab> {
self
.as_ref()
.get_encode_collab(uid, params, is_collab_init)
.get_encode_collab(origin, params, from_editing_collab)
.await
}

Expand Down
16 changes: 15 additions & 1 deletion libs/database/src/index/collab_embeddings_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use pgvector::Vector;
use sqlx::{Error, Executor, Postgres, Transaction};
use uuid::Uuid;

use database_entity::dto::{AFCollabEmbeddingParams, IndexingStatus};
use database_entity::dto::{
AFCollabEmbeddingParams, IndexingStatus, QueryCollab, QueryCollabParams,
};

pub async fn get_index_status<'a, E>(
tx: E,
Expand Down Expand Up @@ -143,3 +145,15 @@ pub struct CollabId {
pub workspace_id: Uuid,
pub object_id: String,
}

impl From<CollabId> for QueryCollabParams {
fn from(value: CollabId) -> Self {
QueryCollabParams {
workspace_id: value.workspace_id.to_string(),
inner: QueryCollab {
object_id: value.object_id,
collab_type: value.collab_type,
},
}
}
}
22 changes: 6 additions & 16 deletions services/appflowy-collaborate/src/collab/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@ impl CollabCache {
}
}

pub async fn get_encode_collab(
&self,
uid: &i64,
query: QueryCollab,
) -> Result<EncodedCollab, AppError> {
pub async fn get_encode_collab(&self, query: QueryCollab) -> Result<EncodedCollab, AppError> {
self.total_attempts.fetch_add(1, Ordering::Relaxed);
// Attempt to retrieve encoded collab from memory cache, falling back to disk cache if necessary.
if let Some(encoded_collab) = self.mem_cache.get_encode_collab(&query.object_id).await {
Expand All @@ -86,10 +82,7 @@ impl CollabCache {

// Retrieve from disk cache as fallback. After retrieval, the value is inserted into the memory cache.
let object_id = query.object_id.clone();
let encode_collab = self
.disk_cache
.get_collab_encoded_from_disk(uid, query)
.await?;
let encode_collab = self.disk_cache.get_collab_encoded_from_disk(query).await?;

// spawn a task to insert the encoded collab into the memory cache
let cloned_encode_collab = encode_collab.clone();
Expand Down Expand Up @@ -180,13 +173,9 @@ impl CollabCache {

pub async fn get_encode_collab_from_disk(
&self,
uid: &i64,
query: QueryCollab,
) -> Result<EncodedCollab, AppError> {
let encode_collab = self
.disk_cache
.get_collab_encoded_from_disk(uid, query)
.await?;
let encode_collab = self.disk_cache.get_collab_encoded_from_disk(query).await?;
Ok(encode_collab)
}

Expand Down Expand Up @@ -219,11 +208,11 @@ impl CollabCache {
}

pub fn query_state(&self) -> QueryState {
let successful_attempts = self.success_attempts.load(Ordering::Relaxed);
let success_attempts = self.success_attempts.load(Ordering::Relaxed);
let total_attempts = self.total_attempts.load(Ordering::Relaxed);
QueryState {
total_attempts,
success_attempts: successful_attempts,
success_attempts,
}
}

Expand All @@ -249,6 +238,7 @@ impl CollabCache {
}
}

#[derive(Debug)]
pub struct QueryState {
pub total_attempts: u64,
pub success_attempts: u64,
Expand Down
1 change: 0 additions & 1 deletion services/appflowy-collaborate/src/collab/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ impl CollabDiskCache {
#[instrument(level = "trace", skip_all)]
pub async fn get_collab_encoded_from_disk(
&self,
_uid: &i64,
query: QueryCollab,
) -> Result<EncodedCollab, AppError> {
event!(
Expand Down
1 change: 0 additions & 1 deletion services/appflowy-collaborate/src/collab/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ impl StorageQueue {
priority: WritePriority,
) -> Result<(), AppError> {
trace!("queuing {} object to pending write queue", params.object_id,);
// TODO(nathan): compress the data before storing it in Redis
self
.collab_cache
.insert_encode_collab_data_in_mem(params)
Expand Down
42 changes: 24 additions & 18 deletions services/appflowy-collaborate/src/collab/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use crate::collab::cache::CollabCache;
use crate::command::{CLCommandSender, CollaborationCommand};
use crate::shared_state::RealtimeSharedState;
use app_error::AppError;
use database::collab::{AppResult, CollabMetadata, CollabStorage, CollabStorageAccessControl};
use database::collab::{
AppResult, CollabMetadata, CollabStorage, CollabStorageAccessControl, GetCollabOrigin,
};
use database_entity::dto::{
AFAccessLevel, AFSnapshotMeta, AFSnapshotMetas, CollabParams, InsertSnapshotParams, QueryCollab,
QueryCollabParams, QueryCollabResult, SnapshotData,
Expand Down Expand Up @@ -277,30 +279,34 @@ where
Ok(())
}

#[instrument(level = "trace", skip_all, fields(oid = %params.object_id, is_collab_init = %is_collab_init))]
#[instrument(level = "trace", skip_all, fields(oid = %params.object_id, from_editing_collab = %from_editing_collab))]
async fn get_encode_collab(
&self,
uid: &i64,
origin: GetCollabOrigin,
params: QueryCollabParams,
is_collab_init: bool,
from_editing_collab: bool,
) -> AppResult<EncodedCollab> {
params.validate()?;

// Check if the user has enough permissions to access the collab
let can_read = self
.access_control
.enforce_read_collab(&params.workspace_id, uid, &params.object_id)
.await?;

if !can_read {
return Err(AppError::NotEnoughPermissions {
user: uid.to_string(),
action: format!("read collab:{}", params.object_id),
});
match origin {
GetCollabOrigin::User { uid } => {
// Check if the user has enough permissions to access the collab
let can_read = self
.access_control
.enforce_read_collab(&params.workspace_id, &uid, &params.object_id)
.await?;

if !can_read {
return Err(AppError::NotEnoughPermissions {
user: uid.to_string(),
action: format!("read collab:{}", params.object_id),
});
}
},
GetCollabOrigin::Server => {},
}

// Early return if editing collab is initialized, as it indicates no need to query further.
if !is_collab_init {
if from_editing_collab {
// Attempt to retrieve encoded collab from the editing collab
if let Some(value) = self.get_encode_collab_from_editing(&params.object_id).await {
trace!(
Expand All @@ -311,7 +317,7 @@ where
}
}

let encode_collab = self.cache.get_encode_collab(uid, params.inner).await?;
let encode_collab = self.cache.get_encode_collab(params.inner).await?;
Ok(encode_collab)
}

Expand Down
4 changes: 2 additions & 2 deletions services/appflowy-collaborate/src/group/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use collab_rt_entity::CollabMessage;
use collab_stream::client::{CollabRedisStream, CONTROL_STREAM_KEY};
use collab_stream::model::CollabControlEvent;
use collab_stream::stream_group::StreamGroup;
use database::collab::CollabStorage;
use database::collab::{CollabStorage, GetCollabOrigin};
use database_entity::dto::QueryCollabParams;

use crate::client::client_msg_router::ClientMessageRouter;
Expand Down Expand Up @@ -271,7 +271,7 @@ where
S: CollabStorage,
{
let encode_collab = storage
.get_encode_collab(&uid, params.clone(), true)
.get_encode_collab(GetCollabOrigin::User { uid }, params.clone(), false)
.await?;
let result = Collab::new_with_source(
CollabOrigin::Server,
Expand Down
16 changes: 9 additions & 7 deletions services/appflowy-collaborate/src/indexer/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::config::get_env_var;
use crate::indexer::DocumentIndexer;
use app_error::AppError;
use appflowy_ai_client::client::AppFlowyAIClient;
use database::collab::select_blob_from_af_collab;
use database::collab::{CollabStorage, GetCollabOrigin};
use database::index::{get_collabs_without_embeddings, upsert_collab_embeddings};
use database::workspace::select_workspace_settings;
use database_entity::dto::{AFCollabEmbeddingParams, AFCollabEmbeddings, CollabParams};
Expand Down Expand Up @@ -93,20 +93,22 @@ impl IndexerProvider {

fn get_unindexed_collabs(
&self,
storage: Arc<dyn CollabStorage>,
) -> Pin<Box<dyn Stream<Item = Result<UnindexedCollab, anyhow::Error>> + Send>> {
let db = self.db.clone();

Box::pin(try_stream! {
let collabs = get_collabs_without_embeddings(&db).await?;

if !collabs.is_empty() {
tracing::trace!("found {} unindexed collabs", collabs.len());
}
for cid in collabs {
match &cid.collab_type {
CollabType::Document => {
let collab =
select_blob_from_af_collab(&db, &CollabType::Document, &cid.object_id).await?;
let collab = EncodedCollab::decode_from_bytes(&collab)?;
let collab = storage
.get_encode_collab(GetCollabOrigin::Server, cid.clone().into(), false)
.await?;

yield UnindexedCollab {
workspace_id: cid.workspace_id,
object_id: cid.object_id,
Expand All @@ -125,8 +127,8 @@ impl IndexerProvider {
})
}

pub async fn handle_unindexed_collabs(indexer: Arc<Self>) {
let mut stream = indexer.get_unindexed_collabs();
pub async fn handle_unindexed_collabs(indexer: Arc<Self>, storage: Arc<dyn CollabStorage>) {
let mut stream = indexer.get_unindexed_collabs(storage);
while let Some(result) = stream.next().await {
match result {
Ok(collab) => {
Expand Down
12 changes: 9 additions & 3 deletions services/appflowy-collaborate/src/rt_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ where

spawn_metrics(metrics.clone(), storage.clone());

spawn_handle_unindexed_collabs(indexer_provider);
spawn_handle_unindexed_collabs(indexer_provider, storage.clone());

Ok(Self {
storage,
Expand Down Expand Up @@ -271,8 +271,14 @@ where
}
}

fn spawn_handle_unindexed_collabs(indexer_provider: Arc<IndexerProvider>) {
tokio::spawn(IndexerProvider::handle_unindexed_collabs(indexer_provider));
fn spawn_handle_unindexed_collabs(
indexer_provider: Arc<IndexerProvider>,
storage: Arc<dyn CollabStorage>,
) {
tokio::spawn(IndexerProvider::handle_unindexed_collabs(
indexer_provider,
storage,
));
}

fn spawn_period_check_inactive_group<S, AC>(
Expand Down
10 changes: 5 additions & 5 deletions src/api/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use authentication::jwt::{OptionalUserUuid, UserUuid};
use collab_rt_entity::realtime_proto::HttpRealtimeMessage;
use collab_rt_entity::RealtimeMessage;
use collab_rt_protocol::validate_encode_collab;
use database::collab::CollabStorage;
use database::collab::{CollabStorage, GetCollabOrigin};
use database::user::select_uid_from_email;
use database_entity::dto::*;
use shared_entity::dto::workspace_dto::*;
Expand Down Expand Up @@ -764,7 +764,7 @@ async fn get_collab_handler(
let object_id = params.object_id.clone();
let encode_collab = state
.collab_access_control_storage
.get_encode_collab(&uid, params, false)
.get_encode_collab(GetCollabOrigin::User { uid }, params, true)
.await
.map_err(AppResponseError::from)?;

Expand Down Expand Up @@ -800,7 +800,7 @@ async fn v1_get_collab_handler(

let encode_collab = state
.collab_access_control_storage
.get_encode_collab(&uid, param, false)
.get_encode_collab(GetCollabOrigin::User { uid }, param, true)
.await
.map_err(AppResponseError::from)?;

Expand Down Expand Up @@ -845,9 +845,9 @@ async fn create_collab_snapshot_handler(
let encoded_collab_v1 = state
.collab_access_control_storage
.get_encode_collab(
&uid,
GetCollabOrigin::User { uid },
QueryCollabParams::new(&object_id, collab_type.clone(), &workspace_id),
false,
true,
)
.await?
.encode_to_bytes()
Expand Down
4 changes: 2 additions & 2 deletions tests/collab/storage_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ async fn simulate_small_data_set_write(pool: PgPool) {
collab_type: params.collab_type.clone(),
};
let encode_collab_from_disk = collab_cache
.get_encode_collab_from_disk(&user.uid, query)
.get_encode_collab_from_disk(query)
.await
.unwrap();

Expand Down Expand Up @@ -558,7 +558,7 @@ async fn simulate_large_data_set_write(pool: PgPool) {
collab_type: params.collab_type.clone(),
};
let encode_collab_from_disk = collab_cache
.get_encode_collab_from_disk(&user.uid, query)
.get_encode_collab_from_disk(query)
.await
.unwrap();
assert_eq!(
Expand Down

0 comments on commit 365c64c

Please sign in to comment.