Skip to content

Commit

Permalink
more work: change to Cursor, start tokenizer
Browse files Browse the repository at this point in the history
  • Loading branch information
friendlymatthew committed Jan 10, 2024
1 parent 1f2824a commit 7b402ab
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 48 deletions.
1 change: 0 additions & 1 deletion appendable-rs/appendable/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ edition = "2021"

[dependencies]
protocol = { path="../protocol" }
serde_json = "1.0.111"
xxhash-rust = { version = "0.8.8", features = ["xxh3"] }

[dev-dependencies]
Expand Down
44 changes: 22 additions & 22 deletions appendable-rs/appendable/src/handler/jsonl_handler.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use crate::index_file::Index;
use crate::index_file::IndexFile;
use crate::io::DataHandler;
use serde_json::{Deserializer, Map, Value};
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::io::{BufRead, BufReader, Cursor, Seek, SeekFrom};
use xxhash_rust::xxh3::Xxh3;

pub struct JSONLHandler {
reader: BufReader<File>,
// todo! change to borrowed type like &[u8] -- spent too long battling lifetimes
reader: BufReader<Cursor<Vec<u8>>>,
xxh3: Xxh3,
}
impl JSONLHandler {
pub fn new(file: File) -> Self {
pub fn new(data: Vec<u8>) -> Self {
JSONLHandler {
reader: BufReader::new(file),
reader: BufReader::new(Cursor::new(data)),
xxh3: Xxh3::new(),
}
}
Expand All @@ -23,12 +22,7 @@ impl Seek for JSONLHandler {
}
}
impl DataHandler for JSONLHandler {
fn synchronize(
&mut self,
indexes: &mut Vec<Index>,
end_byte_offsets: &mut Vec<u64>,
checksums: &mut Vec<u64>,
) -> Result<(), String> {
fn synchronize(&mut self, index_file: &mut IndexFile) -> Result<(), String> {
let mut line = String::new();
let mut start_offset: u64 = 0;

Expand All @@ -38,19 +32,25 @@ impl DataHandler for JSONLHandler {
.map_err(|e| e.to_string())?
> 0
{
let existing_count = end_byte_offsets.len();
let existing_count = index_file.end_byte_offsets.len();
// compute byte_offset for current line
let line_length = line.as_bytes().len() as u64;
let current_offset = start_offset + line_length + 1;
end_byte_offsets.push(current_offset);
index_file.end_byte_offsets.push(current_offset);

// compute checksum
self.xxh3.update(line.as_bytes());
let checksum = self.xxh3.digest(); // produce the final hash value
checksums.push(checksum);
index_file.checksums.push(checksum);

// Process the JSON line and update indexes
handle_json_object(&line, indexes, vec![], existing_count as u64, start_offset)?;
handle_json_object(
line.into_bytes(),
index_file,
&mut vec![],
existing_count as u64,
start_offset,
)?;

start_offset = current_offset;
line.clear();
Expand All @@ -61,11 +61,11 @@ impl DataHandler for JSONLHandler {
}

fn handle_json_object(
json_line: &str,
indexes: &mut Vec<Index>,
path: Vec<String>,
json_line: Vec<u8>,
index_file: &mut IndexFile,
path: &mut Vec<String>,
data_index: u64,
data_offset: u64,
) -> Result<(), String> {
Ok(())
) -> Result<usize, String> {
Ok(1)
}
12 changes: 3 additions & 9 deletions appendable-rs/appendable/src/index_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use std::fmt::Formatter;
const CURRENT_VERSION: Version = 1;

pub(crate) struct Index {
field_name: String,
field_type: FieldFlags,
pub(crate) field_name: String,
pub(crate) field_type: FieldFlags,
pub(crate) index_records: HashMap<IndexKey, Vec<IndexRecord>>,
}

Expand All @@ -19,7 +19,6 @@ pub struct IndexFile {
pub(crate) indexes: Vec<Index>,
pub(crate) end_byte_offsets: Vec<u64>,
pub(crate) checksums: Vec<u64>,
data: Box<dyn DataHandler>,
tail: u32,
}

Expand All @@ -28,17 +27,12 @@ impl IndexFile {
let mut file = IndexFile {
version: CURRENT_VERSION,
indexes: Vec::new(),
data: data_handler,
end_byte_offsets: Vec::new(),
checksums: Vec::new(),
tail: 0,
};

file.data.synchronize(
&mut file.indexes,
&mut file.end_byte_offsets,
&mut file.checksums,
)?;
data_handler.synchronize(&mut file)?;

Ok(file)
}
Expand Down
9 changes: 2 additions & 7 deletions appendable-rs/appendable/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
use crate::index_file::Index;
use crate::index_file::{Index, IndexFile};
use std::io::Seek;

pub trait DataHandler: Seek {
fn synchronize(
&mut self,
indexes: &mut Vec<Index>,
end_byte_offsets: &mut Vec<u64>,
checksums: &mut Vec<u64>,
) -> Result<(), String>;
fn synchronize(&mut self, index_file: &mut IndexFile) -> Result<(), String>;
}
87 changes: 87 additions & 0 deletions appendable-rs/appendable/src/json_tokenizer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/// Lexical tokens produced while scanning a single JSON document/line.
///
/// `Number`, `Boolean`, and `Null` are declared for the full grammar but are
/// not yet emitted by the tokenizer (work-in-progress; see TODO in `next`).
#[derive(Debug, Clone, PartialEq)]
pub enum Token {
    OpenBracket,
    CloseBracket,
    Colon,
    Comma,
    String(String),
    Number(String),
    Boolean(bool),
    OpenArray,
    CloseArray,
    Null,
}

/// Byte-oriented JSON tokenizer over an owned buffer.
///
/// `position` always points at the next unconsumed byte of `input`.
pub struct JSONTokenizer {
    input: Vec<u8>,
    position: usize,
}

impl JSONTokenizer {
    pub(crate) fn new(input: Vec<u8>) -> Self {
        Self { input, position: 0 }
    }

    /// Returns the next token together with the byte offset at which it
    /// starts, `Ok(None)` once the input is exhausted, or `Err` on an
    /// unexpected byte or an unterminated string.
    ///
    /// Insignificant ASCII whitespace between tokens is skipped, per the
    /// JSON grammar.
    pub(crate) fn next(&mut self) -> Result<Option<(Token, usize)>, String> {
        // JSON permits whitespace between any two tokens; consume it first.
        while self.position < self.input.len()
            && self.input[self.position].is_ascii_whitespace()
        {
            self.position += 1;
        }

        // edge case: we've reached the end of the input
        if self.position >= self.input.len() {
            return Ok(None);
        }

        let start = self.position;
        match self.input[start] {
            b'{' => {
                self.position += 1;
                Ok(Some((Token::OpenBracket, start)))
            }
            b'}' => {
                self.position += 1;
                Ok(Some((Token::CloseBracket, start)))
            }
            b'[' => {
                self.position += 1;
                Ok(Some((Token::OpenArray, start)))
            }
            b']' => {
                self.position += 1;
                Ok(Some((Token::CloseArray, start)))
            }
            b':' => {
                self.position += 1;
                Ok(Some((Token::Colon, start)))
            }
            b',' => {
                self.position += 1;
                Ok(Some((Token::Comma, start)))
            }
            b'\"' => {
                // consume the opening quote; tokenize_string scans the body
                self.position += 1;
                self.tokenize_string()
            }
            // TODO: numbers, true/false, null are not tokenized yet
            _ => Err(format!("Unexpected character at position {}", start)),
        }
    }

    /// Scans a string body; on entry `position` is just past the opening
    /// quote. Returns the string token and the offset of its first content
    /// byte. Escaped bytes are kept verbatim (no unescaping yet).
    fn tokenize_string(&mut self) -> Result<Option<(Token, usize)>, String> {
        let start_position = self.position;
        let mut contents: Vec<u8> = Vec::new();

        while self.position < self.input.len() {
            let current_byte = self.input[self.position];

            match current_byte {
                b'\"' => {
                    // closing quote: consume it and emit the token
                    self.position += 1;
                    let text =
                        String::from_utf8(contents).map_err(|e| e.to_string())?;
                    return Ok(Some((Token::String(text), start_position)));
                }
                b'\\' => {
                    // keep the escaped byte verbatim and skip the backslash;
                    // a trailing lone backslash means the string never closed
                    if self.position + 1 >= self.input.len() {
                        return Err("Unterminated string".to_string());
                    }
                    contents.push(self.input[self.position + 1]);
                    self.position += 2;
                }
                _ => {
                    contents.push(current_byte);
                    self.position += 1;
                }
            }
        }

        Err("Unterminated string".to_string())
    }
}
1 change: 1 addition & 0 deletions appendable-rs/appendable/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod index_file;
pub mod io;
mod json_tokenizer;

pub mod tests {
pub mod jsonl_index_file;
Expand Down
18 changes: 9 additions & 9 deletions appendable-rs/appendable/src/tests/jsonl_index_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
mod tests {
use crate::handler::jsonl_handler::JSONLHandler;
use crate::index_file::IndexFile;
use std::fs::File;
use std::fs;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use tempfile::NamedTempFile;

fn mock_jsonl_file() -> std::io::Result<File> {
// Create a temporary file
fn mock_jsonl_file_to_disk() -> std::io::Result<PathBuf> {
let mut temp_file = NamedTempFile::new()?;

writeln!(
Expand All @@ -20,16 +19,17 @@ mod tests {
r#"{{"name": "kevin", "id": 1, "alpha": ["x", "y", "z"]}}"#
)?;

// Persist the file and return the File handle
let file = temp_file.persist(Path::new("mock_data.jsonl"))?;
Ok(file)
let file_path = temp_file.into_temp_path();
let persisted_file = file_path.keep()?;
Ok(persisted_file)
}

#[test]
fn create_index_file() {
let file = mock_jsonl_file().expect("Failed to create mock file");
let jsonl_handler = JSONLHandler::new(file);
let file_path = mock_jsonl_file_to_disk().expect("Failed to create mock file");
let data = fs::read(&file_path).expect("Unable to read mock file");

let jsonl_handler = JSONLHandler::new(data);
let index_file = IndexFile::new(Box::new(jsonl_handler));

assert!(index_file.is_ok());
Expand Down

0 comments on commit 7b402ab

Please sign in to comment.