Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix critical csv bugs #63

Merged
merged 1 commit into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions pkg/appendable/csv_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ func (c CSVHandler) Synchronize(f *IndexFile) error {
isHeader = true
fromNewIndexFile = true
} else {
slog.Debug("indexes already exist, not parsing headers")
for _, index := range f.Indexes {
isHeader = false
headers = append(headers, index.FieldName)
}
}
Expand All @@ -50,8 +52,10 @@ func (c CSVHandler) Synchronize(f *IndexFile) error {
start = f.EndByteOffsets[existingCount-1]
}

f.EndByteOffsets = append(f.EndByteOffsets, start+uint64(len(line))+1)
slog.Debug("", slog.Uint64("start", start))

slog.Debug("adding", slog.Any("endbyteoffset", start+uint64(len(line))), slog.Any("line", line))
f.EndByteOffsets = append(f.EndByteOffsets, start+uint64(len(line))+1)
f.Checksums = append(f.Checksums, xxhash.Sum64(line))

if isHeader {
Expand All @@ -68,7 +72,15 @@ func (c CSVHandler) Synchronize(f *IndexFile) error {

dec := csv.NewReader(bytes.NewReader(line))
slog.Debug("Handling csv", "line", i)
f.handleCSVLine(dec, headers, []string{}, uint64(existingCount)-1, start)

if fromNewIndexFile {

f.handleCSVLine(dec, headers, []string{}, uint64(existingCount)-1, start)
} else {

f.handleCSVLine(dec, headers, []string{}, uint64(existingCount), start)
}

slog.Info("Succesfully processed", "line", i)
}

Expand All @@ -79,7 +91,9 @@ func (c CSVHandler) Synchronize(f *IndexFile) error {
slog.Debug("Trimming endbyte offsets and checksums", "endByteOffsets", slog.Any("endByteOffsets", f.EndByteOffsets), "checksums", slog.Any("checksums", f.Checksums))
}

slog.Debug("indexes", slog.Any("", f.Indexes))
slog.Debug("Ending CSV synchronization")
slog.Debug("=========")
return nil
}

Expand Down Expand Up @@ -174,7 +188,7 @@ func (i *IndexFile) handleCSVLine(dec *csv.Reader, headers []string, path []stri
return fmt.Errorf("unexpected type '%T'", value)
}

cumulativeLength += fieldLength
cumulativeLength += fieldLength + 1
friendlymatthew marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
Expand Down
53 changes: 48 additions & 5 deletions pkg/appendable/index_file_csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package appendable

import (
"bytes"
"fmt"
"log/slog"
"os"
"strings"
"testing"

Expand All @@ -10,6 +13,19 @@ import (

func TestAppendDataRowCSV(t *testing.T) {

originalLogger := slog.Default()

// Create a logger with Debug on
debugLevel := &slog.LevelVar{}
debugLevel.Set(slog.LevelDebug)
debugLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: debugLevel,
}))

slog.SetDefault(debugLogger)

defer slog.SetDefault(originalLogger)

var mockCsv string = "header1\ntest1\n"
var mockCsv2 string = "header1\ntest1\ntest3\n"

Expand Down Expand Up @@ -51,6 +67,30 @@ func TestAppendDataRowCSV(t *testing.T) {
}
})

t.Run("check end + start byte offsets multiple", func(t *testing.T) {
i, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader(mockCsv2)})
if err != nil {
t.Fatal(err)
}

if len(i.Indexes) != 1 {
t.Errorf("got len(i.Indexes) = %d, want 1", len(i.Indexes))
}

if len(i.Indexes[0].IndexRecords) != 2 {
t.Errorf("got len(i.Indexes[0].IndexRecords) = %d, want 2", len(i.Indexes[0].IndexRecords))
}

if i.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset != uint64(len("header1\n")) {
t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].FieldStartByteOffset = %d, want 7", i.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset)
}

if i.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset != uint64(len("header1\ntest1\n")) {
t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].FieldStartByteOffset = %d, want %d", i.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset, uint64(len("header\ntest1\n")))
}

})

