diff --git a/pkg/appendable/csv_handler.go b/pkg/appendable/csv_handler.go index c3d00519..2ab6ad8d 100644 --- a/pkg/appendable/csv_handler.go +++ b/pkg/appendable/csv_handler.go @@ -32,7 +32,9 @@ func (c CSVHandler) Synchronize(f *IndexFile) error { isHeader = true fromNewIndexFile = true } else { + slog.Debug("indexes already exist, not parsing headers") for _, index := range f.Indexes { + isHeader = false headers = append(headers, index.FieldName) } } @@ -50,8 +52,10 @@ func (c CSVHandler) Synchronize(f *IndexFile) error { start = f.EndByteOffsets[existingCount-1] } - f.EndByteOffsets = append(f.EndByteOffsets, start+uint64(len(line))+1) + slog.Debug("", slog.Uint64("start", start)) + slog.Debug("adding", slog.Any("endbyteoffset", start+uint64(len(line))), slog.Any("line", line)) + f.EndByteOffsets = append(f.EndByteOffsets, start+uint64(len(line))+1) f.Checksums = append(f.Checksums, xxhash.Sum64(line)) if isHeader { @@ -68,7 +72,15 @@ func (c CSVHandler) Synchronize(f *IndexFile) error { dec := csv.NewReader(bytes.NewReader(line)) slog.Debug("Handling csv", "line", i) - f.handleCSVLine(dec, headers, []string{}, uint64(existingCount)-1, start) + + if fromNewIndexFile { + + f.handleCSVLine(dec, headers, []string{}, uint64(existingCount)-1, start) + } else { + + f.handleCSVLine(dec, headers, []string{}, uint64(existingCount), start) + } + slog.Info("Succesfully processed", "line", i) } @@ -79,7 +91,9 @@ func (c CSVHandler) Synchronize(f *IndexFile) error { slog.Debug("Trimming endbyte offsets and checksums", "endByteOffsets", slog.Any("endByteOffsets", f.EndByteOffsets), "checksums", slog.Any("checksums", f.Checksums)) } + slog.Debug("indexes", slog.Any("", f.Indexes)) slog.Debug("Ending CSV synchronization") + slog.Debug("=========") return nil } @@ -174,7 +188,7 @@ func (i *IndexFile) handleCSVLine(dec *csv.Reader, headers []string, path []stri return fmt.Errorf("unexpected type '%T'", value) } - cumulativeLength += fieldLength + cumulativeLength += fieldLength + 1 } return nil diff --git a/pkg/appendable/index_file_csv_test.go b/pkg/appendable/index_file_csv_test.go index de55e930..146e6c90 100644 --- a/pkg/appendable/index_file_csv_test.go +++ b/pkg/appendable/index_file_csv_test.go @@ -2,6 +2,9 @@ package appendable import ( "bytes" + "fmt" + "log/slog" + "os" "strings" "testing" @@ -10,6 +13,19 @@ import ( func TestAppendDataRowCSV(t *testing.T) { + originalLogger := slog.Default() + + // Create a logger with Debug on + debugLevel := &slog.LevelVar{} + debugLevel.Set(slog.LevelDebug) + debugLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + Level: debugLevel, + })) + + slog.SetDefault(debugLogger) + + defer slog.SetDefault(originalLogger) + var mockCsv string = "header1\ntest1\n" var mockCsv2 string = "header1\ntest1\ntest3\n" @@ -51,6 +67,30 @@ func TestAppendDataRowCSV(t *testing.T) { } }) + t.Run("check end + start byte offsets multiple", func(t *testing.T) { + i, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader(mockCsv2)}) + if err != nil { + t.Fatal(err) + } + + if len(i.Indexes) != 1 { + t.Errorf("got len(i.Indexes) = %d, want 1", len(i.Indexes)) + } + + if len(i.Indexes[0].IndexRecords) != 2 { + t.Errorf("got len(i.Indexes[0].IndexRecords) = %d, want 2", len(i.Indexes[0].IndexRecords)) + } + + if i.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset != uint64(len("header1\n")) { + t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].FieldStartByteOffset = %d, want 7", i.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset) + } + + if i.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset != uint64(len("header1\ntest1\n")) { + t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].FieldStartByteOffset = %d, want %d", i.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset, uint64(len("header\ntest1\n"))) + } + + }) + t.Run("append index to existing", func(t *testing.T) { i, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader(mockCsv)}) if err != nil { @@ -74,6 +114,7 @@ func TestAppendDataRowCSV(t *testing.T) { } if len(j.Indexes[0].IndexRecords) != 2 { + fmt.Printf("index records look like %v", j.Indexes[0].IndexRecords) t.Errorf("got len(j.Indexes[0].IndexRecords) = %d, want 2", len(j.Indexes[0].IndexRecords)) } @@ -90,17 +131,17 @@ func TestAppendDataRowCSV(t *testing.T) { if j.Indexes[0].IndexRecords["test1"][0].DataNumber != 0 { t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].DataNumber = %d, want 0", j.Indexes[0].IndexRecords["test1"][0].DataNumber) } - if j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset != uint64(len("{\"test\":")) { - t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].FieldStartByteOffset = %d, want 10", j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset) + if j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset != uint64(len("header1\n")) { + t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].FieldStartByteOffset = %d, want %d", j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset, uint64(len("header\n"))) } - if j.Indexes[0].IndexRecords["test3"][0].DataNumber != 0 { + if j.Indexes[0].IndexRecords["test3"][0].DataNumber != 1 { t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].DataNumber = %d, want 1", j.Indexes[0].IndexRecords["test3"][0].DataNumber) } // verify byte offset calculation - if j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset != uint64(len("header\ntest1\n")+1) { - t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].FieldStartByteOffset = %d, want 14", j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset) + if j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset != uint64(len("header1\ntest1\n")) { + t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].FieldStartByteOffset = %d, want %d", j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset, uint64(len("header\ntest1\n"))) } }) @@ -140,6 +181,8 @@ func TestAppendDataRowCSV(t *testing.T) { if j.EndByteOffsets[1] != uint64(len("name,move\nmica,coyote\ngalvao,mount\n")) { t.Errorf("got i.DataRanges[1].EndByteOffset = %d, want %d", j.EndByteOffsets[1], uint64(len("name,move\nmica,coyote\ngalvao,mount\n"))) } + + fmt.Printf("index file looks like: %v", j.Indexes) }) t.Run("generate index file", func(t *testing.T) { diff --git a/pkg/appendable/index_file_test.go b/pkg/appendable/index_file_test.go index d339e5da..58ffc113 100644 --- a/pkg/appendable/index_file_test.go +++ b/pkg/appendable/index_file_test.go @@ -1,9 +1,8 @@ package appendable import ( + "bytes" "fmt" - "log/slog" - "os" "strings" "testing" @@ -23,23 +22,13 @@ jsonl <---> csv */ func TestIndexFile(t *testing.T) { - originalLogger := slog.Default() + mockJsonl := "{\"h1\":\"test1\", \"h2\":\"test2\"}\n" + mockJsonl2 := "{\"h1\":\"test1\", \"h2\":\"test2\"}\n{\"h1\":\"test3\", \"h2\":\"test4\"}\n" - // Create a logger with Debug on - debugLevel := &slog.LevelVar{} - debugLevel.Set(slog.LevelDebug) - debugLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ - Level: debugLevel, - })) + mockCsv := "h1,h2\ntest1,test2\n" + mockCsv2 := "h1,h2\ntest1,test2\ntest3,test4\n" - slog.SetDefault(debugLogger) - - defer slog.SetDefault(originalLogger) - - mockJsonl := "{\"id\":\"identification\", \"age\":\"cottoneyedjoe\"}\n" - mockCsv := "id,age\nidentification,cottoneyedjoe\n" - - t.Run("generate index file", func(t *testing.T) { + t.Run("compare mock index file", func(t *testing.T) { // jsonl jif, err := NewIndexFile(JSONLHandler{ReadSeeker: strings.NewReader(mockJsonl)}) @@ -61,6 +50,47 @@ func TestIndexFile(t *testing.T) { }) + t.Run("compare mock index file after appending", func(t *testing.T) { + jif, err := NewIndexFile(JSONLHandler{ReadSeeker: strings.NewReader(mockJsonl)}) + if err != nil { + t.Fatal(err) + } + + jbuf := &bytes.Buffer{} + + if err := jif.Serialize(jbuf); err != nil { + t.Fatal(err) + } + + civ, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader(mockCsv)}) + if err != nil { + t.Fatal(err) + } + + cbuf := &bytes.Buffer{} + if err := civ.Serialize(cbuf); err != nil { + t.Fatal(err) + } + + j, err := ReadIndexFile(jbuf, JSONLHandler{ReadSeeker: strings.NewReader(mockJsonl2)}) + if err != nil { + t.Fatal(err) + } + + c, err := ReadIndexFile(cbuf, CSVHandler{ReadSeeker: strings.NewReader(mockCsv2)}) + if err != nil { + t.Fatal(err) + } + status, res := j.compareTo(c) + + fmt.Printf("%v", c) + + if !status { + t.Errorf("Not equal\n%v", res) + } + + }) + } func compareIndexRecord(ir1, ir2 *protocol.IndexRecord, fieldType protocol.FieldType) (bool, string) { @@ -69,22 +99,14 @@ func compareIndexRecord(ir1, ir2 *protocol.IndexRecord, fieldType protocol.Field } if fieldType&protocol.FieldTypeString != protocol.FieldTypeString { - if ir1.FieldStartByteOffset != ir2.FieldStartByteOffset { - return false, fmt.Sprintf("FieldStartByteOffset do not align\ti1: %v, i2: %v", ir1.FieldStartByteOffset, ir2.FieldStartByteOffset) - } - if ir1.FieldLength != ir2.FieldLength { return false, fmt.Sprintf("Field Length do not align\ti1: %v, i2: %v", ir1.FieldLength, ir2.FieldLength) } - } /* else { - if ir1.FieldStartByteOffset != ir2.FieldStartByteOffset { - return false, fmt.Sprintf("FieldStartByteOffset do not align\ti1: %v, i2: %v", ir1.FieldStartByteOffset, ir2.FieldStartByteOffset) - } - - if ir1.FieldLength != ir2.FieldLength { + } else { + if ir1.FieldLength != ir2.FieldLength+2 { return false, fmt.Sprintf("Field Length do not align\ti1: %v, i2: %v", ir1.FieldLength, ir2.FieldLength) } - }*/ + } return true, "" } @@ -151,21 +173,5 @@ func (i1 *IndexFile) compareTo(i2 *IndexFile) (bool, string) { return false, fmt.Sprintf("checksums length not equal\ti1: %v, i2: %v", len(i1.Checksums), len(i2.Checksums)) } - fmt.Printf("checksums equal") - - /* - for i, _ := range i1.EndByteOffsets { - if i1.EndByteOffsets[i] != i2.EndByteOffsets[i] { - return false, fmt.Sprintf("endbyteoffsets not equal\ti1: %v, i2: %v", i1.EndByteOffsets[i], i2.EndByteOffsets[i]) - } - - if i1.Checksums[i] != i2.Checksums[i] { - return false, fmt.Sprintf("checksums not equal\ti1: %v, i2: %v", i1.Checksums[i], i2.Checksums[i]) - } - } - */ - - fmt.Printf("endbyte and checksums deeply equal") - return true, "great success!" } diff --git a/pkg/appendable/io.go b/pkg/appendable/io.go index 6f9eb0bb..86739cbf 100644 --- a/pkg/appendable/io.go +++ b/pkg/appendable/io.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "fmt" "io" + "log/slog" "sort" "strings" @@ -32,6 +33,8 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) { f.data = data + slog.Debug("Starting ReadIndexFile") + // read the version version, err := encoding.ReadByte(r) if err != nil { @@ -50,6 +53,8 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) { return nil, fmt.Errorf("failed to read index file header: %w", err) } + slog.Debug("headers", slog.Any("ifh", ifh)) + // read the index headers f.Indexes = []Index{} br := 0 @@ -72,11 +77,15 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) { index.IndexRecords = make(map[any][]protocol.IndexRecord) f.Indexes = append(f.Indexes, index) br += encoding.SizeString(index.FieldName) + binary.Size(ft) + binary.Size(uint64(0)) + + slog.Debug("", slog.Any("ih", index), slog.Any("recordCount", recordCount)) } if br != int(ifh.IndexLength) { return nil, fmt.Errorf("expected to read %d bytes, read %d bytes", ifh.IndexLength, br) } + slog.Debug("Reading index headers done") + // read the index records for i, index := range f.Indexes { @@ -106,6 +115,8 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) { return nil, fmt.Errorf("failed to read index record: %w", err) } + slog.Debug("read index record", slog.Any("index", index.FieldName), slog.Any("any", value), slog.Any("record", ir)) + switch value.(type) { case nil, bool, int, int8, int16, int32, int64, float32, float64, string: fmt.Printf("appending: %v", value) @@ -173,7 +184,7 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) { return nil, fmt.Errorf("failed to seek data file: %w", err) } } - + slog.Debug("======") return f, data.Synchronize(f) } diff --git a/pkg/protocol/protocol.go b/pkg/protocol/protocol.go index 78a16d54..96595c18 100644 --- a/pkg/protocol/protocol.go +++ b/pkg/protocol/protocol.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "log/slog" "strings" ) @@ -140,6 +141,8 @@ func (i IndexRecord) CSVField(r io.ReadSeeker) (any, error) { return nil, fmt.Errorf("failed to seek to original offset: %w", err) } + slog.Debug("fields", slog.Any("F", fields), slog.Any("len", len(fields))) + return fields[0], nil }