Skip to content

Commit

Permalink
fix bugs (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
friendlymatthew authored Jan 25, 2024
1 parent 2a6030d commit ef202bd
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 53 deletions.
20 changes: 17 additions & 3 deletions pkg/appendable/csv_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ func (c CSVHandler) Synchronize(f *IndexFile) error {
isHeader = true
fromNewIndexFile = true
} else {
slog.Debug("indexes already exist, not parsing headers")
for _, index := range f.Indexes {
isHeader = false
headers = append(headers, index.FieldName)
}
}
Expand All @@ -50,8 +52,10 @@ func (c CSVHandler) Synchronize(f *IndexFile) error {
start = f.EndByteOffsets[existingCount-1]
}

f.EndByteOffsets = append(f.EndByteOffsets, start+uint64(len(line))+1)
slog.Debug("", slog.Uint64("start", start))

slog.Debug("adding", slog.Any("endbyteoffset", start+uint64(len(line))), slog.Any("line", line))
f.EndByteOffsets = append(f.EndByteOffsets, start+uint64(len(line))+1)
f.Checksums = append(f.Checksums, xxhash.Sum64(line))

if isHeader {
Expand All @@ -68,7 +72,15 @@ func (c CSVHandler) Synchronize(f *IndexFile) error {

dec := csv.NewReader(bytes.NewReader(line))
slog.Debug("Handling csv", "line", i)
f.handleCSVLine(dec, headers, []string{}, uint64(existingCount)-1, start)

if fromNewIndexFile {

f.handleCSVLine(dec, headers, []string{}, uint64(existingCount)-1, start)
} else {

f.handleCSVLine(dec, headers, []string{}, uint64(existingCount), start)
}

slog.Info("Succesfully processed", "line", i)
}

Expand All @@ -79,7 +91,9 @@ func (c CSVHandler) Synchronize(f *IndexFile) error {
slog.Debug("Trimming endbyte offsets and checksums", "endByteOffsets", slog.Any("endByteOffsets", f.EndByteOffsets), "checksums", slog.Any("checksums", f.Checksums))
}

slog.Debug("indexes", slog.Any("", f.Indexes))
slog.Debug("Ending CSV synchronization")
slog.Debug("=========")
return nil
}

Expand Down Expand Up @@ -174,7 +188,7 @@ func (i *IndexFile) handleCSVLine(dec *csv.Reader, headers []string, path []stri
return fmt.Errorf("unexpected type '%T'", value)
}

cumulativeLength += fieldLength
cumulativeLength += fieldLength + 1
}

return nil
Expand Down
53 changes: 48 additions & 5 deletions pkg/appendable/index_file_csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package appendable

import (
"bytes"
"fmt"
"log/slog"
"os"
"strings"
"testing"

Expand All @@ -10,6 +13,19 @@ import (

func TestAppendDataRowCSV(t *testing.T) {

originalLogger := slog.Default()

// Create a logger with Debug on
debugLevel := &slog.LevelVar{}
debugLevel.Set(slog.LevelDebug)
debugLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: debugLevel,
}))

slog.SetDefault(debugLogger)

defer slog.SetDefault(originalLogger)

var mockCsv string = "header1\ntest1\n"
var mockCsv2 string = "header1\ntest1\ntest3\n"

Expand Down Expand Up @@ -51,6 +67,30 @@ func TestAppendDataRowCSV(t *testing.T) {
}
})

t.Run("check end + start byte offsets multiple", func(t *testing.T) {
i, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader(mockCsv2)})
if err != nil {
t.Fatal(err)
}

if len(i.Indexes) != 1 {
t.Errorf("got len(i.Indexes) = %d, want 1", len(i.Indexes))
}

if len(i.Indexes[0].IndexRecords) != 2 {
t.Errorf("got len(i.Indexes[0].IndexRecords) = %d, want 2", len(i.Indexes[0].IndexRecords))
}

if i.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset != uint64(len("header1\n")) {
t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].FieldStartByteOffset = %d, want 7", i.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset)
}

if i.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset != uint64(len("header1\ntest1\n")) {
t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].FieldStartByteOffset = %d, want %d", i.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset, uint64(len("header\ntest1\n")))
}

})

