feat: memory map data file #90

Merged · 4 commits · Feb 7, 2024
cmd/main.go (22 additions, 11 deletions)
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"log/slog"
 	"os"
+	"runtime/pprof"
 	"time"
 
 	"github.com/kevmo314/appendable/pkg/appendable"
@@ -14,13 +15,14 @@ import (
 
 func main() {
 	var debugFlag, jsonlFlag, csvFlag, showTimings bool
-	var indexFilename string
+	var indexFilename, pprofFilename string
 
 	flag.BoolVar(&debugFlag, "debug", false, "Use logger that prints at the debug-level")
 	flag.BoolVar(&jsonlFlag, "jsonl", false, "Use JSONL handler")
 	flag.BoolVar(&csvFlag, "csv", false, "Use CSV handler")
 	flag.BoolVar(&showTimings, "t", false, "Show time-related metrics")
 	flag.StringVar(&indexFilename, "i", "", "Specify the existing index of the file to be opened, writing to stdout")
+	flag.StringVar(&pprofFilename, "pprof", "", "Specify the file to write the pprof data to")
 
 	flag.Parse()
 	logLevel := &slog.LevelVar{}
@@ -29,6 +31,18 @@ func main() {
 		logLevel.Set(slog.LevelDebug)
 	}
 
+	if pprofFilename != "" {
+		f, err := os.Create(pprofFilename)
+		if err != nil {
+			panic(err)
+		}
+		defer f.Close() // error handling omitted for example
+		if err := pprof.StartCPUProfile(f); err != nil {
+			panic(err)
+		}
+		defer pprof.StopCPUProfile()
+	}
+
 	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel}))
 	slog.SetDefault(logger)
 
@@ -49,17 +63,12 @@ func main() {
 		flag.Usage()
 	}
 
-	// open the index file
-	indexFile, err := os.OpenFile(indexFilename, os.O_RDWR|os.O_CREATE, 0666)
-	if err != nil {
-		panic(err)
-	}
-
-	// Open the data file
-	file, err := os.Open(args[0])
+	// Open the data file
+	df, err := mmap.OpenFile(args[0], os.O_RDONLY, 0)
 	if err != nil {
 		panic(err)
 	}
+	defer df.Close()
 
 	var dataHandler appendable.DataHandler
 
@@ -75,17 +84,19 @@ func main() {
 	if showTimings {
 		readStart = time.Now()
 	}
-	mmpif, err := mmap.NewMemoryMappedFile(indexFile)
+	mmpif, err := mmap.OpenFile(indexFilename, os.O_RDWR|os.O_CREATE, 0666)
 	if err != nil {
 		panic(err)
 	}
+	defer mmpif.Close()
 
+	// Open the index file
 	i, err := appendable.NewIndexFile(mmpif, dataHandler)
 	if err != nil {
 		panic(err)
 	}
 
-	if err := i.Synchronize(file); err != nil {
+	if err := i.Synchronize(df.Bytes()); err != nil {
 		panic(err)
 	}
 
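Two review notes on the main.go changes. Profiling: the new flag writes a CPU profile via runtime/pprof, so a run such as -csv -i data.index -pprof cpu.out data.csv (flag names from this diff; the binary name will vary) can be inspected afterwards with go tool pprof cpu.out. Lifetime: the defers above encode an ordering constraint, since df.Bytes() is a view into the mapping rather than a copy, and must not be used after df.Close() runs. A minimal sketch of that constraint, assuming the pkg/mmap import path and the OpenFile/Bytes/Close API shown in this diff:

package main

import (
	"fmt"
	"os"

	"github.com/kevmo314/appendable/pkg/mmap" // assumed import path
)

// readAll copies the mapped contents out so they stay valid after Close.
func readAll(path string) ([]byte, error) {
	df, err := mmap.OpenFile(path, os.O_RDONLY, 0)
	if err != nil {
		return nil, err
	}
	defer df.Close() // unmaps; df.Bytes() must not be used after this

	buf := df.Bytes() // zero-copy view into the mapping
	out := make([]byte, len(buf))
	copy(out, buf) // copy only because the result outlives the mapping
	return out, nil
}

func main() {
	data, err := readAll("data.csv")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(data), "bytes")
}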
pkg/appendable/index_file.go (2 additions, 7 deletions)
@@ -9,13 +9,8 @@ import (
 
 const CurrentVersion = 1
 
-type DataFile interface {
-	io.ReadSeeker
-	io.ReaderAt
-}
-
 type DataHandler interface {
-	Synchronize(f *IndexFile, df DataFile) error
+	Synchronize(f *IndexFile, df []byte) error
 	Format() Format
 }
 
@@ -181,6 +176,6 @@ func (i *IndexFile) FindOrCreateIndex(name string, fieldType FieldType) (*btree.
 // Synchronize will synchronize the index file with the data file.
 // This is a convenience method and is equivalent to calling
 // Synchronize() on the data handler itself.
-func (i *IndexFile) Synchronize(df DataFile) error {
+func (i *IndexFile) Synchronize(df []byte) error {
 	return i.dataHandler.Synchronize(i, df)
 }
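The interface change above is breaking for callers that passed an io.ReadSeeker or io.ReaderAt: Synchronize now wants the whole data file as a byte slice. For sources that are not memory-mapped, the adaptation is a read-into-memory, which is precisely the copy the mmap path avoids. A hedged sketch (the helper and its placement are ours, not part of this PR):

package handlers // placement is illustrative only

import (
	"io"

	"github.com/kevmo314/appendable/pkg/appendable"
)

// synchronizeFromReader adapts an io.Reader source to the new []byte API.
func synchronizeFromReader(i *appendable.IndexFile, r io.Reader) error {
	buf, err := io.ReadAll(r) // materializes the entire data file in memory
	if err != nil {
		return err
	}
	return i.Synchronize(buf) // new signature: Synchronize(df []byte) error
}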
pkg/btree/bptree.go (1 addition, 1 deletion)
@@ -21,7 +21,7 @@ type BPTree struct {
 
 	maxPageSize int
 
-	Data io.ReaderAt
+	Data []byte
 }
 
 func NewBPTree(tree ReadWriteSeekPager, meta MetaPage) *BPTree {
pkg/btree/multi.go (1 addition, 1 deletion)
@@ -44,7 +44,7 @@ func (m *LinkedMetaPage) SetRoot(mp MemoryPointer) error {
 //
 // Generally, passing data is required, however if the tree
 // consists of only inlined values, it is not necessary.
-func (m *LinkedMetaPage) BPTree(data io.ReaderAt) *BPTree {
+func (m *LinkedMetaPage) BPTree(data []byte) *BPTree {
 	t := NewBPTree(m.rws, m)
 	if data != nil {
 		t.Data = data
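Per the doc comment, nil stays a valid argument after the type change: a tree whose values are all inlined never dereferences Data, and the nil check above simply leaves t.Data unset. Caller-side sketch (metaPage is a hypothetical *btree.LinkedMetaPage already in scope):

// inlined-only tree: no data file view needed
t := metaPage.BPTree(nil)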
pkg/btree/node.go (4 additions, 9 deletions)
@@ -23,7 +23,7 @@ type ReferencedValue struct {
 }
 
 type BPTreeNode struct {
-	Data io.ReaderAt
+	Data []byte
 	// contains the offset of the child node or the offset of the record for leaf
 	// if the node is a leaf, the last pointer is the offset of the next leaf
 	Pointers []MemoryPointer
@@ -119,16 +119,11 @@ func (n *BPTreeNode) UnmarshalBinary(buf []byte) error {
 			// read the key out of the memory pointer stored at this position
 			n.Keys[i].DataPointer.Offset = binary.BigEndian.Uint64(buf[m+4 : m+12])
 			n.Keys[i].DataPointer.Length = binary.BigEndian.Uint32(buf[m+12 : m+16])
-			n.Keys[i].Value = make([]byte, n.Keys[i].DataPointer.Length)
-			if _, err := n.Data.ReadAt(n.Keys[i].Value, int64(n.Keys[i].DataPointer.Offset)); err != nil {
-				return fmt.Errorf("failed to read key: %w", err)
-			}
+			dp := n.Keys[i].DataPointer
+			n.Keys[i].Value = n.Data[dp.Offset : dp.Offset+uint64(dp.Length)]
 			m += 4 + 12
 		} else {
-			n.Keys[i].Value = make([]byte, l)
-			if o := copy(n.Keys[i].Value, buf[m+4:m+4+int(l)]); o != int(l) {
-				return fmt.Errorf("failed to copy key: %w", io.ErrShortWrite)
-			}
+			n.Keys[i].Value = buf[m+4 : m+4+int(l)]
 			m += 4 + int(l)
 		}
 	}
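These UnmarshalBinary changes are where the mmap move pays off: key values become subslices of n.Data (or of buf) instead of fresh allocations plus ReadAt/copy calls. The usual Go aliasing caveat applies: a subslice shares its backing array, so it is valid only as long as the mapping is, and mutations are visible through every reference. A standalone illustration (plain Go, nothing project-specific):

package main

import "fmt"

func main() {
	// data stands in for the memory-mapped file backing n.Data.
	data := []byte("hello,world\n")

	key := data[0:5] // new style: zero-copy view into data

	keyCopy := make([]byte, 5) // old style: allocate and copy
	copy(keyCopy, data[0:5])

	data[0] = 'H'
	fmt.Println(string(key))     // "Hello": aliases data, so it sees the write
	fmt.Println(string(keyCopy)) // "hello": an independent copy, so it does not
}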
pkg/handlers/csv.go (17 additions, 25 deletions)
@@ -1,7 +1,6 @@
 package handlers
 
 import (
-	"bufio"
 	"bytes"
 	"encoding/binary"
 	"encoding/csv"
@@ -26,7 +25,7 @@ func (c CSVHandler) Format() appendable.Format {
 	return appendable.FormatCSV
 }
 
-func (c CSVHandler) Synchronize(f *appendable.IndexFile, df appendable.DataFile) error {
+func (c CSVHandler) Synchronize(f *appendable.IndexFile, df []byte) error {
 	slog.Debug("Starting CSV synchronization")
 
 	var headers []string
@@ -36,10 +35,6 @@ func (c CSVHandler) Synchronize(f *appendable.IndexFile, df appendable.DataFile)
 	if err != nil {
 		return fmt.Errorf("failed to read metadata: %w", err)
 	}
-	if _, err := df.Seek(int64(metadata.ReadOffset), io.SeekStart); err != nil {
-		return fmt.Errorf("failed to seek: %w", err)
-	}
-
 	isHeader := false
 
 	isEmpty, err := f.IsEmpty()
@@ -57,38 +52,35 @@ func (c CSVHandler) Synchronize(f *appendable.IndexFile, df appendable.DataFile)
 		headers = fieldNames
 	}
 
-	scanner := bufio.NewScanner(df)
-
-	for scanner.Scan() {
-		line := scanner.Bytes()
+	for {
+		i := bytes.IndexByte(df[metadata.ReadOffset:], '\n')
+		if i == -1 {
+			break
+		}
 
 		if isHeader {
 			slog.Info("Parsing CSV headers")
-			dec := csv.NewReader(bytes.NewReader(line))
+			dec := csv.NewReader(bytes.NewReader(df[metadata.ReadOffset : metadata.ReadOffset+uint64(i)]))
 			headers, err = dec.Read()
 			if err != nil {
 				slog.Error("failed to parse CSV header", "error", err)
 				return fmt.Errorf("failed to parse CSV header: %w", err)
 			}
-			metadata.ReadOffset += uint64(len(line)) + 1
+			metadata.ReadOffset += uint64(i) + 1
 			isHeader = false
 			continue
 		}
 
-		dec := csv.NewReader(bytes.NewReader(line))
+		dec := csv.NewReader(bytes.NewReader(df[metadata.ReadOffset : metadata.ReadOffset+uint64(i)]))
 
 		if err := handleCSVLine(f, df, dec, headers, []string{}, btree.MemoryPointer{
 			Offset: metadata.ReadOffset,
-			Length: uint32(len(line)),
+			Length: uint32(i),
 		}); err != nil {
 			return fmt.Errorf("failed to handle object: %w", err)
 		}
 
-		metadata.ReadOffset += uint64(len(line)) + 1 // include the newline
-	}
-
-	if err := scanner.Err(); err != nil {
-		return fmt.Errorf("failed to scan: %w", err)
+		metadata.ReadOffset += uint64(i) + 1 // include the newline
 	}
 
 	// update the metadata
@@ -144,7 +136,7 @@ func InferCSVField(fieldValue string) (interface{}, appendable.FieldType) {
 	return fieldValue, appendable.FieldTypeString
 }
 
-func handleCSVLine(f *appendable.IndexFile, r io.ReaderAt, dec *csv.Reader, headers []string, path []string, data btree.MemoryPointer) error {
+func handleCSVLine(f *appendable.IndexFile, df []byte, dec *csv.Reader, headers []string, path []string, data btree.MemoryPointer) error {
 	record, err := dec.Read()
 	if err != nil {
 		slog.Error("Failed to read CSV record at index", "error", err)
@@ -177,21 +169,21 @@ func handleCSVLine(f *appendable.IndexFile, r io.ReaderAt, dec *csv.Reader, head
 		case appendable.FieldTypeFloat64:
 			buf := make([]byte, 8)
 			binary.BigEndian.PutUint64(buf, math.Float64bits(value.(float64)))
-			if err := page.BPTree(r).Insert(btree.ReferencedValue{Value: buf}, data); err != nil {
+			if err := page.BPTree(df).Insert(btree.ReferencedValue{Value: buf}, data); err != nil {
 				return fmt.Errorf("failed to insert into b+tree: %w", err)
 			}
 		case appendable.FieldTypeBoolean:
 			if value.(bool) {
-				if err := page.BPTree(r).Insert(btree.ReferencedValue{Value: []byte{1}}, data); err != nil {
+				if err := page.BPTree(df).Insert(btree.ReferencedValue{Value: []byte{1}}, data); err != nil {
 					return fmt.Errorf("failed to insert into b+tree: %w", err)
 				}
 			} else {
-				if err := page.BPTree(r).Insert(btree.ReferencedValue{Value: []byte{0}}, data); err != nil {
+				if err := page.BPTree(df).Insert(btree.ReferencedValue{Value: []byte{0}}, data); err != nil {
 					return fmt.Errorf("failed to insert into b+tree: %w", err)
 				}
 			}
 		case appendable.FieldTypeString:
-			if err := page.BPTree(r).Insert(btree.ReferencedValue{
+			if err := page.BPTree(df).Insert(btree.ReferencedValue{
 				DataPointer: btree.MemoryPointer{
 					Offset: fieldOffset,
 					Length: fieldLength,
@@ -210,7 +202,7 @@ func handleCSVLine(f *appendable.IndexFile, r io.ReaderAt, dec *csv.Reader, head
 		case appendable.FieldTypeNull:
 			// nil values are a bit of a degenerate case, we are essentially using the btree
 			// as a set. we store the value as an empty byte slice.
-			if err := page.BPTree(r).Insert(btree.ReferencedValue{Value: []byte{}}, data); err != nil {
+			if err := page.BPTree(df).Insert(btree.ReferencedValue{Value: []byte{}}, data); err != nil {
 				return fmt.Errorf("failed to insert into b+tree: %w", err)
 			}
 			slog.Debug("Marked field", "name", name)
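The rewritten Synchronize loop replaces bufio.Scanner with direct newline scanning over the mapped slice, and it deliberately stops at a final row that has no trailing newline: ReadOffset is persisted with the index metadata, so a partially written row is presumably picked up by a later Synchronize once its newline is appended. A standalone sketch of the scanning pattern, stdlib only:

package main

import (
	"bytes"
	"fmt"
)

func main() {
	df := []byte("h1,h2\na,b\nc") // "c" has no newline yet
	var offset uint64

	for {
		i := bytes.IndexByte(df[offset:], '\n')
		if i == -1 {
			break // incomplete final row: leave offset pointing at it
		}
		row := df[offset : offset+uint64(i)] // zero-copy view of one row
		fmt.Printf("row %q at offset %d\n", row, offset)
		offset += uint64(i) + 1 // advance past the newline
	}
	fmt.Println("resume at", offset) // 10, where "c" is still pending
}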
pkg/handlers/csv_test.go (7 additions, 8 deletions)
@@ -5,7 +5,6 @@ import (
 	"log/slog"
 	"math"
 	"os"
-	"strings"
 	"testing"
 
 	"github.com/kevmo314/appendable/pkg/appendable"
@@ -27,7 +26,7 @@ func TestCSV(t *testing.T) {
 
 	t.Run("no schema changes", func(t *testing.T) {
 		f := buftest.NewSeekableBuffer()
-		g := strings.NewReader("test\ntest1\n")
+		g := []byte("test\ntest1\n")
 
 		i, err := appendable.NewIndexFile(f, CSVHandler{})
 		if err != nil {
@@ -67,8 +66,8 @@ func TestCSV(t *testing.T) {
 		}
 	})
 	t.Run("correctly sets field offset", func(t *testing.T) {
-		r1 := strings.NewReader("test\ntest1\n")
-		r2 := strings.NewReader("test\ntest1\ntest2\n")
+		r1 := []byte("test\ntest1\n")
+		r2 := []byte("test\ntest1\ntest2\n")
 
 		f := buftest.NewSeekableBuffer()
 
@@ -134,11 +133,11 @@ func TestCSV(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		if err := i.Synchronize(strings.NewReader(s1)); err != nil {
+		if err := i.Synchronize([]byte(s1)); err != nil {
 			t.Fatal(err)
 		}
 
-		r2 := strings.NewReader(s2)
+		r2 := []byte(s2)
 		if err := i.Synchronize(r2); err != nil {
 			t.Fatal(err)
 		}
@@ -204,8 +203,8 @@ func TestCSV(t *testing.T) {
 	})
 
 	t.Run("recognize null fields", func(t *testing.T) {
-		r1 := strings.NewReader("nullheader,header1\n,wef\n")
-		r2 := strings.NewReader("nullheader,header1\n,wef\n,howdy\n")
+		r1 := []byte("nullheader,header1\n,wef\n")
+		r2 := []byte("nullheader,header1\n,wef\n,howdy\n")
 
 		f := buftest.NewSeekableBuffer()
 
pkg/handlers/equality_test.go (8 additions, 9 deletions)
@@ -6,7 +6,6 @@ import (
 	"log/slog"
 	"math"
 	"os"
-	"strings"
 	"testing"
 
 	"github.com/kevmo314/appendable/pkg/appendable"
@@ -37,8 +36,8 @@ func TestEquality(t *testing.T) {
 	mockCsv2 := "h1,h2\ntest1,37.3\ntest3,4\n"
 
 	t.Run("test index files after Synchronize", func(t *testing.T) {
-		jr1 := strings.NewReader(mockJsonl2)
-		cr1 := strings.NewReader(mockCsv2)
+		jr1 := []byte(mockJsonl2)
+		cr1 := []byte(mockCsv2)
 		f := buftest.NewSeekableBuffer()
 
 		jsonlI, err := appendable.NewIndexFile(f, JSONLHandler{})
@@ -68,8 +67,8 @@ func TestEquality(t *testing.T) {
 	})
 
 	t.Run("test index files with appending", func(t *testing.T) {
-		jr := strings.NewReader(mockJsonl)
-		cr := strings.NewReader(mockCsv)
+		jr := []byte(mockJsonl)
+		cr := []byte(mockCsv)
 		f := buftest.NewSeekableBuffer()
 
 		jsonlI, err := appendable.NewIndexFile(f, JSONLHandler{})
@@ -81,7 +80,7 @@ func TestEquality(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		jr = strings.NewReader(mockJsonl2)
+		jr = []byte(mockJsonl2)
 		if err := jsonlI.Synchronize(jr); err != nil {
 			t.Fatal(err)
 		}
@@ -97,7 +96,7 @@ func TestEquality(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		cr = strings.NewReader(mockCsv2)
+		cr = []byte(mockCsv2)
 		if err := csvI.Synchronize(cr); err != nil {
 			t.Fatal(err)
 		}
@@ -177,7 +176,7 @@ func compareIndexMeta(i1, i2 []*btree.LinkedMetaPage) (bool, string) {
 	return true, ""
 }
 
-func compareMetaPages(i1, i2 []*btree.LinkedMetaPage, jr, cr *strings.Reader) (bool, string) {
+func compareMetaPages(i1, i2 []*btree.LinkedMetaPage, jr, cr []byte) (bool, string) {
 	h1 := [2]string{"test1", "test3"}
 	h2 := [2]float64{37.3, 4}
 
@@ -236,7 +235,7 @@ func compareMetaPages(i1, i2 []*btree.LinkedMetaPage, jr, cr *strings.Reader) (b
 	return true, ""
 }
 
-func compare(i1, i2 *appendable.IndexFile, jReader, cReader *strings.Reader) (bool, string) {
+func compare(i1, i2 *appendable.IndexFile, jReader, cReader []byte) (bool, string) {
 	// compare field names
 
 	i1fn, err := i1.IndexFieldNames()