Merge branch 'main' into paging
kevmo314 authored Jan 20, 2024
2 parents f3336ae + a4f4e83 commit d739aee
Showing 18 changed files with 1,571 additions and 436 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/example-client.yml
@@ -33,7 +33,7 @@ jobs:
           cd -
           # Build the index
-          go run cmd/main.go examples/workspace/green_tripdata_2023-01.jsonl
+          go run cmd/main.go -jsonl examples/workspace/green_tripdata_2023-01.jsonl
           # Copy to client
           cp examples/workspace/green_tripdata_2023-01.jsonl examples/client
24 changes: 22 additions & 2 deletions cmd/main.go
@@ -12,6 +12,13 @@ import (
 )
 
 func main() {
+
+    var jsonlFlag bool
+    var csvFlag bool
+
+    flag.BoolVar(&jsonlFlag, "jsonl", false, "Use JSONL handler")
+    flag.BoolVar(&csvFlag, "csv", false, "Use CSV handler")
+
     var showTimings bool
     flag.BoolVar(&showTimings, "t", false, "Show time-related metrics")
 
@@ -40,11 +47,24 @@ func main() {
         panic(err)
     }
 
-    if showTimings {
+
+    var dataHandler appendable.DataHandler
+
+    switch {
+    case jsonlFlag:
+        dataHandler = appendable.JSONLHandler{ReadSeeker: file}
+    case csvFlag:
+        dataHandler = appendable.CSVHandler{ReadSeeker: file}
+    default:
+        fmt.Println("Please specify the file type with -jsonl or -csv.")
+        os.Exit(1)
+    }
+    if showTimings {
         readStart = time.Now()
     }
     // Open the index file
-    indexFile, err := appendable.NewIndexFile(appendable.JSONLHandler{ReadSeeker: file})
+    indexFile, err := appendable.NewIndexFile(dataHandler)
+
 
     if showTimings {
         readDuration := time.Since(readStart)
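This commit never shows the definition of appendable.DataHandler, only its use. Judging from the assignments above and the method set in pkg/appendable/csv_handler.go below, the interface plausibly looks like the following sketch; it is an inference from usage, not code from this commit:

// Inferred contract that JSONLHandler and CSVHandler both satisfy: each
// embeds the raw file's io.ReadSeeker and knows how to synchronize an
// IndexFile against its own data format.
type DataHandler interface {
    io.ReadSeeker
    Synchronize(f *IndexFile) error
}

Because both handlers carry their own ReadSeeker, cmd/main.go can hand either one to NewIndexFile unchanged.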
9 changes: 7 additions & 2 deletions examples/workspace/fetch_data.py
@@ -1,8 +1,13 @@
 # Data taken from https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
 
+import io
+
 import pandas as pd
 import requests
-import io
 
 response = requests.get('https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet')
-pd.read_parquet(io.BytesIO(response.content)).to_json('green_tripdata_2023-01.jsonl', orient='records', lines=True)
+
+pd.read_parquet(io.BytesIO(response.content)).to_json('green_tripdata_2023-01.jsonl', orient='records', lines=True)
+
+df = pd.read_parquet(io.BytesIO(response.content))
+df.to_csv('green_tripdata_2023-01.csv', index=False)
16 changes: 16 additions & 0 deletions package-lock.json

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions package.json
@@ -25,6 +25,7 @@
   "devDependencies": {
     "@types/jest": "^29.5.11",
     "http-server": "^14.1.1",
+    "prettier": "^3.2.1",
     "ts-jest": "^29.1.1",
     "ts-node": "^10.9.2"
   }
161 changes: 161 additions & 0 deletions pkg/appendable/csv_handler.go
@@ -0,0 +1,161 @@
package appendable

import (
    "bufio"
    "bytes"
    "encoding/csv"
    "fmt"
    "io"
    "strconv"
    "strings"

    "github.com/cespare/xxhash/v2"
    "github.com/kevmo314/appendable/pkg/protocol"
)

type CSVHandler struct {
    io.ReadSeeker
}

func (c CSVHandler) Synchronize(f *IndexFile) error {
    var headers []string
    var err error

    fromNewIndexFile := false

    isHeader := false

    if len(f.Indexes) == 0 {
        isHeader = true
        fromNewIndexFile = true
    } else {
        for _, index := range f.Indexes {
            headers = append(headers, index.FieldName)
        }
    }

    scanner := bufio.NewScanner(f.data)

    for i := 0; scanner.Scan(); i++ {
        line := scanner.Bytes()

        existingCount := len(f.EndByteOffsets)

        // append a data range; the +1 accounts for the newline the scanner strips
        var start uint64
        if len(f.EndByteOffsets) > 0 {
            start = f.EndByteOffsets[existingCount-1]
        }

        f.EndByteOffsets = append(f.EndByteOffsets, start+uint64(len(line))+1)

        f.Checksums = append(f.Checksums, xxhash.Sum64(line))

        if isHeader {
            dec := csv.NewReader(bytes.NewReader(line))
            headers, err = dec.Read()
            if err != nil {
                return fmt.Errorf("failed to parse CSV header: %w", err)
            }
            isHeader = false
            continue
        }

        dec := csv.NewReader(bytes.NewReader(line))
        if err := f.handleCSVLine(dec, headers, []string{}, uint64(existingCount)-1, start); err != nil {
            return fmt.Errorf("failed to handle CSV line %d: %w", i, err)
        }
    }

    // the header row produced offset and checksum entries of its own;
    // drop them so only data rows remain.
    if fromNewIndexFile && len(f.EndByteOffsets) > 0 {
        f.EndByteOffsets = f.EndByteOffsets[1:]
        f.Checksums = f.Checksums[1:]
    }

    return nil
}

// fieldRankCsvField assigns a rank to a value by its dynamic type:
// nil < bool < number < string.
func fieldRankCsvField(fieldValue any) int {
    switch fieldValue.(type) {
    case nil:
        return 1
    case bool:
        return 2
    case int, int8, int16, int32, int64, float32, float64:
        return 3
    case string:
        return 4
    default:
        panic("unknown type")
    }
}

// inferCSVField infers a typed value for a raw CSV field.
// Empty fields map to null.
func inferCSVField(fieldValue string) (interface{}, protocol.FieldType) {
    if fieldValue == "" {
        return nil, protocol.FieldTypeNull
    }

    if i, err := strconv.Atoi(fieldValue); err == nil {
        return i, protocol.FieldTypeNumber
    }

    if f, err := strconv.ParseFloat(fieldValue, 64); err == nil {
        return f, protocol.FieldTypeNumber
    }

    if b, err := strconv.ParseBool(fieldValue); err == nil {
        return b, protocol.FieldTypeBoolean
    }

    return fieldValue, protocol.FieldTypeString
}

func (i *IndexFile) handleCSVLine(dec *csv.Reader, headers []string, path []string, dataIndex, dataOffset uint64) error {
    record, err := dec.Read()
    if err != nil {
        return fmt.Errorf("failed to read CSV record at index %d: %w", dataIndex, err)
    }

    cumulativeLength := uint64(0)

    for fieldIndex, fieldValue := range record {
        if fieldIndex >= len(headers) {
            return fmt.Errorf("field index %d is out of bounds with header", fieldIndex)
        }

        fieldName := headers[fieldIndex]
        name := strings.Join(append(path, fieldName), ".")

        fieldOffset := dataOffset + cumulativeLength
        fieldLength := uint64(len(fieldValue))

        value, fieldType := inferCSVField(fieldValue)

        switch fieldType {
        case protocol.FieldTypeBoolean, protocol.FieldTypeString, protocol.FieldTypeNumber:
            tree := i.Indexes[i.findIndex(name, value)].IndexRecords

            tree[value] = append(tree[value], protocol.IndexRecord{
                DataNumber:           dataIndex,
                FieldStartByteOffset: fieldOffset,
                FieldLength:          int(fieldLength),
            })

        case protocol.FieldTypeNull:
            for j := range i.Indexes {
                if i.Indexes[j].FieldName == name {
                    i.Indexes[j].FieldType |= protocol.FieldTypeNull
                }
            }

        default:
            return fmt.Errorf("unexpected type '%T'", value)
        }

        cumulativeLength += fieldLength
    }

    return nil
}
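One detail worth calling out in inferCSVField: the checks run strconv.Atoi, then ParseFloat, then ParseBool, so an ambiguous token like "1" is indexed as a number even though ParseBool would also accept it. A small sketch of the resulting behavior; this is not part of the commit and assumes it sits in package appendable with fmt imported:

func ExampleInferCSVField() {
    for _, s := range []string{"", "42", "3.14", "true", "1", "taxi"} {
        v, t := inferCSVField(s)
        fmt.Printf("%q -> %v (%v)\n", s, v, t)
    }
    // Expected classifications, given the order of checks above:
    //   ""     -> nil   (FieldTypeNull)
    //   "42"   -> 42    (FieldTypeNumber)
    //   "3.14" -> 3.14  (FieldTypeNumber)
    //   "true" -> true  (FieldTypeBoolean)
    //   "1"    -> 1     (FieldTypeNumber, not FieldTypeBoolean)
    //   "taxi" -> taxi  (FieldTypeString)
}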
95 changes: 0 additions & 95 deletions pkg/appendable/index_file.go
@@ -1,10 +1,7 @@
 package appendable
 
 import (
-    "encoding/json"
-    "fmt"
     "io"
-    "strings"
 
     "github.com/kevmo314/appendable/pkg/protocol"
 )
@@ -73,95 +70,3 @@ func (i *IndexFile) findIndex(name string, value any) int {
     return match
 
 }
-
-func (i *IndexFile) handleJSONLObject(dec *json.Decoder, path []string, dataIndex, dataOffset uint64) error {
-    // while the next token is not }, read the key
-    for dec.More() {
-        key, err := dec.Token()
-        if err != nil {
-            return fmt.Errorf("failed to read token at index %d: %w", dataIndex, err)
-        }
-
-        // key must be a string
-        if key, ok := key.(string); !ok {
-            return fmt.Errorf("expected string key, got '%v'", key)
-        } else {
-            fieldOffset := dec.InputOffset() + 1 // skip the :
-
-            value, err := dec.Token()
-            if err != nil {
-                return fmt.Errorf("failed to read token: %w", err)
-            }
-
-            name := strings.Join(append(path, key), ".")
-
-            switch value := value.(type) {
-            case string, int, int8, int16, int32, int64, float32, float64, bool:
-                tree := i.Indexes[i.findIndex(name, value)].IndexRecords
-                // append this record to the list of records for this value
-                tree[value] = append(tree[value], protocol.IndexRecord{
-                    DataNumber:           dataIndex,
-                    FieldStartByteOffset: dataOffset + uint64(fieldOffset),
-                    FieldLength:          int(dec.InputOffset() - fieldOffset),
-                })
-
-            case json.Token:
-                switch value {
-                case json.Delim('['):
-                    for j := range i.Indexes {
-                        if i.Indexes[j].FieldName == name {
-                            i.Indexes[j].FieldType |= protocol.FieldTypeArray
-                        }
-                    }
-                    // arrays are not indexed yet because we need to incorporate
-                    // subindexing into the specification. however, we have to
-                    // skip tokens until we reach the end of the array.
-                    depth := 1
-                    for {
-                        t, err := dec.Token()
-                        if err != nil {
-                            return fmt.Errorf("failed to read token: %w", err)
-                        }
-
-                        switch t {
-                        case json.Delim('['):
-                            depth++
-                        case json.Delim(']'):
-                            depth--
-                        }
-
-                        if depth == 0 {
-                            break
-                        }
-                    }
-                case json.Delim('{'):
-                    // find the index to set the field type to unknown.
-                    for j := range i.Indexes {
-                        if i.Indexes[j].FieldName == name {
-                            i.Indexes[j].FieldType |= protocol.FieldTypeObject
-                        }
-                    }
-                    if err := i.handleJSONLObject(dec, append(path, key), dataIndex, dataOffset); err != nil {
-                        return fmt.Errorf("failed to handle object: %w", err)
-                    }
-                    // read the }
-                    if t, err := dec.Token(); err != nil || t != json.Delim('}') {
-                        return fmt.Errorf("expected '}', got '%v'", t)
-                    }
-                default:
-                    return fmt.Errorf("unexpected token '%v'", value)
-                }
-            case nil:
-                // set the field to nullable if it's not already
-                for j := range i.Indexes {
-                    if i.Indexes[j].FieldName == name {
-                        i.Indexes[j].FieldType |= protocol.FieldTypeNull
-                    }
-                }
-            default:
-                return fmt.Errorf("unexpected type '%T'", value)
-            }
-        }
-    }
-    return nil
-}
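With handleJSONLObject moved out, index_file.go no longer knows anything about the data format; parsing lives entirely behind the handler passed to NewIndexFile. A minimal consumer pieced together from the calls visible in this diff; the file name is assumed from fetch_data.py above, and index serialization is outside this commit:

package main

import (
    "os"

    "github.com/kevmo314/appendable/pkg/appendable"
)

func main() {
    // Open the raw data file produced by fetch_data.py.
    f, err := os.Open("green_tripdata_2023-01.csv")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    // CSVHandler embeds the io.ReadSeeker, so the open file is all it needs;
    // swapping in appendable.JSONLHandler{ReadSeeker: f} indexes JSONL instead.
    indexFile, err := appendable.NewIndexFile(appendable.CSVHandler{ReadSeeker: f})
    if err != nil {
        panic(err)
    }
    _ = indexFile
}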