From abd96d8b5677fdfc41dfac2e19da96ee835049d7 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Mon, 2 Sep 2024 11:24:40 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20add=20APPFLOWY=5FCOLLAB=5FREMOVE=5FBAT?= =?UTF-8?q?CH=5FSIZE=20env=20to=20control=20num=20of=20co=E2=80=A6=20(#779?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: add APPFLOWY_COLLAB_REMOVE_BATCH_SIZE env to control num of collabs to be removed in one tick * chore: clippy --- deploy.env | 1 + dev.env | 1 + .../appflowy-collaborate/src/group/state.rs | 20 ++++++++++++------- .../appflowy-collaborate/src/rt_server.rs | 8 ++++++-- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/deploy.env b/deploy.env index 12ed54d6b..ab46858e7 100644 --- a/deploy.env +++ b/deploy.env @@ -131,3 +131,4 @@ APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379 # AppFlowy Collaborate APPFLOWY_COLLABORATE_MULTI_THREAD=false +APPFLOWY_COLLABORATE_REMOVE_BATCH_SIZE=100 diff --git a/dev.env b/dev.env index ee3683184..f6bfe0470 100644 --- a/dev.env +++ b/dev.env @@ -117,3 +117,4 @@ APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379 # AppFlowy Collaborate APPFLOWY_COLLABORATE_MULTI_THREAD=false +APPFLOWY_COLLABORATE_REMOVE_BATCH_SIZE=100 diff --git a/services/appflowy-collaborate/src/group/state.rs b/services/appflowy-collaborate/src/group/state.rs index 86b825439..e1051c271 100644 --- a/services/appflowy-collaborate/src/group/state.rs +++ b/services/appflowy-collaborate/src/group/state.rs @@ -6,13 +6,13 @@ use dashmap::mapref::one::RefMut; use dashmap::try_result::TryResult; use dashmap::DashMap; use tokio::time::sleep; -use tracing::{error, event, info, warn}; - -use collab_rt_entity::user::RealtimeUser; +use tracing::{error, event, info, trace, warn}; +use crate::config::get_env_var; use crate::error::RealtimeError; use crate::group::group_init::CollabGroup; use crate::metrics::CollabRealtimeMetrics; +use collab_rt_entity::user::RealtimeUser; #[derive(Clone)] pub(crate) struct GroupManagementState { @@ -20,14 +20,20 @@ pub(crate) struct GroupManagementState { /// Keep track of all [Collab] objects that a user is subscribed to. editing_by_user: Arc>>, metrics_calculate: Arc, + /// By default, the number of groups to remove in a single batch is 50. + remove_batch_size: usize, } impl GroupManagementState { pub(crate) fn new(metrics_calculate: Arc) -> Self { + let remove_batch_size = get_env_var("APPFLOWY_COLLABORATE_REMOVE_BATCH_SIZE", "50") + .parse::() + .unwrap_or(50); Self { group_by_object_id: Arc::new(DashMap::new()), editing_by_user: Arc::new(DashMap::new()), metrics_calculate, + remove_batch_size, } } @@ -39,18 +45,18 @@ impl GroupManagementState { let (object_id, group) = (entry.key(), entry.value()); if group.is_inactive().await { inactive_group_ids.push(object_id.clone()); - if inactive_group_ids.len() > 10 { + if inactive_group_ids.len() > self.remove_batch_size { break; } } } - info!( - "total groups:{}, inactive group:{:?}, inactive group ids:{:?}", + "total groups:{}, inactive group:{}", self.group_by_object_id.len() as i64, inactive_group_ids.len(), - inactive_group_ids, ); + + trace!("inactive group ids:{:?}", inactive_group_ids); for object_id in &inactive_group_ids { self.remove_group(object_id).await; } diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index eb3080bac..4422f25ad 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -282,9 +282,13 @@ fn spawn_period_check_inactive_group( S: CollabStorage, AC: RealtimeAccessControl, { - let mut interval = interval(Duration::from_secs(60)); + let mut interval = interval(Duration::from_secs(20)); let cloned_group_sender_by_object_id = group_sender_by_object_id.clone(); - tokio::task::spawn_local(async move { + tokio::spawn(async move { + // when appflowy-collaborate start, wait for 60 seconds to start the check. Since no groups will + // be inactive in the first 60 seconds. + tokio::time::sleep(Duration::from_secs(60)).await; + loop { interval.tick().await; if let Some(groups) = weak_groups.upgrade() {