t.Run("append index to existing", func(t *testing.T) {
i, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader(mockCsv)})
if err != nil {
Expand All @@ -74,6 +114,7 @@ func TestAppendDataRowCSV(t *testing.T) {
}

if len(j.Indexes[0].IndexRecords) != 2 {
fmt.Printf("index records look like %v", j.Indexes[0].IndexRecords)
t.Errorf("got len(j.Indexes[0].IndexRecords) = %d, want 2", len(j.Indexes[0].IndexRecords))
}

Expand All @@ -90,17 +131,17 @@ func TestAppendDataRowCSV(t *testing.T) {
if j.Indexes[0].IndexRecords["test1"][0].DataNumber != 0 {
t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].DataNumber = %d, want 0", j.Indexes[0].IndexRecords["test1"][0].DataNumber)
}
if j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset != uint64(len("{\"test\":")) {
t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].FieldStartByteOffset = %d, want 10", j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset)
if j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset != uint64(len("header1\n")) {
t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].FieldStartByteOffset = %d, want %d", j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset, uint64(len("header\n")))
}

if j.Indexes[0].IndexRecords["test3"][0].DataNumber != 0 {
if j.Indexes[0].IndexRecords["test3"][0].DataNumber != 1 {
t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].DataNumber = %d, want 1", j.Indexes[0].IndexRecords["test3"][0].DataNumber)
}

// verify byte offset calculation
if j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset != uint64(len("header\ntest1\n")+1) {
t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].FieldStartByteOffset = %d, want 14", j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset)
if j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset != uint64(len("header1\ntest1\n")) {
t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].FieldStartByteOffset = %d, want %d", j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset, uint64(len("header\ntest1\n")))
}
})

Expand Down Expand Up @@ -140,6 +181,8 @@ func TestAppendDataRowCSV(t *testing.T) {
if j.EndByteOffsets[1] != uint64(len("name,move\nmica,coyote\ngalvao,mount\n")) {
t.Errorf("got i.DataRanges[1].EndByteOffset = %d, want %d", j.EndByteOffsets[1], uint64(len("name,move\nmica,coyote\ngalvao,mount\n")))
}

fmt.Printf("index file looks like: %v", j.Indexes)
})

t.Run("generate index file", func(t *testing.T) {
Expand Down
94 changes: 50 additions & 44 deletions pkg/appendable/index_file_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package appendable

import (
"bytes"
"fmt"
"log/slog"
"os"
"strings"
"testing"

Expand All @@ -23,23 +22,13 @@ jsonl <---> csv
*/
func TestIndexFile(t *testing.T) {

originalLogger := slog.Default()
mockJsonl := "{\"h1\":\"test1\", \"h2\":\"test2\"}\n"
mockJsonl2 := "{\"h1\":\"test1\", \"h2\":\"test2\"}\n{\"h1\":\"test3\", \"h2\":\"test4\"}\n"

// Create a logger with Debug on
debugLevel := &slog.LevelVar{}
debugLevel.Set(slog.LevelDebug)
debugLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: debugLevel,
}))
mockCsv := "h1,h2\ntest1,test2\n"
mockCsv2 := "h1,h2\ntest1,test2\ntest3,test4\n"

slog.SetDefault(debugLogger)

defer slog.SetDefault(originalLogger)

mockJsonl := "{\"id\":\"identification\", \"age\":\"cottoneyedjoe\"}\n"
mockCsv := "id,age\nidentification,cottoneyedjoe\n"

t.Run("generate index file", func(t *testing.T) {
t.Run("compare mock index file", func(t *testing.T) {
// jsonl
jif, err := NewIndexFile(JSONLHandler{ReadSeeker: strings.NewReader(mockJsonl)})

Expand All @@ -61,6 +50,47 @@ func TestIndexFile(t *testing.T) {

})

t.Run("compare mock index file after appending", func(t *testing.T) {
jif, err := NewIndexFile(JSONLHandler{ReadSeeker: strings.NewReader(mockJsonl)})
if err != nil {
t.Fatal(err)
}

jbuf := &bytes.Buffer{}

if err := jif.Serialize(jbuf); err != nil {
t.Fatal(err)
}

civ, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader(mockCsv)})
if err != nil {
t.Fatal(err)
}

cbuf := &bytes.Buffer{}
if err := civ.Serialize(cbuf); err != nil {
t.Fatal(err)
}

j, err := ReadIndexFile(jbuf, JSONLHandler{ReadSeeker: strings.NewReader(mockJsonl2)})
if err != nil {
t.Fatal(err)
}

c, err := ReadIndexFile(cbuf, CSVHandler{ReadSeeker: strings.NewReader(mockCsv2)})
if err != nil {
t.Fatal(err)
}
status, res := j.compareTo(c)

fmt.Printf("%v", c)

if !status {
t.Errorf("Not equal\n%v", res)
}

})

}

func compareIndexRecord(ir1, ir2 *protocol.IndexRecord, fieldType protocol.FieldType) (bool, string) {
Expand All @@ -69,22 +99,14 @@ func compareIndexRecord(ir1, ir2 *protocol.IndexRecord, fieldType protocol.Field
}

if fieldType&protocol.FieldTypeString != protocol.FieldTypeString {
if ir1.FieldStartByteOffset != ir2.FieldStartByteOffset {
return false, fmt.Sprintf("FieldStartByteOffset do not align\ti1: %v, i2: %v", ir1.FieldStartByteOffset, ir2.FieldStartByteOffset)
}

if ir1.FieldLength != ir2.FieldLength {
return false, fmt.Sprintf("Field Length do not align\ti1: %v, i2: %v", ir1.FieldLength, ir2.FieldLength)
}
} /* else {
if ir1.FieldStartByteOffset != ir2.FieldStartByteOffset {
return false, fmt.Sprintf("FieldStartByteOffset do not align\ti1: %v, i2: %v", ir1.FieldStartByteOffset, ir2.FieldStartByteOffset)
}
if ir1.FieldLength != ir2.FieldLength {
} else {
if ir1.FieldLength != ir2.FieldLength+2 {
return false, fmt.Sprintf("Field Length do not align\ti1: %v, i2: %v", ir1.FieldLength, ir2.FieldLength)
}
}*/
}
return true, ""
}

