Skip to content

Commit

Permalink
Add logger (#62)
Browse files Browse the repository at this point in the history
* add zap logger

* remove zap, use slog

* global logger
  • Loading branch information
friendlymatthew authored Jan 24, 2024
1 parent 55ed423 commit 2a6030d
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 34 deletions.
51 changes: 30 additions & 21 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,32 @@ import (
"bufio"
"flag"
"fmt"
"log"
"log/slog"
"os"
"time"

"github.com/kevmo314/appendable/pkg/appendable"
)

func main() {
var debugFlag, jsonlFlag, csvFlag, showTimings bool

var jsonlFlag bool
var csvFlag bool

flag.BoolVar(&debugFlag, "debug", false, "Use logger that prints at the debug-level")
flag.BoolVar(&jsonlFlag, "jsonl", false, "Use JSONL handler")
flag.BoolVar(&csvFlag, "csv", false, "Use CSV handler")

var showTimings bool
flag.BoolVar(&showTimings, "t", false, "Show time-related metrics")

flag.Parse()

logLevel := &slog.LevelVar{}

if debugFlag {
logLevel.Set(slog.LevelDebug)
}

logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel}))
slog.SetDefault(logger)

var totalStart, readStart, writeStart time.Time
if showTimings {
totalStart = time.Now()
Expand All @@ -33,7 +41,6 @@ func main() {
flag.PrintDefaults()
os.Exit(1)
}
flag.Parse()

args := flag.Args()

Expand All @@ -47,32 +54,34 @@ func main() {
panic(err)
}


var dataHandler appendable.DataHandler

switch {
case jsonlFlag:
dataHandler = appendable.JSONLHandler{ReadSeeker: file}
dataHandler = appendable.JSONLHandler{
ReadSeeker: file,
}
case csvFlag:
dataHandler = appendable.CSVHandler{ReadSeeker: file}
dataHandler = appendable.CSVHandler{
ReadSeeker: file,
}
default:
fmt.Println("Please specify the file type with -jsonl or -csv.")
logger.Error("Please specify the file type with -jsonl or -csv.")
os.Exit(1)
}
if showTimings {
if showTimings {
readStart = time.Now()
}
// Open the index file
indexFile, err := appendable.NewIndexFile(dataHandler)

if err != nil {
panic(err)
}

if showTimings {
readDuration := time.Since(readStart)
log.Printf("Opening + synchronizing index file took: %s", readDuration)
}

if err != nil {
panic(err)
logger.Info("Opening + synchronizing index file took", slog.Duration("duration", readDuration))
}

// Write the index file
Expand All @@ -83,7 +92,7 @@ func main() {
if err != nil {
panic(err)
}
log.Printf("Writing index file to %s", args[0]+".index")
logger.Info("Writing index file to", slog.String("path", args[0]+".index"))
bufof := bufio.NewWriter(of)
if err := indexFile.Serialize(bufof); err != nil {
panic(err)
Expand All @@ -97,11 +106,11 @@ func main() {

if showTimings {
writeDuration := time.Since(writeStart)
log.Printf("Writing index file took: %s", writeDuration)
logger.Info("Writing index file took", slog.Duration("duration", writeDuration))

totalDuration := time.Since(totalStart)
log.Printf("Total execution time: %s", totalDuration)
logger.Info("Total execution time", slog.Duration("duration", totalDuration))
}

log.Printf("Done!")
logger.Info("Done!")
}
23 changes: 22 additions & 1 deletion pkg/appendable/csv_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/csv"
"fmt"
"io"
"log/slog"
"strconv"
"strings"

Expand All @@ -18,6 +19,8 @@ type CSVHandler struct {
}

func (c CSVHandler) Synchronize(f *IndexFile) error {
slog.Debug("Starting CSV synchronization")

var headers []string
var err error

Expand Down Expand Up @@ -52,24 +55,31 @@ func (c CSVHandler) Synchronize(f *IndexFile) error {
f.Checksums = append(f.Checksums, xxhash.Sum64(line))

if isHeader {
slog.Info("Parsing CSV headers")
dec := csv.NewReader(bytes.NewReader(line))
headers, err = dec.Read()
if err != nil {
slog.Error("failed to parse CSV header", "error", err)
return fmt.Errorf("failed to parse CSV header: %w", err)
}
isHeader = false
continue
}

dec := csv.NewReader(bytes.NewReader(line))
slog.Debug("Handling csv", "line", i)
f.handleCSVLine(dec, headers, []string{}, uint64(existingCount)-1, start)
slog.Info("Succesfully processed", "line", i)
}

if fromNewIndexFile && len(f.EndByteOffsets) > 0 {
f.EndByteOffsets = f.EndByteOffsets[1:]
f.Checksums = f.Checksums[1:]

slog.Debug("Trimming endbyte offsets and checksums", "endByteOffsets", slog.Any("endByteOffsets", f.EndByteOffsets), "checksums", slog.Any("checksums", f.Checksums))
}

slog.Debug("Ending CSV synchronization")
return nil
}

Expand All @@ -89,7 +99,6 @@ func fieldRankCsvField(fieldValue any) int {
}

func inferCSVField(fieldValue string) (interface{}, protocol.FieldType) {

if fieldValue == "" {
return nil, protocol.FieldTypeNull
}
Expand All @@ -110,17 +119,22 @@ func inferCSVField(fieldValue string) (interface{}, protocol.FieldType) {
}

func (i *IndexFile) handleCSVLine(dec *csv.Reader, headers []string, path []string, dataIndex, dataOffset uint64) error {
slog.Debug("Processing CSV line", slog.Int("dataIndex", int(dataIndex)), slog.Int("dataOffset", int(dataOffset)))

record, err := dec.Read()

if err != nil {
slog.Error("Failed to read CSV record at index", "dataIndex", dataIndex, "error", err)
return fmt.Errorf("failed to read CSV record at index %d: %w", dataIndex, err)
}

slog.Debug("CSV line read successfully", "record", record)

cumulativeLength := uint64(0)

for fieldIndex, fieldValue := range record {
if fieldIndex >= len(headers) {
slog.Error("Field index is out of bounds with headers", "fieldIndex", fieldIndex, "headers", slog.Any("headers", headers))
return fmt.Errorf("field index %d is out of bounds with header", fieldIndex)
}

Expand All @@ -142,14 +156,21 @@ func (i *IndexFile) handleCSVLine(dec *csv.Reader, headers []string, path []stri
FieldLength: int(fieldLength),
})

slog.Debug("Appended index record",
slog.String("field", name),
slog.Any("value", value),
slog.Int("start", int(fieldOffset)))

case protocol.FieldTypeNull:
for j := range i.Indexes {
if i.Indexes[j].FieldName == name {
i.Indexes[j].FieldType |= protocol.FieldTypeNull
}
}
slog.Debug("Marked field", "name", name)

default:
slog.Error("Encountered unexpected type '%T' for field '%s'", value, name)
return fmt.Errorf("unexpected type '%T'", value)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/appendable/index_file_jsonl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

func TestAppendDataRowJSONL(t *testing.T) {

t.Run("no schema changes", func(t *testing.T) {

i, err := NewIndexFile(JSONLHandler{ReadSeeker: strings.NewReader("{\"test\":\"test1\"}\n")})
Expand Down
44 changes: 32 additions & 12 deletions pkg/appendable/index_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package appendable

import (
"fmt"
"log/slog"
"os"
"strings"
"testing"

Expand All @@ -21,10 +23,22 @@ jsonl <---> csv
*/
func TestIndexFile(t *testing.T) {

originalLogger := slog.Default()

// Create a logger with Debug on
debugLevel := &slog.LevelVar{}
debugLevel.Set(slog.LevelDebug)
debugLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: debugLevel,
}))

slog.SetDefault(debugLogger)

defer slog.SetDefault(originalLogger)

mockJsonl := "{\"id\":\"identification\", \"age\":\"cottoneyedjoe\"}\n"
mockCsv := "id,age\nidentification,cottoneyedjoe\n"


t.Run("generate index file", func(t *testing.T) {
// jsonl
jif, err := NewIndexFile(JSONLHandler{ReadSeeker: strings.NewReader(mockJsonl)})
Expand All @@ -40,7 +54,7 @@ func TestIndexFile(t *testing.T) {
}

status, res := jif.compareTo(civ)

if !status {
t.Errorf("Not equal\n%v", res)
}
Expand All @@ -49,21 +63,28 @@ func TestIndexFile(t *testing.T) {

}

func compareIndexRecord(ir1, ir2 *protocol.IndexRecord) (bool, string) {
func compareIndexRecord(ir1, ir2 *protocol.IndexRecord, fieldType protocol.FieldType) (bool, string) {
if ir1.DataNumber != ir2.DataNumber {
return false, fmt.Sprintf("Index record data numbers do not align\ti1: %v, i2: %v", ir1.DataNumber, ir2.DataNumber)
}

/*
if ir1.FieldStartByteOffset != ir2.FieldStartByteOffset {
return false, fmt.Sprintf("FieldStartByteOffset do not align\ti1: %v, i2: %v", ir1.FieldStartByteOffset, ir2.FieldStartByteOffset)
}
if fieldType&protocol.FieldTypeString != protocol.FieldTypeString {
if ir1.FieldStartByteOffset != ir2.FieldStartByteOffset {
return false, fmt.Sprintf("FieldStartByteOffset do not align\ti1: %v, i2: %v", ir1.FieldStartByteOffset, ir2.FieldStartByteOffset)
}

if ir1.FieldLength != ir2.FieldLength {
return false, fmt.Sprintf("Field Length do not align\ti1: %v, i2: %v", ir1.FieldLength, ir2.FieldLength)
}
} /* else {
if ir1.FieldStartByteOffset != ir2.FieldStartByteOffset {
return false, fmt.Sprintf("FieldStartByteOffset do not align\ti1: %v, i2: %v", ir1.FieldStartByteOffset, ir2.FieldStartByteOffset)
}
*/
if ir1.FieldLength != ir2.FieldLength {
return false, fmt.Sprintf("Field Length do not align\ti1: %v, i2: %v", ir1.FieldLength, ir2.FieldLength)
}
}*/
return true, ""
}

Expand All @@ -90,7 +111,7 @@ func (i1 *Index) compareIndex(i2 *Index) (bool, string) {
}

for i := range records1 {
status, res := compareIndexRecord(&records1[i], &records2[i])
status, res := compareIndexRecord(&records1[i], &records2[i], i1.FieldType)
if !status {
return false, res
}
Expand Down Expand Up @@ -123,16 +144,15 @@ func (i1 *IndexFile) compareTo(i2 *IndexFile) (bool, string) {
if len(i1.EndByteOffsets) != len(i2.EndByteOffsets) {
return false, fmt.Sprintf("endbyteoffsets length not equal\ti1: %v, i2: %v", len(i1.EndByteOffsets), len(i2.EndByteOffsets))
}

fmt.Printf("endbyteoffsets equal")

if len(i1.Checksums) != len(i2.Checksums) {
return false, fmt.Sprintf("checksums length not equal\ti1: %v, i2: %v", len(i1.Checksums), len(i2.Checksums))
}


fmt.Printf("checksums equal")

/*
for i, _ := range i1.EndByteOffsets {
if i1.EndByteOffsets[i] != i2.EndByteOffsets[i] {
Expand Down
1 change: 1 addition & 0 deletions pkg/appendable/io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
)

func TestReadIndexFile(t *testing.T) {

t.Run("empty index file", func(t *testing.T) {
if _, err := ReadIndexFile(strings.NewReader(""), JSONLHandler{ReadSeeker: strings.NewReader("")}); !errors.Is(err, io.EOF) {
t.Errorf("expected EOF, got %v", err)
Expand Down

0 comments on commit 2a6030d

Please sign in to comment.