From 788687159713fdc6378e335ee4b040b89c95f9fb Mon Sep 17 00:00:00 2001 From: Gyubong Date: Fri, 19 Jan 2024 10:57:12 +0900 Subject: [PATCH] Cleanup code --- raftify/src/storage/heed.rs | 45 +++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/raftify/src/storage/heed.rs b/raftify/src/storage/heed.rs index 77c4c655..5251b596 100644 --- a/raftify/src/storage/heed.rs +++ b/raftify/src/storage/heed.rs @@ -114,9 +114,8 @@ impl HeedStorage { impl LogStore for HeedStorage { fn compact(&mut self, index: u64) -> Result<()> { let store = self.wl(); - let reader = store.env.read_txn()?; let mut writer = store.env.write_txn()?; - store.compact(&reader, &mut writer, index)?; + store.compact(&mut writer, index)?; writer.commit()?; Ok(()) } @@ -363,12 +362,7 @@ impl HeedStorageCore { Ok(storage) } - pub fn compact( - &self, - reader: &heed::RoTxn, - writer: &mut heed::RwTxn, - index: u64, - ) -> Result<()> { + pub fn compact(&self, writer: &mut heed::RwTxn, index: u64) -> Result<()> { // TODO, check that compaction is legal //let last_index = self.last_index(&writer)?; // there should always be at least one entry in the log @@ -377,7 +371,7 @@ impl HeedStorageCore { let index = format_entry_key_string(index.to_string().as_str()); if self.config.save_compacted_logs { - let iter = self.entries_db.range(reader, &(..index.clone()))?; + let iter = self.entries_db.range(writer, &(..index.clone()))?; let entries = iter .filter_map(|e| match e { @@ -475,6 +469,16 @@ impl HeedStorageCore { } fn first_index(&self, reader: &heed::RoTxn) -> Result { + // first_entry = self + // .entries_db + // .first(reader); + // match self.entries_db.first(reader)? { + // Some(first_entry) => { + // Ok(first_entry.0.parse::().unwrap() + 1) + // } + // None => Ok(self.snapshot(reader, 0, 0)?.get_metadata().get_index()), + // } + let first_entry = self .entries_db .first(reader)? @@ -484,8 +488,7 @@ impl HeedStorageCore { } fn entry(&self, reader: &heed::RoTxn, index: u64) -> Result> { - let entry = self.entries_db.get(reader, &index.to_string())?; - Ok(entry) + Ok(self.entries_db.get(reader, &index.to_string())?) } fn entries( @@ -546,10 +549,20 @@ impl HeedStorageCore { } fn append(&self, writer: &mut heed::RwTxn, entries: &[Entry]) -> Result<()> { + if entries.is_empty() { + return Ok(()); + } + let mut last_index = self.last_index(writer)?; - // TODO: ensure entry arrive in the right order + + if last_index + 1 < entries[0].index { + self.logger.fatal(&format!( + "raft logs should be continuous, last index: {}, new appended: {}", + last_index, entries[0].index, + )); + } + for entry in entries { - //assert_eq!(entry.get_index(), last_index + 1); let index = entry.index; last_index = std::cmp::max(index, last_index); self.entries_db.put(writer, &index.to_string(), entry)?; @@ -573,10 +586,8 @@ impl HeedStorageCore { ); fs::remove_file(&dest_path)?; } - Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {} - Err(e) => { - return Err(e.into()); - } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} + Err(e) => return Err(e.into()), _ => {} }