Skip to content

Commit

Permalink
Use Option<Mutex<..>> instead of Mutex<Option<..>>
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Bruijnzeels committed Jul 6, 2023
1 parent 8bb96b0 commit 2de2bd7
Showing 1 changed file with 37 additions and 34 deletions.
71 changes: 37 additions & 34 deletions src/commons/eventsourcing/store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
borrow::Cow,
collections::HashMap,
fmt,
str::FromStr,
Expand Down Expand Up @@ -34,7 +33,7 @@ impl<T: Clone + Serialize + DeserializeOwned + Sized + 'static> Storable for T {
pub struct AggregateStore<A: Aggregate> {
kv: KeyValueStore,
cache: RwLock<HashMap<MyHandle, Arc<A>>>,
history_cache: Mutex<Option<HashMap<MyHandle, Vec<CommandHistoryRecord>>>>,
history_cache: Option<Mutex<HashMap<MyHandle, Vec<CommandHistoryRecord>>>>,
pre_save_listeners: Vec<Arc<dyn PreSaveEventListener<A>>>,
post_save_listeners: Vec<Arc<dyn PostSaveEventListener<A>>>,
locks: HandleLocks,
Expand All @@ -52,9 +51,9 @@ impl<A: Aggregate> AggregateStore<A> {
let kv = KeyValueStore::create(storage_uri, name_space)?;
let cache = RwLock::new(HashMap::new());
let history_cache = if disable_history_cache {
Mutex::new(None)
None
} else {
Mutex::new(Some(HashMap::new()))
Some(Mutex::new(HashMap::new()))
};
let pre_save_listeners = vec![];
let post_save_listeners = vec![];
Expand Down Expand Up @@ -396,44 +395,48 @@ where
) -> Result<CommandHistory, AggregateStoreError> {
// If we have history cache, then first update it, and use that.
// Otherwise parse *all* commands in history.
let mut cache_lock = self.history_cache.lock().unwrap();

let records = match cache_lock.as_mut() {
Some(map) => {
let records = map.entry(id.clone()).or_default();
// Little local helper so we can use borrowed records without keeping
// the lock longer than it wants to live.
fn command_history_for_records(crit: CommandHistoryCriteria, records: &[CommandHistoryRecord]) -> CommandHistory {
let offset = crit.offset();

let rows = match crit.rows_limit() {
Some(limit) => limit,
None => records.len(),
};

let mut matching = Vec::with_capacity(rows);
let mut skipped = 0;
let mut total = 0;

for record in records.iter() {
if record.matches(&crit) {
total += 1;
if skipped < offset {
skipped += 1;
} else if total - skipped <= rows {
matching.push(record.clone());
}
}
}

CommandHistory::new(offset, total, matching)
}

match &self.history_cache {
Some(mutex) => {
let mut cache_lock = mutex.lock().unwrap();
let records = cache_lock.entry(id.clone()).or_default();
self.update_history_records(records, id)?;
Cow::Borrowed(records)
Ok(command_history_for_records(crit, records))
}
None => {
let mut records = vec![];
self.update_history_records(&mut records, id)?;
Cow::Owned(records)
}
};

let offset = crit.offset();

let rows = match crit.rows_limit() {
Some(limit) => limit,
None => records.len(),
};

let mut matching = Vec::with_capacity(rows);
let mut skipped = 0;
let mut total = 0;

for record in records.iter() {
if record.matches(&crit) {
total += 1;
if skipped < offset {
skipped += 1;
} else if total - skipped <= rows {
matching.push(record.clone());
}
Ok(command_history_for_records(crit, &records))
}
}

Ok(CommandHistory::new(offset, total, matching))
}

/// Updates history records for a given aggregate
Expand Down

0 comments on commit 2de2bd7

Please sign in to comment.