From 5c9d23287170df844378c592b837f1aae6a8fbc4 Mon Sep 17 00:00:00 2001
From: Kevin Wang
Date: Tue, 6 Feb 2024 17:02:45 -0500
Subject: [PATCH 1/3] feat: memory map data file

---
 cmd/main.go                   | 19 ++++++++++++++++---
 pkg/appendable/index_file.go  |  9 ++-------
 pkg/btree/bptree.go           |  2 +-
 pkg/btree/multi.go            |  2 +-
 pkg/btree/node.go             | 13 ++++---------
 pkg/handlers/csv.go           | 20 ++++++++------------
 pkg/handlers/csv_test.go      | 15 +++++++--------
 pkg/handlers/equality_test.go | 17 ++++++++---------
 pkg/handlers/jsonl.go         | 13 ++++---------
 pkg/handlers/jsonl_test.go    | 29 ++++++++++++++---------------
 pkg/mmap/mmap.go              | 17 +++++++++++++++++
 11 files changed, 82 insertions(+), 74 deletions(-)

diff --git a/cmd/main.go b/cmd/main.go
index 1241997f..534bef11 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -3,8 +3,10 @@ package main
 import (
 	"flag"
 	"fmt"
+	"log"
 	"log/slog"
 	"os"
+	"runtime/pprof"
 	"time"
 
 	"github.com/kevmo314/appendable/pkg/appendable"
@@ -29,6 +31,16 @@ func main() {
 		logLevel.Set(slog.LevelDebug)
 	}
 
+	f, err := os.Create("pprof.out")
+	if err != nil {
+		log.Fatal("could not create CPU profile: ", err)
+	}
+	defer f.Close() // error handling omitted for example
+	if err := pprof.StartCPUProfile(f); err != nil {
+		log.Fatal("could not start CPU profile: ", err)
+	}
+	defer pprof.StopCPUProfile()
+
 	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel}))
 	slog.SetDefault(logger)
 
@@ -55,8 +67,8 @@ func main() {
 		panic(err)
 	}
 
 	// Open the data file
-	file, err := os.Open(args[0])
+	df, err := mmap.Open(args[0])
 	if err != nil {
 		panic(err)
 	}
@@ -79,13 +91,14 @@ func main() {
 	if err != nil {
 		panic(err)
 	}
+	// Open the index file
 	i, err := appendable.NewIndexFile(mmpif, dataHandler)
 	if err != nil {
 		panic(err)
 	}
 
-	if err := i.Synchronize(file); err != nil {
+	if err := i.Synchronize(df.Bytes()); err != nil {
 		panic(err)
 	}
diff --git a/pkg/appendable/index_file.go b/pkg/appendable/index_file.go
index ed865464..c6fa9a12 100644
--- a/pkg/appendable/index_file.go
+++ b/pkg/appendable/index_file.go
@@ -9,13 +9,8 @@ import (
 
 const CurrentVersion = 1
 
-type DataFile interface {
-	io.ReadSeeker
-	io.ReaderAt
-}
-
 type DataHandler interface {
-	Synchronize(f *IndexFile, df DataFile) error
+	Synchronize(f *IndexFile, df []byte) error
 	Format() Format
 }
@@ -181,6 +176,6 @@ func (i *IndexFile) FindOrCreateIndex(name string, fieldType FieldType) (*btree.
 // Synchronize will synchronize the index file with the data file.
 // This is a convenience method and is equivalent to calling
 // Synchronize() on the data handler itself.
-func (i *IndexFile) Synchronize(df DataFile) error {
+func (i *IndexFile) Synchronize(df []byte) error {
 	return i.dataHandler.Synchronize(i, df)
 }
diff --git a/pkg/btree/bptree.go b/pkg/btree/bptree.go
index 3241e618..0a302797 100644
--- a/pkg/btree/bptree.go
+++ b/pkg/btree/bptree.go
@@ -21,7 +21,7 @@ type BPTree struct {
 
 	maxPageSize int
 
-	Data io.ReaderAt
+	Data []byte
 }
 
 func NewBPTree(tree ReadWriteSeekPager, meta MetaPage) *BPTree {
diff --git a/pkg/btree/multi.go b/pkg/btree/multi.go
index 13c1c952..e3c17d8e 100644
--- a/pkg/btree/multi.go
+++ b/pkg/btree/multi.go
@@ -44,7 +44,7 @@ func (m *LinkedMetaPage) SetRoot(mp MemoryPointer) error {
 //
 // Generally, passing data is required, however if the tree
 // consists of only inlined values, it is not necessary.
-func (m *LinkedMetaPage) BPTree(data io.ReaderAt) *BPTree {
+func (m *LinkedMetaPage) BPTree(data []byte) *BPTree {
 	t := NewBPTree(m.rws, m)
 	if data != nil {
 		t.Data = data
diff --git a/pkg/btree/node.go b/pkg/btree/node.go
index 4ea66c33..e0bec2a6 100644
--- a/pkg/btree/node.go
+++ b/pkg/btree/node.go
@@ -23,7 +23,7 @@ type ReferencedValue struct {
 }
 
 type BPTreeNode struct {
-	Data io.ReaderAt
+	Data []byte
 	// contains the offset of the child node or the offset of the record for leaf
 	// if the node is a leaf, the last pointer is the offset of the next leaf
 	Pointers []MemoryPointer
@@ -119,16 +119,11 @@ func (n *BPTreeNode) UnmarshalBinary(buf []byte) error {
 			// read the key out of the memory pointer stored at this position
 			n.Keys[i].DataPointer.Offset = binary.BigEndian.Uint64(buf[m+4 : m+12])
 			n.Keys[i].DataPointer.Length = binary.BigEndian.Uint32(buf[m+12 : m+16])
-			n.Keys[i].Value = make([]byte, n.Keys[i].DataPointer.Length)
-			if _, err := n.Data.ReadAt(n.Keys[i].Value, int64(n.Keys[i].DataPointer.Offset)); err != nil {
-				return fmt.Errorf("failed to read key: %w", err)
-			}
+			dp := n.Keys[i].DataPointer
+			n.Keys[i].Value = n.Data[dp.Offset : dp.Offset+uint64(dp.Length)]
 			m += 4 + 12
 		} else {
-			n.Keys[i].Value = make([]byte, l)
-			if o := copy(n.Keys[i].Value, buf[m+4:m+4+int(l)]); o != int(l) {
-				return fmt.Errorf("failed to copy key: %w", io.ErrShortWrite)
-			}
+			n.Keys[i].Value = buf[m+4 : m+4+int(l)]
 			m += 4 + int(l)
 		}
 	}
diff --git a/pkg/handlers/csv.go b/pkg/handlers/csv.go
index 04ffb1e0..c1c87696 100644
--- a/pkg/handlers/csv.go
+++ b/pkg/handlers/csv.go
@@ -26,7 +26,7 @@ func (c CSVHandler) Format() appendable.Format {
 	return appendable.FormatCSV
 }
 
-func (c CSVHandler) Synchronize(f *appendable.IndexFile, df appendable.DataFile) error {
+func (c CSVHandler) Synchronize(f *appendable.IndexFile, df []byte) error {
 	slog.Debug("Starting CSV synchronization")
 
 	var headers []string
@@ -36,10 +36,6 @@ func (c CSVHandler) Synchronize(f *appendable.IndexFile, df appendable.DataFile)
 	if err != nil {
 		return fmt.Errorf("failed to read metadata: %w", err)
 	}
-	if _, err := df.Seek(int64(metadata.ReadOffset), io.SeekStart); err != nil {
-		return fmt.Errorf("failed to seek: %w", err)
-	}
-
 	isHeader := false
 
 	isEmpty, err := f.IsEmpty()
@@ -57,7 +53,7 @@ func (c CSVHandler) Synchronize(f *appendable.IndexFile, df appendable.DataFile)
 		headers = fieldNames
 	}
 
-	scanner := bufio.NewScanner(df)
+	scanner := bufio.NewScanner(bytes.NewBuffer(df[metadata.ReadOffset:]))
 
 	for scanner.Scan() {
 		line := scanner.Bytes()
@@ -144,7 +140,7 @@ func InferCSVField(fieldValue string) (interface{}, appendable.FieldType) {
 	return fieldValue, appendable.FieldTypeString
 }
 
-func handleCSVLine(f *appendable.IndexFile, r io.ReaderAt, dec *csv.Reader, headers []string, path []string, data btree.MemoryPointer) error {
+func handleCSVLine(f *appendable.IndexFile, df []byte, dec *csv.Reader, headers []string, path []string, data btree.MemoryPointer) error {
 	record, err := dec.Read()
 	if err != nil {
 		slog.Error("Failed to read CSV record at index", "error", err)
@@ -177,21 +173,21 @@ func handleCSVLine(f *appendable.IndexFile, r io.ReaderAt, dec *csv.Reader, head
 	case appendable.FieldTypeFloat64:
 		buf := make([]byte, 8)
 		binary.BigEndian.PutUint64(buf, math.Float64bits(value.(float64)))
-		if err := page.BPTree(r).Insert(btree.ReferencedValue{Value: buf}, data); err != nil {
+		if err := page.BPTree(df).Insert(btree.ReferencedValue{Value: buf}, data); err != nil {
 			return fmt.Errorf("failed to insert into b+tree: %w", err)
 		}
 	case appendable.FieldTypeBoolean:
 		if value.(bool) {
-			if err := page.BPTree(r).Insert(btree.ReferencedValue{Value: []byte{1}}, data); err != nil {
+			if err := page.BPTree(df).Insert(btree.ReferencedValue{Value: []byte{1}}, data); err != nil {
 				return fmt.Errorf("failed to insert into b+tree: %w", err)
 			}
 		} else {
-			if err := page.BPTree(r).Insert(btree.ReferencedValue{Value: []byte{0}}, data); err != nil {
+			if err := page.BPTree(df).Insert(btree.ReferencedValue{Value: []byte{0}}, data); err != nil {
 				return fmt.Errorf("failed to insert into b+tree: %w", err)
 			}
 		}
 	case appendable.FieldTypeString:
-		if err := page.BPTree(r).Insert(btree.ReferencedValue{
+		if err := page.BPTree(df).Insert(btree.ReferencedValue{
 			DataPointer: btree.MemoryPointer{
 				Offset: fieldOffset,
 				Length: fieldLength,
@@ -210,7 +206,7 @@ func handleCSVLine(f *appendable.IndexFile, r io.ReaderAt, dec *csv.Reader, head
 	case appendable.FieldTypeNull:
 		// nil values are a bit of a degenerate case, we are essentially using the btree
 		// as a set. we store the value as an empty byte slice.
-		if err := page.BPTree(r).Insert(btree.ReferencedValue{Value: []byte{}}, data); err != nil {
+		if err := page.BPTree(df).Insert(btree.ReferencedValue{Value: []byte{}}, data); err != nil {
 			return fmt.Errorf("failed to insert into b+tree: %w", err)
 		}
 		slog.Debug("Marked field", "name", name)
diff --git a/pkg/handlers/csv_test.go b/pkg/handlers/csv_test.go
index 7c942961..e2daefc2 100644
--- a/pkg/handlers/csv_test.go
+++ b/pkg/handlers/csv_test.go
@@ -5,7 +5,6 @@ import (
 	"log/slog"
 	"math"
 	"os"
-	"strings"
 	"testing"
 
 	"github.com/kevmo314/appendable/pkg/appendable"
@@ -27,7 +26,7 @@ func TestCSV(t *testing.T) {
 	t.Run("no schema changes", func(t *testing.T) {
 		f := buftest.NewSeekableBuffer()
-		g := strings.NewReader("test\ntest1\n")
+		g := []byte("test\ntest1\n")
 
 		i, err := appendable.NewIndexFile(f, CSVHandler{})
 		if err != nil {
@@ -67,8 +66,8 @@ func TestCSV(t *testing.T) {
 		}
 	})
 	t.Run("correctly sets field offset", func(t *testing.T) {
-		r1 := strings.NewReader("test\ntest1\n")
-		r2 := strings.NewReader("test\ntest1\ntest2\n")
+		r1 := []byte("test\ntest1\n")
+		r2 := []byte("test\ntest1\ntest2\n")
 
 		f := buftest.NewSeekableBuffer()
@@ -134,11 +133,11 @@ func TestCSV(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		if err := i.Synchronize(strings.NewReader(s1)); err != nil {
+		if err := i.Synchronize([]byte(s1)); err != nil {
 			t.Fatal(err)
 		}
 
-		r2 := strings.NewReader(s2)
+		r2 := []byte(s2)
 		if err := i.Synchronize(r2); err != nil {
 			t.Fatal(err)
 		}
@@ -204,8 +203,8 @@ func TestCSV(t *testing.T) {
 	})
 
 	t.Run("recognize null fields", func(t *testing.T) {
-		r1 := strings.NewReader("nullheader,header1\n,wef\n")
-		r2 := strings.NewReader("nullheader,header1\n,wef\n,howdy\n")
+		r1 := []byte("nullheader,header1\n,wef\n")
+		r2 := []byte("nullheader,header1\n,wef\n,howdy\n")
 
 		f := buftest.NewSeekableBuffer()
diff --git a/pkg/handlers/equality_test.go b/pkg/handlers/equality_test.go
index f9a6f9e6..fed5c944 100644
--- a/pkg/handlers/equality_test.go
+++ b/pkg/handlers/equality_test.go
@@ -6,7 +6,6 @@ import (
 	"log/slog"
 	"math"
 	"os"
-	"strings"
 	"testing"
 
 	"github.com/kevmo314/appendable/pkg/appendable"
@@ -37,8 +36,8 @@ func TestEquality(t *testing.T) {
 	mockCsv2 := "h1,h2\ntest1,37.3\ntest3,4\n"
 
 	t.Run("test index files after Synchronize", func(t *testing.T) {
-		jr1 := strings.NewReader(mockJsonl2)
-		cr1 := strings.NewReader(mockCsv2)
+		jr1 := []byte(mockJsonl2)
+		cr1 := []byte(mockCsv2)
 
 		f := buftest.NewSeekableBuffer()
 		jsonlI, err := appendable.NewIndexFile(f, JSONLHandler{})
@@ -68,8 +67,8 @@ func TestEquality(t *testing.T) {
 	})
 
 	t.Run("test index files with appending", func(t *testing.T) {
-		jr := strings.NewReader(mockJsonl)
-		cr := strings.NewReader(mockCsv)
+		jr := []byte(mockJsonl)
+		cr := []byte(mockCsv)
 
 		f := buftest.NewSeekableBuffer()
 		jsonlI, err := appendable.NewIndexFile(f, JSONLHandler{})
@@ -81,7 +80,7 @@ func TestEquality(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		jr = strings.NewReader(mockJsonl2)
+		jr = []byte(mockJsonl2)
 		if err := jsonlI.Synchronize(jr); err != nil {
 			t.Fatal(err)
 		}
@@ -97,7 +96,7 @@ func TestEquality(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		cr = strings.NewReader(mockCsv2)
+		cr = []byte(mockCsv2)
 		if err := csvI.Synchronize(cr); err != nil {
 			t.Fatal(err)
 		}
@@ -177,7 +176,7 @@ func compareIndexMeta(i1, i2 []*btree.LinkedMetaPage) (bool, string) {
 	return true, ""
 }
 
-func compareMetaPages(i1, i2 []*btree.LinkedMetaPage, jr, cr *strings.Reader) (bool, string) {
+func compareMetaPages(i1, i2 []*btree.LinkedMetaPage, jr, cr []byte) (bool, string) {
 	h1 := [2]string{"test1", "test3"}
 	h2 := [2]float64{37.3, 4}
 
@@ -236,7 +235,7 @@ func compareMetaPages(i1, i2 []*btree.LinkedMetaPage, jr, cr *strings.Reader) (b
 	return true, ""
 }
 
-func compare(i1, i2 *appendable.IndexFile, jReader, cReader *strings.Reader) (bool, string) {
+func compare(i1, i2 *appendable.IndexFile, jReader, cReader []byte) (bool, string) {
 
 	// compare field names
 	i1fn, err := i1.IndexFieldNames()
diff --git a/pkg/handlers/jsonl.go b/pkg/handlers/jsonl.go
index 03880d7f..5d52569a 100644
--- a/pkg/handlers/jsonl.go
+++ b/pkg/handlers/jsonl.go
@@ -6,8 +6,6 @@ import (
 	"encoding/binary"
 	"encoding/json"
 	"fmt"
-	"io"
-	"log/slog"
 	"math"
 	"strings"
 
@@ -24,16 +22,13 @@ func (j JSONLHandler) Format() appendable.Format {
 	return appendable.FormatJSONL
 }
 
-func (j JSONLHandler) Synchronize(f *appendable.IndexFile, df appendable.DataFile) error {
+func (j JSONLHandler) Synchronize(f *appendable.IndexFile, df []byte) error {
 	// read until the next newline
 	metadata, err := f.Metadata()
 	if err != nil {
 		return fmt.Errorf("failed to read metadata: %w", err)
 	}
-	if _, err := df.Seek(int64(metadata.ReadOffset), io.SeekStart); err != nil {
-		return fmt.Errorf("failed to seek: %w", err)
-	}
-	scanner := bufio.NewScanner(df)
+	scanner := bufio.NewScanner(bytes.NewBuffer(df[metadata.ReadOffset:]))
 
 	for i := 0; scanner.Scan(); i++ {
 		line := scanner.Bytes()
@@ -59,7 +54,7 @@ func (j JSONLHandler) Synchronize(f *appendable.IndexFile, df appendable.DataFil
 
 		metadata.ReadOffset += uint64(len(line)) + 1 // include the newline
 
-		slog.Info("read line", "i", i, "offset", metadata.ReadOffset)
+		// slog.Info("read line", "i", i, "offset", metadata.ReadOffset)
 	}
 
 	if err := scanner.Err(); err != nil {
@@ -95,7 +90,7 @@ func jsonTypeToFieldType(t json.Token) appendable.FieldType {
 	panic(fmt.Sprintf("unexpected token '%v'", t))
 }
 
-func handleJSONLObject(f *appendable.IndexFile, r io.ReaderAt, dec *json.Decoder, path []string, data btree.MemoryPointer) error {
+func handleJSONLObject(f *appendable.IndexFile, r []byte, dec *json.Decoder, path []string, data btree.MemoryPointer) error {
 	// while the next token is not }, read the key
 	for dec.More() {
 		key, err := dec.Token()
diff --git a/pkg/handlers/jsonl_test.go b/pkg/handlers/jsonl_test.go
index 362cf1c9..dd2bf3ca 100644
--- a/pkg/handlers/jsonl_test.go
+++ b/pkg/handlers/jsonl_test.go
@@ -3,7 +3,6 @@ package handlers
 import (
 	"encoding/binary"
 	"math"
-	"strings"
 	"testing"
 
 	"github.com/kevmo314/appendable/pkg/appendable"
@@ -13,7 +12,7 @@ import (
 func TestJSONL(t *testing.T) {
 	t.Run("no schema changes", func(t *testing.T) {
 		f := buftest.NewSeekableBuffer()
-		g := strings.NewReader("{\"test\":\"test1\"}\n")
+		g := []byte("{\"test\":\"test1\"}\n")
 
 		i, err := appendable.NewIndexFile(f, JSONLHandler{})
 		if err != nil {
@@ -62,11 +61,11 @@ func TestJSONL(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		if err := i.Synchronize(strings.NewReader("{\"test\":\"test1\"}\n")); err != nil {
+		if err := i.Synchronize([]byte("{\"test\":\"test1\"}\n")); err != nil {
 			t.Fatal(err)
 		}
 
-		r2 := strings.NewReader("{\"test\":\"test1\"}\n{\"test\":\"test3\"}\n")
+		r2 := []byte("{\"test\":\"test1\"}\n{\"test\":\"test3\"}\n")
 		if err := i.Synchronize(r2); err != nil {
 			t.Fatal(err)
 		}
@@ -117,11 +116,11 @@ func TestJSONL(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		if err := i.Synchronize(strings.NewReader("{\"test\":\"test1\"}\n")); err != nil {
+		if err := i.Synchronize([]byte("{\"test\":\"test1\"}\n")); err != nil {
 			t.Fatal(err)
 		}
 
-		r2 := strings.NewReader("{\"test\":\"test1\"}\n{\"test2\":\"test3\"}\n")
+		r2 := []byte("{\"test\":\"test1\"}\n{\"test2\":\"test3\"}\n")
 		if err := i.Synchronize(r2); err != nil {
 			t.Fatal(err)
 		}
@@ -192,11 +191,11 @@ func TestJSONL(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		if err := i.Synchronize(strings.NewReader("{\"test\":\"test1\"}\n")); err != nil {
+		if err := i.Synchronize([]byte("{\"test\":\"test1\"}\n")); err != nil {
 			t.Fatal(err)
 		}
 
-		r2 := strings.NewReader("{\"test\":\"test1\"}\n{\"test\":123}\n")
+		r2 := []byte("{\"test\":\"test1\"}\n{\"test\":123}\n")
 		if err := i.Synchronize(r2); err != nil {
 			t.Fatal(err)
 		}
@@ -269,7 +268,7 @@ func TestJSONL(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		if err := i.Synchronize(strings.NewReader("{\"test\":\"test1\"}\n{\"test2\":{\"a\":1,\"b\":\"2\"}}\n")); err != nil {
+		if err := i.Synchronize([]byte("{\"test\":\"test1\"}\n{\"test2\":{\"a\":1,\"b\":\"2\"}}\n")); err != nil {
 			t.Fatal(err)
 		}
 
@@ -349,7 +348,7 @@ func TestJSONL(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		if err := i.Synchronize(strings.NewReader("{\"test\":\"test1\"}\n{\"test\":{\"a\":1,\"b\":\"2\"}}\n")); err != nil {
+		if err := i.Synchronize([]byte("{\"test\":\"test1\"}\n{\"test\":{\"a\":1,\"b\":\"2\"}}\n")); err != nil {
 			t.Fatal(err)
 		}
 
@@ -429,7 +428,7 @@ func TestJSONL(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		if err := i.Synchronize(strings.NewReader("{\"test\":\"test1\"}\n{\"test2\":[[1,2,3],4]}\n")); err != nil {
+		if err := i.Synchronize([]byte("{\"test\":\"test1\"}\n{\"test2\":[[1,2,3],4]}\n")); err != nil {
 			t.Fatal(err)
 		}
 
@@ -483,11 +482,11 @@ func TestJSONL(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		if err := i.Synchronize(strings.NewReader("{\"test\":\"test1\"}\n")); err != nil {
+		if err := i.Synchronize([]byte("{\"test\":\"test1\"}\n")); err != nil {
 			t.Fatal(err)
 		}
 
-		r2 := strings.NewReader("{\"test\":\"test1\"}\n{\"test\":null}\n")
+		r2 := []byte("{\"test\":\"test1\"}\n{\"test\":null}\n")
 		if err := i.Synchronize(r2); err != nil {
 			t.Fatal(err)
 		}
@@ -555,8 +554,8 @@ func TestJSONL(t *testing.T) {
 	})
 
 	t.Run("recognize null fields", func(t *testing.T) {
-		r1 := strings.NewReader("{\"nullheader\":null}\n")
-		r2 := strings.NewReader("{\"nullheader\":null}\n{\"nullheader\":null}\n")
+		r1 := []byte("{\"nullheader\":null}\n")
+		r2 := []byte("{\"nullheader\":null}\n{\"nullheader\":null}\n")
 
 		f := buftest.NewSeekableBuffer()
diff --git a/pkg/mmap/mmap.go b/pkg/mmap/mmap.go
index 3bb948a9..ef8be657 100644
--- a/pkg/mmap/mmap.go
+++ b/pkg/mmap/mmap.go
@@ -36,6 +36,23 @@ func NewMemoryMappedFile(f *os.File) (*MemoryMappedFile, error) {
 	return &MemoryMappedFile{file: f, bytes: b, seek: 0}, nil
 }
 
+// Open is a convenience function to open a file and memory map it.
+func Open(path string) (*MemoryMappedFile, error) {
+	f, err := os.Open(path)
+	if err != nil {
+		return nil, fmt.Errorf("open: %v", err)
+	}
+	return NewMemoryMappedFile(f)
+}
+
+func (m *MemoryMappedFile) File() *os.File {
+	return m.file
+}
+
+func (m *MemoryMappedFile) Bytes() []byte {
+	return m.bytes
+}
+
 // Close closes the file and unmaps the memory.
 func (m *MemoryMappedFile) Close() error {
 	if m.bytes == nil {
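A note on the access pattern patch 1 adopts, for readers who have not used mmap from Go: the data file is mapped once, and every key value becomes a subslice of the mapping, so BPTreeNode.UnmarshalBinary no longer allocates a buffer and issues a ReadAt per key. The following is a minimal standalone sketch of that idea, not code from this series; the file name "data.csv" and the offset/length pair are made up, and unix.Mmap is the same golang.org/x/sys/unix call that pkg/mmap wraps.

package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

func main() {
	// Hypothetical data file; substitute any non-empty file.
	f, err := os.Open("data.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		panic(err)
	}

	// Map the file shared and read-only. b aliases the page cache,
	// so sub-slicing it copies nothing.
	b, err := unix.Mmap(int(f.Fd()), 0, int(fi.Size()), unix.PROT_READ, unix.MAP_SHARED)
	if err != nil {
		panic(err)
	}
	defer unix.Munmap(b)

	// The node.go change in miniature: a value is a view into the
	// mapping rather than a fresh buffer filled by ReadAt.
	offset, length := uint64(0), uint32(4) // hypothetical MemoryPointer
	value := b[offset : offset+uint64(length)]
	fmt.Printf("%q\n", value)
}

The trade-off is lifetime: such slices are valid only while the mapping is, so the mapping has to outlive every value the tree hands out.
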
From bd234931c07a617329ba9e7160d00cc2767dd3d8 Mon Sep 17 00:00:00 2001
From: Kevin Wang
Date: Tue, 6 Feb 2024 18:35:04 -0500
Subject: [PATCH 2/3] update csv/json handlers

---
 cmd/main.go           | 12 ++++--------
 pkg/handlers/csv.go   | 24 ++++++++++--------------
 pkg/handlers/jsonl.go | 20 ++++++++------------
 pkg/mmap/mmap.go      | 28 ++++++++++++++++++++++------
 pkg/mmap/mmap_test.go | 28 +++++++++++++++-------------
 5 files changed, 59 insertions(+), 53 deletions(-)

diff --git a/cmd/main.go b/cmd/main.go
index 534bef11..479660a6 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -61,17 +61,12 @@ func main() {
 		flag.Usage()
 	}
 
-	// open the index file
-	indexFile, err := os.OpenFile(indexFilename, os.O_RDWR|os.O_CREATE, 0666)
-	if err != nil {
-		panic(err)
-	}
-
 	// Open the data file
-	df, err := mmap.Open(args[0])
+	df, err := mmap.OpenFile(args[0], os.O_RDONLY, 0)
 	if err != nil {
 		panic(err)
 	}
+	defer df.Close()
 
 	var dataHandler appendable.DataHandler
 
@@ -87,10 +82,11 @@ func main() {
 	if showTimings {
 		readStart = time.Now()
 	}
-	mmpif, err := mmap.NewMemoryMappedFile(indexFile)
+	mmpif, err := mmap.OpenFile(indexFilename, os.O_RDWR|os.O_CREATE, 0666)
 	if err != nil {
 		panic(err)
 	}
+	defer mmpif.Close()
 
 	// Open the index file
 	i, err := appendable.NewIndexFile(mmpif, dataHandler)
diff --git a/pkg/handlers/csv.go b/pkg/handlers/csv.go
index c1c87696..bc09b6e2 100644
--- a/pkg/handlers/csv.go
+++ b/pkg/handlers/csv.go
@@ -1,7 +1,6 @@
 package handlers
 
 import (
-	"bufio"
 	"bytes"
 	"encoding/binary"
 	"encoding/csv"
@@ -53,38 +52,35 @@ func (c CSVHandler) Synchronize(f *appendable.IndexFile, df []byte) error {
 		headers = fieldNames
 	}
 
-	scanner := bufio.NewScanner(bytes.NewBuffer(df[metadata.ReadOffset:]))
-
-	for scanner.Scan() {
-		line := scanner.Bytes()
+	for {
+		i := bytes.IndexByte(df[metadata.ReadOffset:], '\n')
+		if i == -1 {
+			break
+		}
 
 		if isHeader {
 			slog.Info("Parsing CSV headers")
-			dec := csv.NewReader(bytes.NewReader(line))
+			dec := csv.NewReader(bytes.NewReader(df[metadata.ReadOffset : metadata.ReadOffset+uint64(i)]))
 			headers, err = dec.Read()
 			if err != nil {
 				slog.Error("failed to parse CSV header", "error", err)
 				return fmt.Errorf("failed to parse CSV header: %w", err)
 			}
-			metadata.ReadOffset += uint64(len(line)) + 1
+			metadata.ReadOffset += uint64(i) + 1
 			isHeader = false
 			continue
 		}
 
-		dec := csv.NewReader(bytes.NewReader(line))
+		dec := csv.NewReader(bytes.NewReader(df[metadata.ReadOffset : metadata.ReadOffset+uint64(i)]))
 
 		if err := handleCSVLine(f, df, dec, headers, []string{}, btree.MemoryPointer{
 			Offset: metadata.ReadOffset,
-			Length: uint32(len(line)),
+			Length: uint32(i),
 		}); err != nil {
 			return fmt.Errorf("failed to handle object: %w", err)
 		}
 
-		metadata.ReadOffset += uint64(len(line)) + 1 // include the newline
-	}
-
-	if err := scanner.Err(); err != nil {
-		return fmt.Errorf("failed to scan: %w", err)
+		metadata.ReadOffset += uint64(i) + 1 // include the newline
 	}
 
 	// update the metadata
diff --git a/pkg/handlers/jsonl.go b/pkg/handlers/jsonl.go
index 5d52569a..ee95beef 100644
--- a/pkg/handlers/jsonl.go
+++ b/pkg/handlers/jsonl.go
@@ -1,7 +1,6 @@
 package handlers
 
 import (
-	"bufio"
 	"bytes"
 	"encoding/binary"
 	"encoding/json"
@@ -28,12 +27,13 @@ func (j JSONLHandler) Synchronize(f *appendable.IndexFile, df []byte) error {
 	if err != nil {
 		return fmt.Errorf("failed to read metadata: %w", err)
 	}
-	scanner := bufio.NewScanner(bytes.NewBuffer(df[metadata.ReadOffset:]))
-
-	for i := 0; scanner.Scan(); i++ {
-		line := scanner.Bytes()
-
+	for {
+		i := bytes.IndexByte(df[metadata.ReadOffset:], '\n')
+		if i == -1 {
+			break
+		}
 		// create a new json decoder
-		dec := json.NewDecoder(bytes.NewReader(line))
+		dec := json.NewDecoder(bytes.NewReader(df[metadata.ReadOffset:(metadata.ReadOffset + uint64(i))]))
 
 		// if the first token is not {, then return an error
 		if t, err := dec.Token(); err != nil || t != json.Delim('{') {
@@ -42,7 +42,7 @@ func (j JSONLHandler) Synchronize(f *appendable.IndexFile, df []byte) error {
 
 		if err := handleJSONLObject(f, df, dec, []string{}, btree.MemoryPointer{
 			Offset: metadata.ReadOffset,
-			Length: uint32(len(line)),
+			Length: uint32(i),
 		}); err != nil {
 			return fmt.Errorf("failed to handle object: %w", err)
 		}
@@ -52,15 +52,11 @@ func (j JSONLHandler) Synchronize(f *appendable.IndexFile, df []byte) error {
 			return fmt.Errorf("expected '}', got '%v'", t)
 		}
 
-		metadata.ReadOffset += uint64(len(line)) + 1 // include the newline
+		metadata.ReadOffset += uint64(i) + 1 // include the newline
 
 		// slog.Info("read line", "i", i, "offset", metadata.ReadOffset)
 	}
 
-	if err := scanner.Err(); err != nil {
-		return fmt.Errorf("failed to scan: %w", err)
-	}
-
 	// update the metadata
 	if err := f.SetMetadata(metadata); err != nil {
 		return fmt.Errorf("failed to set metadata: %w", err)
diff --git a/pkg/mmap/mmap.go b/pkg/mmap/mmap.go
index ef8be657..8a32f617 100644
--- a/pkg/mmap/mmap.go
+++ b/pkg/mmap/mmap.go
@@ -20,7 +20,15 @@ var _ io.Closer = &MemoryMappedFile{}
 var _ io.ReaderAt = &MemoryMappedFile{}
 var _ io.WriterAt = &MemoryMappedFile{}
 
-func NewMemoryMappedFile(f *os.File) (*MemoryMappedFile, error) {
+func toProt(flag int) int {
+	prot := unix.PROT_READ
+	if flag&os.O_RDWR != 0 {
+		prot |= unix.PROT_WRITE
+	}
+	return prot
+}
+
+func NewMemoryMappedFile(f *os.File, prot int) (*MemoryMappedFile, error) {
 	fd := uintptr(f.Fd())
 	fi, err := f.Stat()
 	if err != nil {
@@ -29,7 +37,7 @@ func NewMemoryMappedFile(f *os.File) (*MemoryMappedFile, error) {
 	if fi.Size() == 0 {
 		return &MemoryMappedFile{file: f, bytes: nil, seek: 0}, nil
 	}
-	b, err := unix.Mmap(int(fd), 0, int(fi.Size()), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED)
+	b, err := unix.Mmap(int(fd), 0, int(fi.Size()), prot, unix.MAP_SHARED)
 	if err != nil {
 		return nil, fmt.Errorf("mmap: %v", err)
 	}
@@ -38,11 +46,16 @@ func NewMemoryMappedFile(f *os.File) (*MemoryMappedFile, error) {
 
 // Open is a convenience function to open a file and memory map it.
 func Open(path string) (*MemoryMappedFile, error) {
-	f, err := os.Open(path)
+	return OpenFile(path, os.O_RDWR, 0)
+}
+
+// OpenFile is a convenience function to open a file with the given flags and memory map it.
+func OpenFile(path string, flag int, perm os.FileMode) (*MemoryMappedFile, error) {
+	f, err := os.OpenFile(path, flag, perm)
 	if err != nil {
 		return nil, fmt.Errorf("open: %v", err)
 	}
-	return NewMemoryMappedFile(f)
+	return NewMemoryMappedFile(f, toProt(flag))
 }
 
 func (m *MemoryMappedFile) File() *os.File {
@@ -56,9 +69,12 @@ func (m *MemoryMappedFile) Bytes() []byte {
 // Close closes the file and unmaps the memory.
 func (m *MemoryMappedFile) Close() error {
 	if m.bytes == nil {
-		return nil
+		return m.file.Close()
+	}
+	if err := unix.Munmap(m.bytes); err != nil {
+		return fmt.Errorf("munmap: %v", err)
 	}
-	return unix.Munmap(m.bytes)
+	return m.file.Close()
 }
 
 // Seek sets the offset for the next Read or Write on file to offset.
diff --git a/pkg/mmap/mmap_test.go b/pkg/mmap/mmap_test.go
index 8d03ecbd..1688f182 100644
--- a/pkg/mmap/mmap_test.go
+++ b/pkg/mmap/mmap_test.go
@@ -6,6 +6,8 @@ import (
 	"log"
 	"os"
 	"testing"
+
+	"golang.org/x/sys/unix"
 )
 
 func ExampleMemoryMappedFile() {
@@ -17,7 +19,7 @@ func ExampleMemoryMappedFile() {
 	defer f.Close()
 
 	// Create a memory-mapped file.
-	m, err := NewMemoryMappedFile(f)
+	m, err := NewMemoryMappedFile(f, unix.PROT_READ|unix.PROT_WRITE)
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -50,7 +52,7 @@ func TestMemoryMappedFile_Read(t *testing.T) {
 			log.Fatal(err)
 		}
 
-		m, err := NewMemoryMappedFile(f)
+		m, err := NewMemoryMappedFile(f, unix.PROT_READ|unix.PROT_WRITE)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -78,7 +80,7 @@ func TestMemoryMappedFile_Read(t *testing.T) {
 			log.Fatal(err)
 		}
 
-		m, err := NewMemoryMappedFile(f)
+		m, err := NewMemoryMappedFile(f, unix.PROT_READ|unix.PROT_WRITE)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -106,7 +108,7 @@ func TestMemoryMappedFile_Read(t *testing.T) {
 			log.Fatal(err)
 		}
 
-		m, err := NewMemoryMappedFile(f)
+		m, err := NewMemoryMappedFile(f, unix.PROT_READ|unix.PROT_WRITE)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -131,7 +133,7 @@ func TestMemoryMappedFile_Read(t *testing.T) {
 			log.Fatal(err)
 		}
 
-		m, err := NewMemoryMappedFile(f)
+		m, err := NewMemoryMappedFile(f, unix.PROT_READ|unix.PROT_WRITE)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -161,7 +163,7 @@ func TestMemoryMappedFile_Write(t *testing.T) {
 		defer f.Close()
 		defer os.Remove(f.Name())
 
-		m, err := NewMemoryMappedFile(f)
+		m, err := NewMemoryMappedFile(f, unix.PROT_READ|unix.PROT_WRITE)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -188,7 +190,7 @@ func TestMemoryMappedFile_Write(t *testing.T) {
 		defer f.Close()
 		defer os.Remove(f.Name())
 
-		m, err := NewMemoryMappedFile(f)
+		m, err := NewMemoryMappedFile(f, unix.PROT_READ|unix.PROT_WRITE)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -228,7 +230,7 @@ func TestMemoryMappedFile_Write(t *testing.T) {
 		defer f.Close()
 		defer os.Remove(f.Name())
 
-		m, err := NewMemoryMappedFile(f)
+		m, err := NewMemoryMappedFile(f, unix.PROT_READ|unix.PROT_WRITE)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -261,7 +263,7 @@ func TestMemoryMappedFile_Seek(t *testing.T) {
 		defer f.Close()
 		defer os.Remove(f.Name())
 
-		m, err := NewMemoryMappedFile(f)
+		m, err := NewMemoryMappedFile(f, unix.PROT_READ|unix.PROT_WRITE)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -288,7 +290,7 @@ func TestMemoryMappedFile_Seek(t *testing.T) {
 		defer f.Close()
 		defer os.Remove(f.Name())
 
-		m, err := NewMemoryMappedFile(f)
+		m, err := NewMemoryMappedFile(f, unix.PROT_READ|unix.PROT_WRITE)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -323,7 +325,7 @@ func TestMemoryMappedFile_Seek(t *testing.T) {
 		defer f.Close()
 		defer os.Remove(f.Name())
 
-		m, err := NewMemoryMappedFile(f)
+		m, err := NewMemoryMappedFile(f, unix.PROT_READ|unix.PROT_WRITE)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -359,7 +361,7 @@ func TestMemoryMappedFile_Close(t *testing.T) {
 		}
 		defer os.Remove(f.Name())
 
-		m, err := NewMemoryMappedFile(f)
+		m, err := NewMemoryMappedFile(f, unix.PROT_READ|unix.PROT_WRITE)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -381,7 +383,7 @@ func TestMemoryMappedFile_Close(t *testing.T) {
 		}
 		defer os.Remove(f.Name())
 
-		m, err := NewMemoryMappedFile(f)
+		m, err := NewMemoryMappedFile(f, unix.PROT_READ|unix.PROT_WRITE)
 		if err != nil {
 			log.Fatal(err)
 		}
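A note on the scanning change above: bufio.Scanner copies each token into its own buffer and refuses lines longer than its token limit (64 KiB by default), whereas bytes.IndexByte walks the mapped bytes in place and yields offsets, which is exactly what the MemoryPointer bookkeeping wants. Below is a minimal standalone sketch of the loop both handlers now share; the sample bytes are made up.

package main

import (
	"bytes"
	"fmt"
)

func main() {
	df := []byte("a,1\nb,2\nc,3") // stand-in for the mapped data file
	offset := uint64(0)
	for {
		i := bytes.IndexByte(df[offset:], '\n')
		if i == -1 {
			break // no newline left, stop like the handlers do
		}
		line := df[offset : offset+uint64(i)] // a view, not a copy
		fmt.Printf("line at offset %d: %q\n", offset, line)
		offset += uint64(i) + 1 // advance past the newline
	}
}

One behavioral consequence worth noting: a final line with no trailing newline ("c,3" above) is not consumed, so ReadOffset stays put and the partial line is picked up by a later Synchronize once the newline arrives, which suits the append-only model.
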
From aece01c1479ba43477e452bc0ff876c5dcefd278 Mon Sep 17 00:00:00 2001
From: Kevin Wang
Date: Tue, 6 Feb 2024 18:37:02 -0500
Subject: [PATCH 3/3] pprof

---
 cmd/main.go | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/cmd/main.go b/cmd/main.go
index 479660a6..de9cc3ea 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -3,7 +3,6 @@ package main
 import (
 	"flag"
 	"fmt"
-	"log"
 	"log/slog"
 	"os"
 	"runtime/pprof"
 	"time"
@@ -16,13 +15,14 @@ import (
 func main() {
 	var debugFlag, jsonlFlag, csvFlag, showTimings bool
-	var indexFilename string
+	var indexFilename, pprofFilename string
 
 	flag.BoolVar(&debugFlag, "debug", false, "Use logger that prints at the debug-level")
 	flag.BoolVar(&jsonlFlag, "jsonl", false, "Use JSONL handler")
 	flag.BoolVar(&csvFlag, "csv", false, "Use CSV handler")
 	flag.BoolVar(&showTimings, "t", false, "Show time-related metrics")
 	flag.StringVar(&indexFilename, "i", "", "Specify the existing index of the file to be opened, writing to stdout")
+	flag.StringVar(&pprofFilename, "pprof", "", "Specify the file to write the pprof data to")
 	flag.Parse()
 
 	logLevel := &slog.LevelVar{}
@@ -31,15 +31,17 @@ func main() {
 		logLevel.Set(slog.LevelDebug)
 	}
 
-	f, err := os.Create("pprof.out")
-	if err != nil {
-		log.Fatal("could not create CPU profile: ", err)
-	}
-	defer f.Close() // error handling omitted for example
-	if err := pprof.StartCPUProfile(f); err != nil {
-		log.Fatal("could not start CPU profile: ", err)
+	if pprofFilename != "" {
+		f, err := os.Create(pprofFilename)
+		if err != nil {
+			panic(err)
+		}
+		defer f.Close()
+		if err := pprof.StartCPUProfile(f); err != nil {
+			panic(err)
+		}
+		defer pprof.StopCPUProfile()
 	}
-	defer pprof.StopCPUProfile()
 
 	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel}))
 	slog.SetDefault(logger)
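With patch 3 applied, profiling is opt-in: nothing is collected unless -pprof names an output file. A usage sketch, assuming the command is run from the repository root; the file names here are placeholders.

$ go run ./cmd -csv -i data.index -pprof cpu.out data.csv
$ go tool pprof cpu.out

go tool pprof then reads the CPU profile that runtime/pprof wrote between StartCPUProfile and the deferred StopCPUProfile.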