Skip to content

Commit

Permalink
chore: add APPFLOWY_COLLAB_REMOVE_BATCH_SIZE env to control num of co… (
Browse files Browse the repository at this point in the history
#779)

* chore: add APPFLOWY_COLLAB_REMOVE_BATCH_SIZE env to control num of collabs to be removed in one tick

* chore: clippy
  • Loading branch information
appflowy authored Sep 2, 2024
1 parent c2a5369 commit abd96d8
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 9 deletions.
1 change: 1 addition & 0 deletions deploy.env
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,4 @@ APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379

# AppFlowy Collaborate
APPFLOWY_COLLABORATE_MULTI_THREAD=false
APPFLOWY_COLLABORATE_REMOVE_BATCH_SIZE=100
1 change: 1 addition & 0 deletions dev.env
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,4 @@ APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379

# AppFlowy Collaborate
APPFLOWY_COLLABORATE_MULTI_THREAD=false
APPFLOWY_COLLABORATE_REMOVE_BATCH_SIZE=100
20 changes: 13 additions & 7 deletions services/appflowy-collaborate/src/group/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,34 @@ use dashmap::mapref::one::RefMut;
use dashmap::try_result::TryResult;
use dashmap::DashMap;
use tokio::time::sleep;
use tracing::{error, event, info, warn};

use collab_rt_entity::user::RealtimeUser;
use tracing::{error, event, info, trace, warn};

use crate::config::get_env_var;
use crate::error::RealtimeError;
use crate::group::group_init::CollabGroup;
use crate::metrics::CollabRealtimeMetrics;
use collab_rt_entity::user::RealtimeUser;

#[derive(Clone)]
pub(crate) struct GroupManagementState {
group_by_object_id: Arc<DashMap<String, Arc<CollabGroup>>>,
/// Keep track of all [Collab] objects that a user is subscribed to.
editing_by_user: Arc<DashMap<RealtimeUser, HashSet<Editing>>>,
metrics_calculate: Arc<CollabRealtimeMetrics>,
/// By default, the number of groups to remove in a single batch is 50.
remove_batch_size: usize,
}

impl GroupManagementState {
pub(crate) fn new(metrics_calculate: Arc<CollabRealtimeMetrics>) -> Self {
let remove_batch_size = get_env_var("APPFLOWY_COLLABORATE_REMOVE_BATCH_SIZE", "50")
.parse::<usize>()
.unwrap_or(50);
Self {
group_by_object_id: Arc::new(DashMap::new()),
editing_by_user: Arc::new(DashMap::new()),
metrics_calculate,
remove_batch_size,
}
}

Expand All @@ -39,18 +45,18 @@ impl GroupManagementState {
let (object_id, group) = (entry.key(), entry.value());
if group.is_inactive().await {
inactive_group_ids.push(object_id.clone());
if inactive_group_ids.len() > 10 {
if inactive_group_ids.len() > self.remove_batch_size {
break;
}
}
}

info!(
"total groups:{}, inactive group:{:?}, inactive group ids:{:?}",
"total groups:{}, inactive group:{}",
self.group_by_object_id.len() as i64,
inactive_group_ids.len(),
inactive_group_ids,
);

trace!("inactive group ids:{:?}", inactive_group_ids);
for object_id in &inactive_group_ids {
self.remove_group(object_id).await;
}
Expand Down
8 changes: 6 additions & 2 deletions services/appflowy-collaborate/src/rt_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,13 @@ fn spawn_period_check_inactive_group<S, AC>(
S: CollabStorage,
AC: RealtimeAccessControl,
{
let mut interval = interval(Duration::from_secs(60));
let mut interval = interval(Duration::from_secs(20));
let cloned_group_sender_by_object_id = group_sender_by_object_id.clone();
tokio::task::spawn_local(async move {
tokio::spawn(async move {
// when appflowy-collaborate start, wait for 60 seconds to start the check. Since no groups will
// be inactive in the first 60 seconds.
tokio::time::sleep(Duration::from_secs(60)).await;

loop {
interval.tick().await;
if let Some(groups) = weak_groups.upgrade() {
Expand Down

0 comments on commit abd96d8

Please sign in to comment.