Expand Down Expand Up @@ -151,21 +173,5 @@ func (i1 *IndexFile) compareTo(i2 *IndexFile) (bool, string) {
return false, fmt.Sprintf("checksums length not equal\ti1: %v, i2: %v", len(i1.Checksums), len(i2.Checksums))
}

fmt.Printf("checksums equal")

/*
for i, _ := range i1.EndByteOffsets {
if i1.EndByteOffsets[i] != i2.EndByteOffsets[i] {
return false, fmt.Sprintf("endbyteoffsets not equal\ti1: %v, i2: %v", i1.EndByteOffsets[i], i2.EndByteOffsets[i])
}
if i1.Checksums[i] != i2.Checksums[i] {
return false, fmt.Sprintf("checksums not equal\ti1: %v, i2: %v", i1.Checksums[i], i2.Checksums[i])
}
}
*/

fmt.Printf("endbyte and checksums deeply equal")

return true, "great success!"
}
13 changes: 12 additions & 1 deletion pkg/appendable/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"fmt"
"io"
"log/slog"
"sort"
"strings"

Expand Down Expand Up @@ -32,6 +33,8 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) {

f.data = data

slog.Debug("Starting ReadIndexFile")

// read the version
version, err := encoding.ReadByte(r)
if err != nil {
Expand All @@ -50,6 +53,8 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) {
return nil, fmt.Errorf("failed to read index file header: %w", err)
}

slog.Debug("headers", slog.Any("ifh", ifh))

// read the index headers
f.Indexes = []Index{}
br := 0
Expand All @@ -72,11 +77,15 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) {
index.IndexRecords = make(map[any][]protocol.IndexRecord)
f.Indexes = append(f.Indexes, index)
br += encoding.SizeString(index.FieldName) + binary.Size(ft) + binary.Size(uint64(0))

slog.Debug("", slog.Any("ih", index), slog.Any("recordCount", recordCount))
}
if br != int(ifh.IndexLength) {
return nil, fmt.Errorf("expected to read %d bytes, read %d bytes", ifh.IndexLength, br)
}

slog.Debug("Reading index headers done")

// read the index records
for i, index := range f.Indexes {

Expand Down Expand Up @@ -106,6 +115,8 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) {
return nil, fmt.Errorf("failed to read index record: %w", err)
}

slog.Debug("read index record", slog.Any("index", index.FieldName), slog.Any("any", value), slog.Any("record", ir))

switch value.(type) {
case nil, bool, int, int8, int16, int32, int64, float32, float64, string:
fmt.Printf("appending: %v", value)
Expand Down Expand Up @@ -173,7 +184,7 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) {
return nil, fmt.Errorf("failed to seek data file: %w", err)
}
}

slog.Debug("======")
return f, data.Synchronize(f)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"log/slog"
"strings"
)

Expand Down Expand Up @@ -140,6 +141,8 @@ func (i IndexRecord) CSVField(r io.ReadSeeker) (any, error) {
return nil, fmt.Errorf("failed to seek to original offset: %w", err)
}

slog.Debug("fields", slog.Any("F", fields), slog.Any("len", len(fields)))

return fields[0], nil
}

Expand Down

0 comments on commit ef202bd

Please sign in to comment.