t.Run("append index to existing", func(t *testing.T) {
i, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader(mockCsv)})
if err != nil {
Expand All @@ -74,6 +114,7 @@ func TestAppendDataRowCSV(t *testing.T) {
}

if len(j.Indexes[0].IndexRecords) != 2 {
fmt.Printf("index records look like %v", j.Indexes[0].IndexRecords)
t.Errorf("got len(j.Indexes[0].IndexRecords) = %d, want 2", len(j.Indexes[0].IndexRecords))
}

Expand All @@ -90,17 +131,17 @@ func TestAppendDataRowCSV(t *testing.T) {
if j.Indexes[0].IndexRecords["test1"][0].DataNumber != 0 {
t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].DataNumber = %d, want 0", j.Indexes[0].IndexRecords["test1"][0].DataNumber)
}
if j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset != uint64(len("{\"test\":")) {
t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].FieldStartByteOffset = %d, want 10", j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset)
if j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset != uint64(len("header1\n")) {
t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].FieldStartByteOffset = %d, want %d", j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset, uint64(len("header\n")))
}

if j.Indexes[0].IndexRecords["test3"][0].DataNumber != 0 {
if j.Indexes[0].IndexRecords["test3"][0].DataNumber != 1 {
t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].DataNumber = %d, want 1", j.Indexes[0].IndexRecords["test3"][0].DataNumber)
}

// verify byte offset calculation
if j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset != uint64(len("header\ntest1\n")+1) {
t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].FieldStartByteOffset = %d, want 14", j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset)
if j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset != uint64(len("header1\ntest1\n")) {
t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].FieldStartByteOffset = %d, want %d", j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset, uint64(len("header\ntest1\n")))
}
})

Expand Down Expand Up @@ -140,6 +181,8 @@ func TestAppendDataRowCSV(t *testing.T) {
if j.EndByteOffsets[1] != uint64(len("name,move\nmica,coyote\ngalvao,mount\n")) {
t.Errorf("got i.DataRanges[1].EndByteOffset = %d, want %d", j.EndByteOffsets[1], uint64(len("name,move\nmica,coyote\ngalvao,mount\n")))
}

fmt.Printf("index file looks like: %v", j.Indexes)
})

t.Run("generate index file", func(t *testing.T) {
Expand Down
94 changes: 50 additions & 44 deletions pkg/appendable/index_file_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package appendable

import (
"bytes"
"fmt"
"log/slog"
"os"
"strings"
"testing"

Expand All @@ -23,23 +22,13 @@ jsonl <---> csv
*/
func TestIndexFile(t *testing.T) {

originalLogger := slog.Default()
mockJsonl := "{\"h1\":\"test1\", \"h2\":\"test2\"}\n"
mockJsonl2 := "{\"h1\":\"test1\", \"h2\":\"test2\"}\n{\"h1\":\"test3\", \"h2\":\"test4\"}\n"

// Create a logger with Debug on
debugLevel := &slog.LevelVar{}
debugLevel.Set(slog.LevelDebug)
debugLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: debugLevel,
}))
mockCsv := "h1,h2\ntest1,test2\n"
mockCsv2 := "h1,h2\ntest1,test2\ntest3,test4\n"

slog.SetDefault(debugLogger)

defer slog.SetDefault(originalLogger)

mockJsonl := "{\"id\":\"identification\", \"age\":\"cottoneyedjoe\"}\n"
mockCsv := "id,age\nidentification,cottoneyedjoe\n"

t.Run("generate index file", func(t *testing.T) {
t.Run("compare mock index file", func(t *testing.T) {
// jsonl
jif, err := NewIndexFile(JSONLHandler{ReadSeeker: strings.NewReader(mockJsonl)})

Expand All @@ -61,6 +50,47 @@ func TestIndexFile(t *testing.T) {

})

t.Run("compare mock index file after appending", func(t *testing.T) {
friendlymatthew marked this conversation as resolved.
Show resolved Hide resolved
jif, err := NewIndexFile(JSONLHandler{ReadSeeker: strings.NewReader(mockJsonl)})
if err != nil {
t.Fatal(err)
}

jbuf := &bytes.Buffer{}

if err := jif.Serialize(jbuf); err != nil {
t.Fatal(err)
}

civ, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader(mockCsv)})
if err != nil {
t.Fatal(err)
}

cbuf := &bytes.Buffer{}
if err := civ.Serialize(cbuf); err != nil {
t.Fatal(err)
}

j, err := ReadIndexFile(jbuf, JSONLHandler{ReadSeeker: strings.NewReader(mockJsonl2)})
if err != nil {
t.Fatal(err)
}

c, err := ReadIndexFile(cbuf, CSVHandler{ReadSeeker: strings.NewReader(mockCsv2)})
if err != nil {
t.Fatal(err)
}
status, res := j.compareTo(c)

fmt.Printf("%v", c)

if !status {
t.Errorf("Not equal\n%v", res)
}

})

}

func compareIndexRecord(ir1, ir2 *protocol.IndexRecord, fieldType protocol.FieldType) (bool, string) {
Expand All @@ -69,22 +99,14 @@ func compareIndexRecord(ir1, ir2 *protocol.IndexRecord, fieldType protocol.Field
}

if fieldType&protocol.FieldTypeString != protocol.FieldTypeString {
if ir1.FieldStartByteOffset != ir2.FieldStartByteOffset {
return false, fmt.Sprintf("FieldStartByteOffset do not align\ti1: %v, i2: %v", ir1.FieldStartByteOffset, ir2.FieldStartByteOffset)
}

if ir1.FieldLength != ir2.FieldLength {
return false, fmt.Sprintf("Field Length do not align\ti1: %v, i2: %v", ir1.FieldLength, ir2.FieldLength)
}
} /* else {
if ir1.FieldStartByteOffset != ir2.FieldStartByteOffset {
return false, fmt.Sprintf("FieldStartByteOffset do not align\ti1: %v, i2: %v", ir1.FieldStartByteOffset, ir2.FieldStartByteOffset)
}

if ir1.FieldLength != ir2.FieldLength {
} else {
if ir1.FieldLength != ir2.FieldLength+2 {
friendlymatthew marked this conversation as resolved.
Show resolved Hide resolved
return false, fmt.Sprintf("Field Length do not align\ti1: %v, i2: %v", ir1.FieldLength, ir2.FieldLength)
}
}*/
}
return true, ""
}

