
feat: switch to b+ tree #75

Merged · 16 commits · Feb 5, 2024
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -23,4 +23,4 @@ jobs:
node-version: '18'
- run: npm ci
- run: npm run build
- run: npm test
- run: npm test
5 changes: 4 additions & 1 deletion .gitignore
@@ -1,2 +1,5 @@
dist
node_modules
node_modules

examples/workspace/data.index
examples/workspace/green_tripdata_2023-01.jsonl
46 changes: 18 additions & 28 deletions cmd/main.go
@@ -1,26 +1,27 @@
package main

import (
"bufio"
"flag"
"fmt"
"log/slog"
"os"
"time"

"github.com/kevmo314/appendable/pkg/appendable"
"github.com/kevmo314/appendable/pkg/handlers"
)

func main() {
var debugFlag, jsonlFlag, csvFlag, showTimings bool
var indexFilename string

flag.BoolVar(&debugFlag, "debug", false, "Use logger that prints at the debug-level")
flag.BoolVar(&jsonlFlag, "jsonl", false, "Use JSONL handler")
flag.BoolVar(&csvFlag, "csv", false, "Use CSV handler")
flag.BoolVar(&showTimings, "t", false, "Show time-related metrics")
flag.StringVar(&indexFilename, "i", "", "Specify the existing index of the file to be opened, writing to stdout")

flag.Parse()

logLevel := &slog.LevelVar{}

if debugFlag {
@@ -35,7 +36,6 @@ func main() {
totalStart = time.Now()
}

// index := flag.String("i", "", "Specify the existing index of the file to be opened, writing to stdout")
flag.Usage = func() {
fmt.Printf("Usage: %s [-t] [-i index] [-I index] filename\n", os.Args[0])
flag.PrintDefaults()
@@ -48,7 +48,13 @@ func main() {
flag.Usage()
}

// Open the file
// open the index file
indexFile, err := os.OpenFile(indexFilename, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
panic(err)
}

// Open the data file
file, err := os.Open(args[0])
if err != nil {
panic(err)
@@ -58,13 +64,9 @@ func main() {

switch {
case jsonlFlag:
dataHandler = appendable.JSONLHandler{
ReadSeeker: file,
}
case csvFlag:
dataHandler = appendable.CSVHandler{
ReadSeeker: file,
}
dataHandler = handlers.JSONLHandler{}
// case csvFlag:
// dataHandler = handlers.CSVHandler{}
default:
logger.Error("Please specify the file type with -jsonl or -csv.")
os.Exit(1)
@@ -73,12 +75,15 @@ func main() {
readStart = time.Now()
}
// Open the index file
indexFile, err := appendable.NewIndexFile(dataHandler)

i, err := appendable.NewIndexFile(indexFile, dataHandler)
if err != nil {
panic(err)
}

if err := i.Synchronize(file); err != nil {
panic(err)
}

if showTimings {
readDuration := time.Since(readStart)
logger.Info("Opening + synchronizing index file took", slog.Duration("duration", readDuration))
@@ -88,21 +93,6 @@ func main() {
if showTimings {
writeStart = time.Now()
}
of, err := os.Create(args[0] + ".index")
if err != nil {
panic(err)
}
logger.Info("Writing index file to", slog.String("path", args[0]+".index"))
bufof := bufio.NewWriter(of)
if err := indexFile.Serialize(bufof); err != nil {
panic(err)
}
if err := bufof.Flush(); err != nil {
panic(err)
}
if err := of.Close(); err != nil {
panic(err)
}

if showTimings {
writeDuration := time.Since(writeStart)
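With this change the CLI no longer serializes a fresh "<filename>.index" on every run: it opens (or creates) the index named by -i with O_RDWR|O_CREATE and synchronizes it against the data file in place. A plausible invocation of the JSONL path, using the example paths already listed in .gitignore:

go run ./cmd -jsonl -t -i examples/workspace/data.index examples/workspace/green_tripdata_2023-01.jsonl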
135 changes: 135 additions & 0 deletions pkg/appendable/appendable.go
@@ -0,0 +1,135 @@
package appendable

import (
"encoding/binary"
"fmt"
"strings"
)

/**
* The structure of an index file is characterized by some pages that point
* to other pages. Each box below represents a (typically 4kB) page and
* the arrows indicate that there is a pointer to the next page.
*
* +-----------+-----------+ +-------------+ +-------------+ +-------------+
* | Page GC | File Meta | -> | Index Meta | -> | Index Meta | -> | Index Meta |
* +-----------+-----------+ +-------------+ +-------------+ +-------------+
* | | |
* v v v
* +~~~~~~~~~~~~~+ +~~~~~~~~~~~~~+ +~~~~~~~~~~~~~+
* | B+ Tree | | B+ Tree | | B+ Tree |
* +~~~~~~~~~~~~~+ +~~~~~~~~~~~~~+ +~~~~~~~~~~~~~+
*
* Note: By convention, the first FileMeta does not have a pointer to the
* B+ tree. Instead, the first FileMeta is used to store metadata about the
* file itself and only contains a next pointer.
*
* Additionally, the Page GC page is used by the page file to store free page
* indexes for garbage collection.
*
* Consequently, the index file cannot be smaller than two pages (typically 8kB).
*/

type Version byte

type Format byte

const (
FormatJSONL Format = iota
FormatCSV
)

// FieldType represents the type of data stored in the field, which follows
// JSON types excluding Object and null. Object is broken down into subfields
// and null is not stored.
type FieldType byte

const (
FieldTypeString FieldType = 1 << iota
FieldTypeInt64
FieldTypeUint64
FieldTypeFloat64
FieldTypeObject
FieldTypeArray
FieldTypeBoolean
FieldTypeNull
)

func (t FieldType) TypescriptType() string {
components := []string{}
if t&FieldTypeString != 0 {
components = append(components, "string")
}
if t&FieldTypeInt64 != 0 || t&FieldTypeFloat64 != 0 {
components = append(components, "number")
}
if t&FieldTypeObject != 0 {
components = append(components, "Record")
}
if t&FieldTypeArray != 0 {
components = append(components, "any[]")
}
if t&FieldTypeBoolean != 0 {
components = append(components, "boolean")
}
if t&FieldTypeNull != 0 {
components = append(components, "null")
}
if len(components) == 0 {
return "unknown"
}
return strings.Join(components, " | ")
}

type FileMeta struct {
Version
Format
// An offset to indicate how much data is contained within
// this index. Note that this is implementation-dependent,
// so it is not guaranteed to have any uniform interpretation.
// For example, in JSONL, this is the number of bytes read
// and indexed so far.
ReadOffset uint64
}

func (m *FileMeta) MarshalBinary() ([]byte, error) {
buf := make([]byte, 9)
buf[0] = byte(m.Version)
binary.BigEndian.PutUint64(buf[1:], m.ReadOffset)
return buf, nil
}

func (m *FileMeta) UnmarshalBinary(buf []byte) error {
if len(buf) < 9 {
return fmt.Errorf("invalid metadata size: %d", len(buf))
}
m.Version = Version(buf[0])
m.ReadOffset = binary.BigEndian.Uint64(buf[1:])
return nil
}

type IndexMeta struct {
FieldName string
FieldType FieldType
}

func (m *IndexMeta) MarshalBinary() ([]byte, error) {
buf := make([]byte, 8+len(m.FieldName)+2)
binary.BigEndian.PutUint64(buf[0:], uint64(m.FieldType))
binary.BigEndian.PutUint16(buf[8:], uint16(len(m.FieldName)))
copy(buf[10:], m.FieldName)
return buf, nil
}

func (m *IndexMeta) UnmarshalBinary(buf []byte) error {
if len(buf) < 10 {
return fmt.Errorf("invalid metadata size: %d", len(buf))
}
m.FieldType = FieldType(binary.BigEndian.Uint64(buf[0:]))
nameLength := binary.BigEndian.Uint16(buf[8:])
if len(buf) < 10+int(nameLength) {
return fmt.Errorf("invalid metadata size: %d", len(buf))
}
m.FieldName = string(buf[10 : 10+nameLength])
return nil
}
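TypescriptType tests the field type with bitwise AND, which only makes sense if the FieldType constants are bit flags (1 << iota), so that a single field can record several JSON types observed across records. A minimal usage sketch under that assumption, importing the package path already used in cmd/main.go:

package main

import (
	"fmt"

	"github.com/kevmo314/appendable/pkg/appendable"
)

func main() {
	// A field that has appeared as both a string and a float64 across records.
	t := appendable.FieldTypeString | appendable.FieldTypeFloat64

	// The set bits are unioned into a TypeScript type expression.
	fmt.Println(t.TypescriptType()) // prints: string | number
}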
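The IndexMeta encoding follows directly from MarshalBinary: an 8-byte big-endian FieldType, a 2-byte big-endian name length, then the raw field name bytes, for 10 + len(FieldName) bytes in total. A small round-trip sketch under the same import-path assumption; the field name is only an illustrative value:

package main

import (
	"fmt"

	"github.com/kevmo314/appendable/pkg/appendable"
)

func main() {
	// Layout: [0:8] FieldType (big-endian), [8:10] name length, [10:] name bytes.
	m := &appendable.IndexMeta{FieldName: "trip_distance", FieldType: appendable.FieldTypeFloat64}

	buf, err := m.MarshalBinary()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(buf)) // 10 + len("trip_distance") = 23

	var out appendable.IndexMeta
	if err := out.UnmarshalBinary(buf); err != nil {
		panic(err)
	}
	fmt.Println(out.FieldName, out.FieldType) // trip_distance 8 (FieldTypeFloat64 with bit-flag constants)
}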