Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protocol and Index_File #31

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions appendable-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions appendable-rs/appendable/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
protocol = { path="../protocol" }
serde_json = "1.0.111"
friendlymatthew marked this conversation as resolved.
Show resolved Hide resolved
twox-hash = "1.6.3"
127 changes: 127 additions & 0 deletions appendable-rs/appendable/src/handler/jsonl_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use crate::index_file::{Index, IndexFile, IndexKey};
use crate::io::DataHandler;
use serde_json::{Deserializer, Map, Value};
use std::fs::File;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use twox_hash::xxh3::hash64;

/// Streaming handler for JSONL (newline-delimited JSON) data files.
pub struct JSONLHandler {
    pub file: File,
    // Entire file contents, populated lazily by `read_file_to_buffer`;
    // `None` until the first synchronization.
    pub buffer: Option<Vec<u8>>,
}

impl JSONLHandler {
    /// Wraps an open file; the contents are not read until needed.
    pub fn new(file: File) -> Self {
        JSONLHandler { file, buffer: None }
    }

    /// we need to read the entire file to buffer to compute byte slices for checksums
    pub fn read_file_to_buffer(&mut self) -> Result<(), String> {
        // Rewind first: `synchronize` records the deserializer's byte offsets
        // into this buffer as absolute file offsets, so the buffer must start
        // at byte 0 regardless of where the cursor currently sits.
        self.file
            .seek(SeekFrom::Start(0))
            .map_err(|e| e.to_string())?;

        let mut buffer = Vec::new();
        self.file
            .read_to_end(&mut buffer)
            .map_err(|e| e.to_string())?;

        self.buffer = Some(buffer);

        Ok(())
    }
}
impl Seek for JSONLHandler {
    // Seeking is delegated straight to the underlying file handle; the
    // in-memory `buffer` (if populated) is not affected.
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.file.seek(pos)
    }
}
impl DataHandler for JSONLHandler {
    /// Walks every JSON value in the backing file, appending one checksum and
    /// one end-byte offset per record and indexing each record's fields.
    fn synchronize(
        &mut self,
        indexes: &mut Vec<Index>,
        end_byte_offsets: &mut Vec<u64>,
        checksums: &mut Vec<u64>,
    ) -> Result<(), String> {
        // Lazily populate the whole-file buffer on first use.
        if self.buffer.is_none() {
            self.read_file_to_buffer()?;
        }
        let data = self.buffer.as_ref().unwrap();

        let mut stream = Deserializer::from_slice(data).into_iter::<Value>();
        let mut record_start = 0;

        while let Some(parsed) = stream.next() {
            let record = parsed.map_err(|e| e.to_string())?;
            let record_end = stream.byte_offset();

            // Ordinal of this record: how many records were recorded so far.
            let record_index = end_byte_offsets.len();

            // since the `StreamDeserializer` parses JSON values rather than lines, we don't have direct access to the original line strings
            // this is a workaround where we find the byte slice from the start and end byte offset
            checksums.push(hash64(&data[record_start..record_end]));
            end_byte_offsets.push(record_end as u64);

            match record {
                Value::Object(obj) => handle_json_object(
                    &obj,
                    indexes,
                    vec![],
                    record_index as u64,
                    record_start as u64,
                )?,
                _ => return Err("expected a JSON object".to_string()),
            }

            record_start = record_end;
        }

        Ok(())
    }
}

/// Recursively walks a JSON object, building the dotted field name for each
/// key; scalar handling is still a stub.
///
/// * `obj` - the JSON object to walk
/// * `indexes` - index set to populate (not yet written to by the stubs)
/// * `path` - accumulated dotted-name prefix from enclosing objects
/// * `data_index` - ordinal of the record within the data file
/// * `data_offset` - byte offset of the record within the data file
fn handle_json_object(
    obj: &Map<String, Value>,
    indexes: &mut Vec<Index>,
    path: Vec<String>,
    data_index: u64,
    data_offset: u64,
) -> Result<(), String> {
    for (key, value) in obj {
        let field_offset = data_offset; // todo ask kevin about best way of incrementing this
        let name = path
            .iter()
            .chain(std::iter::once(key))
            .cloned()
            .collect::<Vec<_>>()
            .join(".");

        // `serde_json::Value` has exactly these six variants, so no catch-all
        // arm is needed — a `_` arm here would be unreachable (and warned on).
        // Unused bindings are underscored until the handlers are implemented.
        match value {
            Value::String(_s) => {
                // todo: handle string value
            }
            Value::Number(_n) => {
                // todo: handle number value
            }
            Value::Bool(_b) => {
                // todo: handle boolean value
            }
            Value::Array(_arr) => {
                // todo: handle array - might involve recursion
            }
            Value::Object(nested_obj) => {
                // `name` already carries the joined parent path, so it is
                // passed down as the single accumulated path component.
                handle_json_object(nested_obj, indexes, vec![name], data_index, field_offset)?;
            }
            Value::Null => {
                // todo: handle null value
            }
        }
    }

    Ok(())
}
126 changes: 126 additions & 0 deletions appendable-rs/appendable/src/index_file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use crate::io::DataHandler;
use protocol::field_type::FieldFlags;
use protocol::{FieldType, IndexRecord, Version};
use std::collections::HashMap;
use std::fmt;
use std::fmt::Formatter;