Expand Down Expand Up @@ -151,21 +173,5 @@ func (i1 *IndexFile) compareTo(i2 *IndexFile) (bool, string) {
return false, fmt.Sprintf("checksums length not equal\ti1: %v, i2: %v", len(i1.Checksums), len(i2.Checksums))
}

fmt.Printf("checksums equal")

/*
for i, _ := range i1.EndByteOffsets {
if i1.EndByteOffsets[i] != i2.EndByteOffsets[i] {
return false, fmt.Sprintf("endbyteoffsets not equal\ti1: %v, i2: %v", i1.EndByteOffsets[i], i2.EndByteOffsets[i])
}

if i1.Checksums[i] != i2.Checksums[i] {
return false, fmt.Sprintf("checksums not equal\ti1: %v, i2: %v", i1.Checksums[i], i2.Checksums[i])
}
}
*/

fmt.Printf("endbyte and checksums deeply equal")

return true, "great success!"
}
13 changes: 12 additions & 1 deletion pkg/appendable/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"fmt"
"io"
"log/slog"
"sort"
"strings"

Expand Down Expand Up @@ -32,6 +33,8 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) {

f.data = data

slog.Debug("Starting ReadIndexFile")

// read the version
version, err := encoding.ReadByte(r)
if err != nil {
Expand All @@ -50,6 +53,8 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) {
return nil, fmt.Errorf("failed to read index file header: %w", err)
}

slog.Debug("headers", slog.Any("ifh", ifh))

// read the index headers
f.Indexes = []Index{}
br := 0
Expand All @@ -72,11 +77,15 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) {
index.IndexRecords = make(map[any][]protocol.IndexRecord)
f.Indexes = append(f.Indexes, index)
br += encoding.SizeString(index.FieldName) + binary.Size(ft) + binary.Size(uint64(0))

slog.Debug("", slog.Any("ih", index), slog.Any("recordCount", recordCount))
}
if br != int(ifh.IndexLength) {
return nil, fmt.Errorf("expected to read %d bytes, read %d bytes", ifh.IndexLength, br)
}

slog.Debug("Reading index headers done")

// read the index records
for i, index := range f.Indexes {

Expand Down Expand Up @@ -106,6 +115,8 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) {
return nil, fmt.Errorf("failed to read index record: %w", err)
}

slog.Debug("read index record", slog.Any("index", index.FieldName), slog.Any("any", value), slog.Any("record", ir))

switch value.(type) {
case nil, bool, int, int8, int16, int32, int64, float32, float64, string:
fmt.Printf("appending: %v", value)
Expand Down Expand Up @@ -173,7 +184,7 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) {
return nil, fmt.Errorf("failed to seek data file: %w", err)
}
}

slog.Debug("======")
return f, data.Synchronize(f)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"log/slog"
"strings"
)

Expand Down Expand Up @@ -140,6 +141,8 @@ func (i IndexRecord) CSVField(r io.ReadSeeker) (any, error) {
return nil, fmt.Errorf("failed to seek to original offset: %w", err)
}

slog.Debug("fields", slog.Any("F", fields), slog.Any("len", len(fields)))

return fields[0], nil
}

Expand Down
Loading