Skip to content

Commit

Permalink
Cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Jan 19, 2024
1 parent d8f06e4 commit 7886871
Showing 1 changed file with 28 additions and 17 deletions.
45 changes: 28 additions & 17 deletions raftify/src/storage/heed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ impl HeedStorage {
impl LogStore for HeedStorage {
fn compact(&mut self, index: u64) -> Result<()> {
let store = self.wl();
let reader = store.env.read_txn()?;
let mut writer = store.env.write_txn()?;
store.compact(&reader, &mut writer, index)?;
store.compact(&mut writer, index)?;
writer.commit()?;
Ok(())
}
Expand Down Expand Up @@ -363,12 +362,7 @@ impl HeedStorageCore {
Ok(storage)
}

pub fn compact(
&self,
reader: &heed::RoTxn,
writer: &mut heed::RwTxn,
index: u64,
) -> Result<()> {
pub fn compact(&self, writer: &mut heed::RwTxn, index: u64) -> Result<()> {
// TODO, check that compaction is legal
//let last_index = self.last_index(&writer)?;
// there should always be at least one entry in the log
Expand All @@ -377,7 +371,7 @@ impl HeedStorageCore {
let index = format_entry_key_string(index.to_string().as_str());

if self.config.save_compacted_logs {
let iter = self.entries_db.range(reader, &(..index.clone()))?;
let iter = self.entries_db.range(writer, &(..index.clone()))?;

let entries = iter
.filter_map(|e| match e {
Expand Down Expand Up @@ -475,6 +469,16 @@ impl HeedStorageCore {
}

fn first_index(&self, reader: &heed::RoTxn) -> Result<u64> {
// first_entry = self
// .entries_db
// .first(reader);
// match self.entries_db.first(reader)? {
// Some(first_entry) => {
// Ok(first_entry.0.parse::<u64>().unwrap() + 1)
// }
// None => Ok(self.snapshot(reader, 0, 0)?.get_metadata().get_index()),
// }

let first_entry = self
.entries_db
.first(reader)?
Expand All @@ -484,8 +488,7 @@ impl HeedStorageCore {
}

fn entry(&self, reader: &heed::RoTxn, index: u64) -> Result<Option<Entry>> {
let entry = self.entries_db.get(reader, &index.to_string())?;
Ok(entry)
Ok(self.entries_db.get(reader, &index.to_string())?)
}

fn entries(
Expand Down Expand Up @@ -546,10 +549,20 @@ impl HeedStorageCore {
}

fn append(&self, writer: &mut heed::RwTxn, entries: &[Entry]) -> Result<()> {
if entries.is_empty() {
return Ok(());
}

let mut last_index = self.last_index(writer)?;
// TODO: ensure entry arrive in the right order

if last_index + 1 < entries[0].index {
self.logger.fatal(&format!(
"raft logs should be continuous, last index: {}, new appended: {}",
last_index, entries[0].index,
));
}

for entry in entries {
//assert_eq!(entry.get_index(), last_index + 1);
let index = entry.index;
last_index = std::cmp::max(index, last_index);
self.entries_db.put(writer, &index.to_string(), entry)?;
Expand All @@ -573,10 +586,8 @@ impl HeedStorageCore {
);
fs::remove_file(&dest_path)?;
}
Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
return Err(e.into());
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(e.into()),
_ => {}
}

Expand Down

0 comments on commit 7886871

Please sign in to comment.