diff --git a/cmd/main.go b/cmd/main.go index 6291cf2c..63dbf849 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -4,7 +4,7 @@ import ( "bufio" "flag" "fmt" - "log" + "log/slog" "os" "time" @@ -12,16 +12,24 @@ import ( ) func main() { + var debugFlag, jsonlFlag, csvFlag, showTimings bool - var jsonlFlag bool - var csvFlag bool - + flag.BoolVar(&debugFlag, "debug", false, "Use logger that prints at the debug-level") flag.BoolVar(&jsonlFlag, "jsonl", false, "Use JSONL handler") flag.BoolVar(&csvFlag, "csv", false, "Use CSV handler") - - var showTimings bool flag.BoolVar(&showTimings, "t", false, "Show time-related metrics") + flag.Parse() + + logLevel := &slog.LevelVar{} + + if debugFlag { + logLevel.Set(slog.LevelDebug) + } + + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) + slog.SetDefault(logger) + var totalStart, readStart, writeStart time.Time if showTimings { totalStart = time.Now() @@ -33,7 +41,6 @@ func main() { flag.PrintDefaults() os.Exit(1) } - flag.Parse() args := flag.Args() @@ -47,32 +54,34 @@ func main() { panic(err) } - var dataHandler appendable.DataHandler switch { case jsonlFlag: - dataHandler = appendable.JSONLHandler{ReadSeeker: file} + dataHandler = appendable.JSONLHandler{ + ReadSeeker: file, + } case csvFlag: - dataHandler = appendable.CSVHandler{ReadSeeker: file} + dataHandler = appendable.CSVHandler{ + ReadSeeker: file, + } default: - fmt.Println("Please specify the file type with -jsonl or -csv.") + logger.Error("Please specify the file type with -jsonl or -csv.") os.Exit(1) } - if showTimings { + if showTimings { readStart = time.Now() } // Open the index file indexFile, err := appendable.NewIndexFile(dataHandler) + if err != nil { + panic(err) + } if showTimings { readDuration := time.Since(readStart) - log.Printf("Opening + synchronizing index file took: %s", readDuration) - } - - if err != nil { - panic(err) + logger.Info("Opening + synchronizing index file took", slog.Duration("duration", 
readDuration)) } // Write the index file @@ -83,7 +92,7 @@ func main() { if err != nil { panic(err) } - log.Printf("Writing index file to %s", args[0]+".index") + logger.Info("Writing index file to", slog.String("path", args[0]+".index")) bufof := bufio.NewWriter(of) if err := indexFile.Serialize(bufof); err != nil { panic(err) } @@ -97,11 +106,11 @@ func main() { if showTimings { writeDuration := time.Since(writeStart) - log.Printf("Writing index file took: %s", writeDuration) + logger.Info("Writing index file took", slog.Duration("duration", writeDuration)) totalDuration := time.Since(totalStart) - log.Printf("Total execution time: %s", totalDuration) + logger.Info("Total execution time", slog.Duration("duration", totalDuration)) } - log.Printf("Done!") + logger.Info("Done!") } diff --git a/pkg/appendable/csv_handler.go b/pkg/appendable/csv_handler.go index 3d3c69fb..c3d00519 100644 --- a/pkg/appendable/csv_handler.go +++ b/pkg/appendable/csv_handler.go @@ -6,6 +6,7 @@ import ( "encoding/csv" "fmt" "io" + "log/slog" "strconv" "strings" @@ -18,6 +19,8 @@ type CSVHandler struct { } func (c CSVHandler) Synchronize(f *IndexFile) error { + slog.Debug("Starting CSV synchronization") + var headers []string var err error @@ -52,9 +55,11 @@ func (c CSVHandler) Synchronize(f *IndexFile) error { f.Checksums = append(f.Checksums, xxhash.Sum64(line)) if isHeader { + slog.Info("Parsing CSV headers") dec := csv.NewReader(bytes.NewReader(line)) headers, err = dec.Read() if err != nil { + slog.Error("failed to parse CSV header", "error", err) return fmt.Errorf("failed to parse CSV header: %w", err) } isHeader = false @@ -62,14 +67,19 @@ func (c CSVHandler) Synchronize(f *IndexFile) error { } dec := csv.NewReader(bytes.NewReader(line)) + slog.Debug("Handling csv", "line", i) f.handleCSVLine(dec, headers, []string{}, uint64(existingCount)-1, start) + slog.Info("Successfully processed", "line", i) } if fromNewIndexFile && len(f.EndByteOffsets) > 0 { f.EndByteOffsets = 
f.EndByteOffsets[1:] f.Checksums = f.Checksums[1:] + + slog.Debug("Trimming endbyte offsets and checksums", slog.Any("endByteOffsets", f.EndByteOffsets), slog.Any("checksums", f.Checksums)) } + slog.Debug("Ending CSV synchronization") return nil } @@ -89,7 +99,6 @@ func fieldRankCsvField(fieldValue any) int { } func inferCSVField(fieldValue string) (interface{}, protocol.FieldType) { - if fieldValue == "" { return nil, protocol.FieldTypeNull } @@ -110,17 +119,22 @@ func inferCSVField(fieldValue string) (interface{}, protocol.FieldType) { } func (i *IndexFile) handleCSVLine(dec *csv.Reader, headers []string, path []string, dataIndex, dataOffset uint64) error { + slog.Debug("Processing CSV line", slog.Int("dataIndex", int(dataIndex)), slog.Int("dataOffset", int(dataOffset))) record, err := dec.Read() if err != nil { + slog.Error("Failed to read CSV record at index", "dataIndex", dataIndex, "error", err) return fmt.Errorf("failed to read CSV record at index %d: %w", dataIndex, err) } + slog.Debug("CSV line read successfully", "record", record) + cumulativeLength := uint64(0) for fieldIndex, fieldValue := range record { if fieldIndex >= len(headers) { + slog.Error("Field index is out of bounds with headers", "fieldIndex", fieldIndex, slog.Any("headers", headers)) return fmt.Errorf("field index %d is out of bounds with header", fieldIndex) } @@ -142,14 +156,21 @@ func (i *IndexFile) handleCSVLine(dec *csv.Reader, headers []string, path []stri FieldLength: int(fieldLength), }) + slog.Debug("Appended index record", + slog.String("field", name), + slog.Any("value", value), + slog.Int("start", int(fieldOffset))) + case protocol.FieldTypeNull: for j := range i.Indexes { if i.Indexes[j].FieldName == name { i.Indexes[j].FieldType |= protocol.FieldTypeNull } } + slog.Debug("Marked field", "name", name) default: + slog.Error("Encountered unexpected type for field", "value", value, "field", name) return fmt.Errorf("unexpected type '%T'", value) } 
diff --git a/pkg/appendable/index_file_jsonl_test.go b/pkg/appendable/index_file_jsonl_test.go index 19f8afa5..cd015a69 100644 --- a/pkg/appendable/index_file_jsonl_test.go +++ b/pkg/appendable/index_file_jsonl_test.go @@ -9,6 +9,7 @@ import ( ) func TestAppendDataRowJSONL(t *testing.T) { + t.Run("no schema changes", func(t *testing.T) { i, err := NewIndexFile(JSONLHandler{ReadSeeker: strings.NewReader("{\"test\":\"test1\"}\n")}) diff --git a/pkg/appendable/index_file_test.go b/pkg/appendable/index_file_test.go index 900956e7..d339e5da 100644 --- a/pkg/appendable/index_file_test.go +++ b/pkg/appendable/index_file_test.go @@ -2,6 +2,8 @@ package appendable import ( "fmt" + "log/slog" + "os" "strings" "testing" @@ -21,10 +23,22 @@ jsonl <---> csv */ func TestIndexFile(t *testing.T) { + originalLogger := slog.Default() + + // Create a logger with Debug on + debugLevel := &slog.LevelVar{} + debugLevel.Set(slog.LevelDebug) + debugLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + Level: debugLevel, + })) + + slog.SetDefault(debugLogger) + + defer slog.SetDefault(originalLogger) + mockJsonl := "{\"id\":\"identification\", \"age\":\"cottoneyedjoe\"}\n" mockCsv := "id,age\nidentification,cottoneyedjoe\n" - t.Run("generate index file", func(t *testing.T) { // jsonl jif, err := NewIndexFile(JSONLHandler{ReadSeeker: strings.NewReader(mockJsonl)}) @@ -40,7 +54,7 @@ func TestIndexFile(t *testing.T) { } status, res := jif.compareTo(civ) - + if !status { t.Errorf("Not equal\n%v", res) } @@ -49,21 +63,28 @@ func TestIndexFile(t *testing.T) { } -func compareIndexRecord(ir1, ir2 *protocol.IndexRecord) (bool, string) { +func compareIndexRecord(ir1, ir2 *protocol.IndexRecord, fieldType protocol.FieldType) (bool, string) { if ir1.DataNumber != ir2.DataNumber { return false, fmt.Sprintf("Index record data numbers do not align\ti1: %v, i2: %v", ir1.DataNumber, ir2.DataNumber) } - /* - if ir1.FieldStartByteOffset != ir2.FieldStartByteOffset { - return false, 
fmt.Sprintf("FieldStartByteOffset do not align\ti1: %v, i2: %v", ir1.FieldStartByteOffset, ir2.FieldStartByteOffset) - } + if fieldType&protocol.FieldTypeString != protocol.FieldTypeString { + if ir1.FieldStartByteOffset != ir2.FieldStartByteOffset { + return false, fmt.Sprintf("FieldStartByteOffset do not align\ti1: %v, i2: %v", ir1.FieldStartByteOffset, ir2.FieldStartByteOffset) + } if ir1.FieldLength != ir2.FieldLength { return false, fmt.Sprintf("Field Length do not align\ti1: %v, i2: %v", ir1.FieldLength, ir2.FieldLength) } + } /* else { + if ir1.FieldStartByteOffset != ir2.FieldStartByteOffset { + return false, fmt.Sprintf("FieldStartByteOffset do not align\ti1: %v, i2: %v", ir1.FieldStartByteOffset, ir2.FieldStartByteOffset) + } - */ + if ir1.FieldLength != ir2.FieldLength { + return false, fmt.Sprintf("Field Length do not align\ti1: %v, i2: %v", ir1.FieldLength, ir2.FieldLength) + } + }*/ return true, "" } @@ -90,7 +111,7 @@ func (i1 *Index) compareIndex(i2 *Index) (bool, string) { } for i := range records1 { - status, res := compareIndexRecord(&records1[i], &records2[i]) + status, res := compareIndexRecord(&records1[i], &records2[i], i1.FieldType) if !status { return false, res } @@ -123,16 +144,15 @@ func (i1 *IndexFile) compareTo(i2 *IndexFile) (bool, string) { if len(i1.EndByteOffsets) != len(i2.EndByteOffsets) { return false, fmt.Sprintf("endbyteoffsets length not equal\ti1: %v, i2: %v", len(i1.EndByteOffsets), len(i2.EndByteOffsets)) } - + fmt.Printf("endbyteoffsets equal") if len(i1.Checksums) != len(i2.Checksums) { return false, fmt.Sprintf("checksums length not equal\ti1: %v, i2: %v", len(i1.Checksums), len(i2.Checksums)) } - fmt.Printf("checksums equal") - + /* for i, _ := range i1.EndByteOffsets { if i1.EndByteOffsets[i] != i2.EndByteOffsets[i] { diff --git a/pkg/appendable/io_test.go b/pkg/appendable/io_test.go index cfa733b2..bf71b2ca 100644 --- a/pkg/appendable/io_test.go +++ b/pkg/appendable/io_test.go @@ -8,6 +8,7 @@ import ( ) func 
TestReadIndexFile(t *testing.T) { + t.Run("empty index file", func(t *testing.T) { if _, err := ReadIndexFile(strings.NewReader(""), JSONLHandler{ReadSeeker: strings.NewReader("")}); !errors.Is(err, io.EOF) { t.Errorf("expected EOF, got %v", err)