
support CSV #6

Merged 12 commits on Jan 15, 2024
24 changes: 22 additions & 2 deletions cmd/main.go
@@ -12,6 +12,13 @@ import (
)

func main() {

var jsonlFlag bool
var csvFlag bool

flag.BoolVar(&jsonlFlag, "jsonl", false, "Use JSONL handler")
flag.BoolVar(&csvFlag, "csv", false, "Use CSV handler")

var showTimings bool
flag.BoolVar(&showTimings, "t", false, "Show time-related metrics")

@@ -40,11 +47,24 @@ func main() {
panic(err)
}

if showTimings {

var dataHandler appendable.DataHandler

switch {
case jsonlFlag:
dataHandler = appendable.JSONLHandler{ReadSeeker: file}
case csvFlag:
dataHandler = appendable.CSVHandler{ReadSeeker: file}
default:
fmt.Println("Please specify the file type with -jsonl or -csv.")
os.Exit(1)
}
if showTimings {
readStart = time.Now()
}
// Open the index file
indexFile, err := appendable.NewIndexFile(appendable.JSONLHandler{ReadSeeker: file})
indexFile, err := appendable.NewIndexFile(dataHandler)


if showTimings {
readDuration := time.Since(readStart)
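The switch above selects an appendable.DataHandler, whose definition is not part of this diff. Inferred from usage (both handlers embed an io.ReadSeeker, and the CSV handler below implements Synchronize), a plausible shape is the following sketch, not a quote of the real code:

// Inferred sketch only: the actual interface lives elsewhere in pkg/appendable.
type DataHandler interface {
	io.ReadSeeker                   // raw data access for the index builder
	Synchronize(f *IndexFile) error // scan the data and populate the index
}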
9 changes: 7 additions & 2 deletions examples/workspace/fetch_data.py
@@ -1,8 +1,13 @@
# Data taken from https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

import io

import pandas as pd
import requests
import io

response = requests.get('https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet')
pd.read_parquet(io.BytesIO(response.content)).to_json('green_tripdata_2023-01.jsonl', orient='records', lines=True)

pd.read_parquet(io.BytesIO(response.content)).to_json('green_tripdata_2023-01.jsonl', orient='records', lines=True)

df = pd.read_parquet(io.BytesIO(response.content))
df.to_csv('green_tripdata_2023-01.csv', index=False)
68,212 changes: 68,212 additions & 0 deletions examples/workspace/green_tripdata_2023-01.csv

Large diffs are not rendered by default.

167 changes: 167 additions & 0 deletions pkg/appendable/csv_handler.go
@@ -0,0 +1,167 @@
package appendable

import (
"bufio"
"bytes"
"encoding/csv"
"fmt"
"io"
"strconv"
"strings"

"github.com/cespare/xxhash/v2"
"github.com/kevmo314/appendable/pkg/protocol"
)

type CSVHandler struct {
io.ReadSeeker
}

func (c CSVHandler) Synchronize(f *IndexFile) error {

var headers []string
var err error

isHeader := true
scanner := bufio.NewScanner(f.data)

for i := 0; scanner.Scan(); i++ {
line := scanner.Bytes()

existingCount := len(f.EndByteOffsets)

// append a data range
var start uint64
if len(f.EndByteOffsets) > 0 {
start = f.EndByteOffsets[existingCount-1]
}

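// +1 re-counts the trailing '\n' that bufio.Scanner strips from each line.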
f.EndByteOffsets = append(f.EndByteOffsets, start+uint64(len(line))+1)

f.Checksums = append(f.Checksums, xxhash.Sum64(line))

if i == 0 {
fmt.Printf("Header %d - StartOffset: %d, EndOffset: %d, Checksum: %d\n\n", i, start, start+uint64(len(line))+1, xxhash.Sum64(line))
} else {
fmt.Printf("Line %d - StartOffset: %d, EndOffset: %d, Checksum: %d\n", i, start, start+uint64(len(line))+1, xxhash.Sum64(line))
}

if isHeader {
dec := csv.NewReader(bytes.NewReader(line))
headers, err = dec.Read()
if err != nil {
return fmt.Errorf("failed to parse CSV header: %w", err)
}
isHeader = false
continue
}

dec := csv.NewReader(bytes.NewReader(line))
if err := f.handleCSVLine(dec, headers, []string{}, uint64(existingCount), start); err != nil {
return fmt.Errorf("failed to handle CSV line %d: %w", i, err)
}
}

fmt.Printf("%v\n\n", f.Checksums)
return nil
}
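To make the offset bookkeeping above concrete: each line's start is the previous line's end, and each end adds len(line)+1, re-counting the newline the scanner strips. A standalone sketch of the same arithmetic, assuming plain \n line endings (the sample values are illustrative):

package main

import "fmt"

func main() {
	// A two-line CSV: header "a,b" and one record "1,22".
	lines := []string{"a,b", "1,22"}

	var endByteOffsets []uint64
	for _, line := range lines {
		var start uint64
		if n := len(endByteOffsets); n > 0 {
			start = endByteOffsets[n-1]
		}
		// Mirror Synchronize: end = start + len(line) + 1 for the newline.
		end := start + uint64(len(line)) + 1
		endByteOffsets = append(endByteOffsets, end)
		fmt.Printf("%q: start=%d end=%d\n", line, start, end)
	}
	// Output:
	// "a,b": start=0 end=4
	// "1,22": start=4 end=9
}

Note that the +1 assumes exactly one terminator byte per line; \r\n endings (which bufio.Scanner also strips) or a missing final newline would skew the offsets by one.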

func fieldRankCsvField(fieldValue any) int {
fieldStr, ok := fieldValue.(string)

if !ok {
panic("unknown type")
}

if fieldStr == "" {
return 1
}

if _, err := strconv.Atoi(fieldStr); err == nil {
return 3
}

if _, err := strconv.ParseFloat(fieldStr, 64); err == nil {
return 3
}

if _, err := strconv.ParseBool(fieldStr); err == nil {
return 2
}

return 4
}

func inferCSVField(fieldValue string) (interface{}, protocol.FieldType) {

if fieldValue == "" {
return nil, protocol.FieldTypeNull
}

if i, err := strconv.Atoi(fieldValue); err == nil {
return i, protocol.FieldTypeNumber
}

if f, err := strconv.ParseFloat(fieldValue, 64); err == nil {
return f, protocol.FieldTypeNumber
}

if b, err := strconv.ParseBool(fieldValue); err == nil {
return b, protocol.FieldTypeBoolean
}

return fieldValue, protocol.FieldTypeString
}
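The order of the checks above is load-bearing: Atoi runs before ParseFloat, and both run before ParseBool, so "1" and "0" are classified as numbers rather than booleans, and only text that parses as none of these falls through to a string. A standalone illustration of that precedence, mirroring the logic with plain strconv since the protocol constants are not needed to see the ordering:

package main

import (
	"fmt"
	"strconv"
)

// infer mirrors inferCSVField's precedence: empty, int, float, bool, string.
func infer(s string) string {
	if s == "" {
		return "null"
	}
	if _, err := strconv.Atoi(s); err == nil {
		return "number (int)"
	}
	if _, err := strconv.ParseFloat(s, 64); err == nil {
		return "number (float)"
	}
	if _, err := strconv.ParseBool(s); err == nil {
		return "boolean"
	}
	return "string"
}

func main() {
	for _, s := range []string{"", "1", "3.14", "true", "hello"} {
		fmt.Printf("%q -> %s\n", s, infer(s))
	}
	// "" -> null, "1" -> number (int), "3.14" -> number (float),
	// "true" -> boolean, "hello" -> string
}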

func (i *IndexFile) handleCSVLine(dec *csv.Reader, headers []string, path []string, dataIndex, dataOffset uint64) error {

record, err := dec.Read()

if err != nil {
return fmt.Errorf("failed to read CSV record at index %d: %w", dataIndex, err)
}

cumulativeLength := uint64(0)

for fieldIndex, fieldValue := range record {
if fieldIndex >= len(headers) {
return fmt.Errorf("field index %d is out of bounds with header", fieldIndex)
}

fieldName := headers[fieldIndex]
name := strings.Join(append(path, fieldName), ".")

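// NOTE: cumulativeLength sums only the field bytes themselves, so these
// offsets ignore the comma after each field (and any quoting); on unquoted
// rows, every field after the first lands short by one byte per preceding
// field.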
fieldOffset := dataOffset + cumulativeLength
fieldLength := uint64(len(fieldValue))

value, fieldType := inferCSVField(fieldValue)

fmt.Printf("Field '%s' - Offset: %d, Length: %d, Value: %v, Type: %v\n", fieldName, fieldOffset, fieldLength, value, fieldType)

switch fieldType {
case protocol.FieldTypeBoolean, protocol.FieldTypeString, protocol.FieldTypeNumber:
tree := i.Indexes[i.findIndex(name, value)].IndexRecords

tree[value] = append(tree[value], protocol.IndexRecord{
DataNumber: dataIndex,
FieldStartByteOffset: uint64(fieldOffset),
FieldLength: int(fieldLength),
})

case protocol.FieldTypeNull:
for j := range i.Indexes {
if i.Indexes[j].FieldName == name {
i.Indexes[j].FieldType |= protocol.FieldTypeNull
}
}

default:
return fmt.Errorf("unexpected type '%T'", value)
}

cumulativeLength += fieldLength
}

fmt.Printf("\n")

return nil
}
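Wiring the handler up mirrors the JSONL path in cmd/main.go. A minimal caller might look like the following sketch; the data.csv name is illustrative, and it assumes NewIndexFile drives the handler's Synchronize, as the timing code in main.go suggests:

package main

import (
	"fmt"
	"os"

	"github.com/kevmo314/appendable/pkg/appendable"
)

func main() {
	file, err := os.Open("data.csv")
	if err != nil {
		panic(err)
	}
	defer file.Close()

	// CSVHandler embeds the io.ReadSeeker directly, as in cmd/main.go.
	indexFile, err := appendable.NewIndexFile(appendable.CSVHandler{ReadSeeker: file})
	if err != nil {
		panic(err)
	}
	fmt.Printf("indexed %d lines (including the header)\n", len(indexFile.EndByteOffsets))
}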
95 changes: 0 additions & 95 deletions pkg/appendable/index_file.go
@@ -1,10 +1,7 @@
package appendable

import (
"encoding/json"
"fmt"
"io"
"strings"

"github.com/kevmo314/appendable/pkg/protocol"
)
@@ -73,95 +70,3 @@ func (i *IndexFile) findIndex(name string, value any) int {
return match

}

func (i *IndexFile) handleJSONLObject(dec *json.Decoder, path []string, dataIndex, dataOffset uint64) error {
// while the next token is not }, read the key
for dec.More() {
key, err := dec.Token()
if err != nil {
return fmt.Errorf("failed to read token at index %d: %w", dataIndex, err)
}

// key must be a string
if key, ok := key.(string); !ok {
return fmt.Errorf("expected string key, got '%v'", key)
} else {
fieldOffset := dec.InputOffset() + 1 // skip the :

value, err := dec.Token()
if err != nil {
return fmt.Errorf("failed to read token: %w", err)
}

name := strings.Join(append(path, key), ".")

switch value := value.(type) {
case string, int, int8, int16, int32, int64, float32, float64, bool:
tree := i.Indexes[i.findIndex(name, value)].IndexRecords
// append this record to the list of records for this value
tree[value] = append(tree[value], protocol.IndexRecord{
DataNumber: dataIndex,
FieldStartByteOffset: dataOffset + uint64(fieldOffset),
FieldLength: int(dec.InputOffset() - fieldOffset),
})

case json.Token:
switch value {
case json.Delim('['):
for j := range i.Indexes {
if i.Indexes[j].FieldName == name {
i.Indexes[j].FieldType |= protocol.FieldTypeArray
}
}
// arrays are not indexed yet because we need to incorporate
// subindexing into the specification. however, we have to
// skip tokens until we reach the end of the array.
depth := 1
for {
t, err := dec.Token()
if err != nil {
return fmt.Errorf("failed to read token: %w", err)
}

switch t {
case json.Delim('['):
depth++
case json.Delim(']'):
depth--
}

if depth == 0 {
break
}
}
case json.Delim('{'):
// find the index to set the field type to unknown.
for j := range i.Indexes {
if i.Indexes[j].FieldName == name {
i.Indexes[j].FieldType |= protocol.FieldTypeObject
}
}
if err := i.handleJSONLObject(dec, append(path, key), dataIndex, dataOffset); err != nil {
return fmt.Errorf("failed to handle object: %w", err)
}
// read the }
if t, err := dec.Token(); err != nil || t != json.Delim('}') {
return fmt.Errorf("expected '}', got '%v'", t)
}
default:
return fmt.Errorf("unexpected token '%v'", value)
}
case nil:
// set the field to nullable if it's not already
for j := range i.Indexes {
if i.Indexes[j].FieldName == name {
i.Indexes[j].FieldType |= protocol.FieldTypeNull
}
}
default:
return fmt.Errorf("unexpected type '%T'", value)
}
}
}
return nil
}