/// Index-file format version written by this implementation.
const CURRENT_VERSION: Version = 1;

/// Groups every record observed for a single (dotted) field name.
pub(crate) struct Index {
    // Dotted path of the field, e.g. "outer.inner".
    field_name: String,
    // Set of JSON types seen for this field across all records.
    field_type: FieldFlags,
    // Records bucketed by the concrete value the field held.
    pub(crate) index_records: HashMap<IndexKey, Vec<IndexRecord>>,
}

/// `IndexFile` is a representation of the entire index file.
pub struct IndexFile {
    // File-format version this index was written with.
    version: Version,
    // One `Index` per distinct dotted field name seen in the data.
    pub(crate) indexes: Vec<Index>,
    // Absolute end offset (in bytes) of each data record, in record order.
    pub(crate) end_byte_offsets: Vec<u64>,
    // Per-record checksum of the raw bytes (xxh3 in the JSONL handler),
    // parallel to `end_byte_offsets`.
    pub(crate) checksums: Vec<u64>,
    // Backing data source used to (re)build the state above.
    data: Box<dyn DataHandler>,
    // NOTE(review): initialized to 0 and never updated in the visible code —
    // confirm intended semantics before relying on it.
    tail: u32,
}

impl IndexFile {
    /// Builds an `IndexFile` around `data_handler` and immediately
    /// synchronizes it against the underlying data source.
    ///
    /// Returns an error if the initial synchronization fails.
    // Note: the parameter no longer needs `mut` — it is only moved into the
    // struct (the previous `mut` triggered an unused_mut warning).
    pub fn new(data_handler: Box<dyn DataHandler>) -> Result<Self, String> {
        let mut file = IndexFile {
            version: CURRENT_VERSION,
            indexes: Vec::new(),
            data: data_handler,
            end_byte_offsets: Vec::new(),
            checksums: Vec::new(),
            tail: 0,
        };

        // Disjoint field borrows: `data` is borrowed mutably for the call
        // while the three vectors are passed as separate &mut references.
        file.data.synchronize(
            &mut file.indexes,
            &mut file.end_byte_offsets,
            &mut file.checksums,
        )?;

        Ok(file)
    }

    /// Returns the position of the index for `name`, creating it if absent.
    /// Either way, the index's type flags are extended to cover the type of
    /// `value`.
    pub(crate) fn find_index(&mut self, name: &str, value: &IndexKey) -> usize {
        if let Some(position) = self
            .indexes
            .iter()
            .position(|index| index.field_name == name)
        {
            let flags = &mut self.indexes[position].field_type;
            if !flags.contains(value.field_type()) {
                flags.set(value.field_type());
            }

            position
        } else {
            let mut new_index = Index {
                field_name: name.to_string(),
                field_type: FieldFlags::new(),
                index_records: HashMap::new(),
            };

            new_index.field_type.set(value.field_type());
            self.indexes.push(new_index);
            self.indexes.len() - 1
        }
    }
}

/// `IndexKey` addresses the dynamic typing of keys in `IndexRecord` by stating all possible variants
// NOTE(review): `Index.index_records` is a `HashMap<IndexKey, _>`, which
// requires `IndexKey: Hash`, but the `Object(HashMap…)` variant prevents
// deriving `Hash` — any insert/lookup will fail to compile. Revisit (e.g.
// `BTreeMap` for the Object variant or a manual impl).
#[derive(Eq, PartialEq, Debug, Clone)]
pub enum IndexKey {
    String(String),
    // Numbers are kept as their textual representation — presumably to keep
    // `Eq` derivable (floats would forbid it); TODO confirm intent.
    Number(String),
    Boolean(bool),
    Array(Vec<IndexKey>),
    Object(HashMap<String, IndexKey>),
}

impl IndexKey {
    /// Maps each variant onto the corresponding `FieldType` tag from the
    /// `protocol` crate.
    fn field_type(&self) -> FieldType {
        match self {
            IndexKey::String(_) => FieldType::String,
            IndexKey::Number(_) => FieldType::Number,
            IndexKey::Boolean(_) => FieldType::Boolean,
            IndexKey::Array(_) => FieldType::Array,
            IndexKey::Object(_) => FieldType::Object,
        }
    }
}

