diff --git a/src/commons/eventsourcing/store.rs b/src/commons/eventsourcing/store.rs index 6489fc162..ec9d57505 100644 --- a/src/commons/eventsourcing/store.rs +++ b/src/commons/eventsourcing/store.rs @@ -1,5 +1,4 @@ use std::{ - borrow::Cow, collections::HashMap, fmt, str::FromStr, @@ -34,7 +33,7 @@ impl Storable for T { pub struct AggregateStore { kv: KeyValueStore, cache: RwLock>>, - history_cache: Mutex>>>, + history_cache: Option>>>, pre_save_listeners: Vec>>, post_save_listeners: Vec>>, locks: HandleLocks, @@ -52,9 +51,9 @@ impl AggregateStore { let kv = KeyValueStore::create(storage_uri, name_space)?; let cache = RwLock::new(HashMap::new()); let history_cache = if disable_history_cache { - Mutex::new(None) + None } else { - Mutex::new(Some(HashMap::new())) + Some(Mutex::new(HashMap::new())) }; let pre_save_listeners = vec![]; let post_save_listeners = vec![]; @@ -396,44 +395,48 @@ where ) -> Result { // If we have history cache, then first update it, and use that. // Otherwise parse *all* commands in history. - let mut cache_lock = self.history_cache.lock().unwrap(); - let records = match cache_lock.as_mut() { - Some(map) => { - let records = map.entry(id.clone()).or_default(); + // Little local helper so we can use borrowed records without keeping + // the lock longer than it wants to live. + fn command_history_for_records(crit: CommandHistoryCriteria, records: &[CommandHistoryRecord]) -> CommandHistory { + let offset = crit.offset(); + + let rows = match crit.rows_limit() { + Some(limit) => limit, + None => records.len(), + }; + + let mut matching = Vec::with_capacity(rows); + let mut skipped = 0; + let mut total = 0; + + for record in records.iter() { + if record.matches(&crit) { + total += 1; + if skipped < offset { + skipped += 1; + } else if total - skipped <= rows { + matching.push(record.clone()); + } + } + } + + CommandHistory::new(offset, total, matching) + } + + match &self.history_cache { + Some(mutex) => { + let mut cache_lock = mutex.lock().unwrap(); + let records = cache_lock.entry(id.clone()).or_default(); self.update_history_records(records, id)?; - Cow::Borrowed(records) + Ok(command_history_for_records(crit, records)) } None => { let mut records = vec![]; self.update_history_records(&mut records, id)?; - Cow::Owned(records) - } - }; - - let offset = crit.offset(); - - let rows = match crit.rows_limit() { - Some(limit) => limit, - None => records.len(), - }; - - let mut matching = Vec::with_capacity(rows); - let mut skipped = 0; - let mut total = 0; - - for record in records.iter() { - if record.matches(&crit) { - total += 1; - if skipped < offset { - skipped += 1; - } else if total - skipped <= rows { - matching.push(record.clone()); - } + Ok(command_history_for_records(crit, &records)) } } - - Ok(CommandHistory::new(offset, total, matching)) } /// Updates history records for a given aggregate