impl fmt::Display for IndexKey {
    /// Renders a key as human-readable text: scalars verbatim, arrays as
    /// `[a, b]`, objects as `{k: v}`. Object entry order follows `HashMap`
    /// iteration order, which is unspecified.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            IndexKey::String(s) => write!(f, "{}", s),
            IndexKey::Number(n) => write!(f, "{}", n),
            IndexKey::Boolean(b) => write!(f, "{}", b),
            IndexKey::Array(v) => {
                // `to_string` recurses through this `Display` impl.
                let elements = v
                    .iter()
                    .map(ToString::to_string)
                    .collect::<Vec<String>>()
                    .join(", ");

                write!(f, "[{}]", elements)
            }
            IndexKey::Object(o) => {
                let entries = o
                    .iter()
                    .map(|(key, value)| format!("{}: {}", key, value))
                    .collect::<Vec<String>>()
                    .join(", ");

                write!(f, "{{{}}}", entries)
            }
        }
    }
}

// todo handleJSONLObject()
// linking: https://github.com/kevmo314/appendable/blob/main/pkg/appendable/index_file.go#L77
11 changes: 11 additions & 0 deletions appendable-rs/appendable/src/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use crate::index_file::Index;
use std::io::{Read, Seek};

/// A seekable data source that can rebuild the in-memory index state.
pub trait DataHandler: Seek {
    /// Scans the data source and appends to the three parallel collections:
    /// indexes of observed fields, per-record end byte offsets, and
    /// per-record checksums. Implementations (see the JSONL handler) push
    /// one offset and one checksum per record, in record order.
    fn synchronize(
        &mut self,
        indexes: &mut Vec<Index>,
        end_byte_offsets: &mut Vec<u64>,
        checksums: &mut Vec<u64>,
    ) -> Result<(), String>;
}
16 changes: 4 additions & 12 deletions appendable-rs/appendable/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
pub fn add(left: usize, right: usize) -> usize {
left + right
}

#[cfg(test)]
mod tests {
use super::*;
pub mod index_file;
pub mod io;

#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
pub mod handler {
mod jsonl_handler;
}
77 changes: 77 additions & 0 deletions appendable-rs/protocol/src/field_type.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@

/// `FieldType` represents the type of data stored in the field, which follows JSON types excluding Object and null. Object is broken down into subfields and null is not stored.
// NOTE(review): the sentence above says Object and null are excluded, yet the
// enum defines both variants and `FieldFlags` tracks both — confirm which
// statement is current and update the other.
// Derives: the type is passed by value into `FieldFlags::set`/`contains`, so
// `Copy` avoids re-constructing variants at every call site; `Debug`/`Eq`/
// `Hash` make it usable in assertions and as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FieldType {
    String,
    Number,
    Object,
    Array,
    Boolean,
    Null,
}


/// `FieldFlags` is left as u64 to avoid shooting ourselves in the foot if we want to support more types in the future via other file formats
pub struct FieldFlags {
    // Bitmask with one bit per `FieldType` variant (see `FieldFlags::set`
    // for the bit assignments).
    flags: u64,
}

impl FieldFlags {
pub fn new() -> Self {
FieldFlags { flags: 0 }
}

pub fn set(&mut self, field: FieldType) {
match field {
FieldType::String => self.flags |= 1 << 0,
FieldType::Number => self.flags |= 1 << 1,
FieldType::Object => self.flags |= 1 << 2,
FieldType::Array => self.flags |= 1 << 3,
FieldType::Boolean => self.flags |= 1 << 4,
FieldType::Null => self.flags |= 1 << 5,
}
}

pub fn contains(&self, field: FieldType) -> bool {
match field {
FieldType::String => (self.flags & (1 << 0)) != 0,
FieldType::Number => (self.flags & (1 << 1)) != 0,
FieldType::Object => (self.flags & (1 << 2)) != 0,
FieldType::Array => (self.flags & (1 << 3)) != 0,
FieldType::Boolean => (self.flags & (1 << 4)) != 0,
FieldType::Null => (self.flags & (1 << 5)) != 0,
}
}

pub fn typescript_type(&self) -> String {
let mut components = Vec::new();

if self.contains(FieldType::String) {
components.push("string");
}

if self.contains(FieldType::Number) {
components.push("number");
}

if self.contains(FieldType::Object) {
components.push("Record");
}

if self.contains(FieldType::Array) {
components.push("any[]");
}

if self.contains(FieldType::Boolean) {
components.push("boolean");
}

if self.contains(FieldType::Null) {
components.push("null");
}

match components.is_empty() {
true => "unknown".to_string(),
false => components.join(" | ")
}
}
}
Loading