Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSV demo + support null fields #67

Merged
merged 5 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/example-client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ jobs:
# Fetch the data in workspace
cd examples/workspace
python3 -m pip install -r requirements.txt
python3 fetch_jsonl.py
python3 fetch_csv.py
cd -

# Build the index
go run cmd/main.go -jsonl examples/workspace/green_tripdata_2023-01.jsonl
go run cmd/main.go -csv examples/workspace/green_tripdata_2023-01.csv

# Copy to client
cp examples/workspace/green_tripdata_2023-01.jsonl examples/client
cp examples/workspace/green_tripdata_2023-01.jsonl.index examples/client
cp examples/workspace/green_tripdata_2023-01.csv examples/client
cp examples/workspace/green_tripdata_2023-01.csv.index examples/client

# Build the js lib
npm ci
Expand Down
12 changes: 11 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ python3 fetch_jsonl.py
Then run the indexing process:

```sh
npm run build-index
# for jsonl:
npm run build-index-jsonl

# or for csv:
npm run build-index-csv
```

Copy the `.jsonl` and index file to `/client`
Expand All @@ -35,6 +39,12 @@ cp green_tripdata_2023-01.jsonl ../client
cp green_tripdata_2023-01.jsonl.index ../client
```

or for csv:
```sh
cp green_tripdata_2023-01.csv ../client
cp green_tripdata_2023-01.csv.index ../client
```

Build the AppendableDB client library:

```sh
Expand Down
6 changes: 3 additions & 3 deletions examples/client/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
<script src="appendable.min.js"></script>
<script>
Appendable.init(
"green_tripdata_2023-01.jsonl",
"green_tripdata_2023-01.jsonl.index",
Appendable.FormatType.Jsonl
"green_tripdata_2023-01.csv",
"green_tripdata_2023-01.csv.index",
Appendable.FormatType.Csv
).then(async (db) => {
let dbFields = new Set();
let fieldTypes = {};
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
"main": "index.js",
"scripts": {
"build": "esbuild src/index.ts --bundle --minify --sourcemap --outfile=dist/appendable.min.js",
"build-index": "go run cmd/main.go -jsonl examples/workspace/green_tripdata_2023-01.jsonl",
"build-index-jsonl": "go run cmd/main.go -jsonl examples/workspace/green_tripdata_2023-01.jsonl",
"build-index-csv": "go run cmd/main.go -csv examples/workspace/green_tripdata_2023-01.csv",
"reset": "rm -rf dist examples/client/appendable.min.js examples/client/appendable.min.js.map",
"example": "cd examples/client && go run server.go",
"test": "jest"
},
Expand Down
20 changes: 16 additions & 4 deletions pkg/appendable/csv_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (c CSVHandler) Synchronize(f *IndexFile) error {
f.handleCSVLine(dec, headers, []string{}, uint64(existingCount), start)
}

slog.Info("Succesfully processed", "line", i)
slog.Debug("Succesfully processed", "line", i)
}

if fromNewIndexFile && len(f.EndByteOffsets) > 0 {
Expand Down Expand Up @@ -137,6 +137,7 @@ func (i *IndexFile) handleCSVLine(dec *csv.Reader, headers []string, path []stri
}

fieldName := headers[fieldIndex]

name := strings.Join(append(path, fieldName), ".")

fieldOffset := dataOffset + cumulativeLength
Expand All @@ -146,26 +147,37 @@ func (i *IndexFile) handleCSVLine(dec *csv.Reader, headers []string, path []stri

switch fieldType {
case protocol.FieldTypeBoolean, protocol.FieldTypeString, protocol.FieldTypeNumber:

tree := i.Indexes[i.findIndex(name, value)].IndexRecords

tree[value] = append(tree[value], protocol.IndexRecord{
DataNumber: dataIndex,
FieldStartByteOffset: uint64(fieldOffset),
FieldLength: int(fieldLength),
})

slog.Debug("Appended index record",
slog.String("field", name),
slog.Any("value", value),
slog.Int("start", int(fieldOffset)))

case protocol.FieldTypeNull:

found := false
for j := range i.Indexes {
if i.Indexes[j].FieldName == name {
i.Indexes[j].FieldType |= protocol.FieldTypeNull

found = true
}
}

friendlymatthew marked this conversation as resolved.
Show resolved Hide resolved
if !found {
tree := i.Indexes[i.findIndex(name, value)].IndexRecords
tree[value] = append(tree[value], protocol.IndexRecord{
DataNumber: dataIndex,
FieldStartByteOffset: uint64(fieldOffset),
FieldLength: int(fieldLength),
})
}

slog.Debug("Marked field", "name", name)

default:
Expand Down
4 changes: 4 additions & 0 deletions pkg/appendable/index_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func fieldType(data any) protocol.FieldType {
return protocol.FieldTypeBoolean
case []any:
return protocol.FieldTypeArray
case nil:
return protocol.FieldTypeNull
default:
return protocol.FieldTypeObject
}
Expand All @@ -53,8 +55,10 @@ func (i *IndexFile) findIndex(name string, value any) int {
break
}
}

// if the index doesn't exist, create it
ft := fieldType(value)

if match == -1 {
index := Index{
FieldName: name,
Expand Down
24 changes: 24 additions & 0 deletions pkg/appendable/index_file_csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,30 @@ func TestAppendDataRowCSV(t *testing.T) {

})

t.Run("record null columns", func(t *testing.T) {

i, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader("null1,null2\n,\n,\n,\n,\n")})
if err != nil {
t.Fatal(err)
}

buf := &bytes.Buffer{}

if err := i.Serialize(buf); err != nil {
t.Fatal(err)
}

fmt.Printf("index file looks like: %v", i.Indexes)

if len(i.Indexes) != 2 {
t.Errorf("got len(i.Indexes) = %d, want 2", len(i.Indexes))
}

if i.Indexes[0].FieldType != protocol.FieldTypeNull {
t.Errorf("got %d, wanted protocol.FieldTypeNull", i.Indexes[0].FieldType)
}
})

t.Run("multiple headers", func(t *testing.T) {

i, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader("name,move\nmica,coyote\n")})
Expand Down
25 changes: 25 additions & 0 deletions pkg/appendable/index_file_jsonl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package appendable

import (
"bytes"
"fmt"
"strings"
"testing"

Expand Down Expand Up @@ -304,4 +305,28 @@ func TestAppendDataRowJSONL(t *testing.T) {
t.Errorf("got i.Indexes[0].FieldType = %#v, want protocol.FieldTypeNullableString", j.Indexes[0].FieldType)
}
})

t.Run("record null columns", func(t *testing.T) {

i, err := NewIndexFile(JSONLHandler{ReadSeeker: strings.NewReader("{\"test\":null}\n{\"test\":null}")})
if err != nil {
t.Fatal(err)
}

buf := &bytes.Buffer{}

if err := i.Serialize(buf); err != nil {
t.Fatal(err)
}

fmt.Printf("index file looks like: %v", i.Indexes)

if len(i.Indexes) != 1 {
t.Errorf("got len(i.Indexes) = %d, want 1", len(i.Indexes))
}

if i.Indexes[0].FieldType != protocol.FieldTypeNull {
t.Errorf("got %d, wanted protocol.FieldTypeNull", i.Indexes[0].FieldType)
}
})
}
15 changes: 15 additions & 0 deletions pkg/appendable/jsonl_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,27 @@ func (i *IndexFile) handleJSONLObject(dec *json.Decoder, path []string, dataInde
return fmt.Errorf("unexpected token '%v'", value)
}
case nil:

found := false

// set the field to nullable if it's not already
for j := range i.Indexes {
if i.Indexes[j].FieldName == name {
i.Indexes[j].FieldType |= protocol.FieldTypeNull
found = true
}
}

if !found {
tree := i.Indexes[i.findIndex(name, value)].IndexRecords
// append this record to the list of records for this value
tree[value] = append(tree[value], protocol.IndexRecord{
DataNumber: dataIndex,
FieldStartByteOffset: dataOffset + uint64(fieldOffset),
FieldLength: int(dec.InputOffset() - fieldOffset),
})
}

default:
return fmt.Errorf("unexpected type '%T'", value)
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,10 @@ func InferCSVField(fieldValue string) (interface{}, FieldType) {
}

if i, err := strconv.Atoi(fieldValue); err == nil {

fmt.Printf("\n%v is a integer\n", fieldValue)
return float64(i), FieldTypeNumber
}

if f, err := strconv.ParseFloat(fieldValue, 64); err == nil {

fmt.Printf("\n%v is a float\n", fieldValue)
return float64(f), FieldTypeNumber
}

Expand Down
72 changes: 40 additions & 32 deletions src/database.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { FormatType } from ".";
import { DataFile } from "./data-file";
import { VersionedIndexFile } from "./index-file";
import { IndexFile, VersionedIndexFile } from "./index-file";

type Schema = {
[key: string]: {};
Expand Down Expand Up @@ -37,14 +37,16 @@ export function containsType(fieldType: bigint, desiredType: FieldType) {
return (fieldType & BigInt(desiredType)) !== BigInt(0);
}

function parseIgnoringSuffix(x: string, format: FormatType) {
function parseIgnoringSuffix(
x: string,
format: FormatType,
headerFields: string[]
) {
switch (format) {
case FormatType.Jsonl:
try {
console.log("parsing no error", JSON.parse(x));
return JSON.parse(x);
} catch (error) {
console.log("registered as an error");
const e = error as Error;
let m = e.message.match(/position\s+(\d+)/);
if (m) {
Expand All @@ -56,33 +58,34 @@ function parseIgnoringSuffix(x: string, format: FormatType) {
return JSON.parse(x);

case FormatType.Csv:
try {
console.log("parsing no error", parseCsvLine(x));
return parseCsvLine(x);
} catch (error) {
console.log("registered as an error");
let lastCompleteLine = findLastCompleteCsvLine(x);
console.log(lastCompleteLine);
return parseCsvLine(lastCompleteLine);
}
}
}
const fields = x.split(",");

export function parseCsvLine(line: string) {
console.log("parsing csv: ");
let fields: string[] = line.split(",");
if (fields.length === 2) {
friendlymatthew marked this conversation as resolved.
Show resolved Hide resolved
x = fields[0];
return JSON.parse(x);
} else {
const newlinePos = x.indexOf("\n");
const result = newlinePos !== -1 ? x.substring(0, newlinePos) : x;
const csvFields = result.split(",");

fields.forEach((field) => {
if (field.length > 0) {
console.log("parsing: ", field);
return JSON.parse(field);
}
});
// assert lengths are equal
if (csvFields.length === headerFields.length) {
return buildJsonFromCsv(csvFields, headerFields);
} else {
return result;
}
}
}
}

function findLastCompleteCsvLine(data: string) {
let lastNewlineIndex = data.lastIndexOf("\n");
return lastNewlineIndex >= 0 ? data.slice(0, lastNewlineIndex) : data;
function buildJsonFromCsv(csvFields: string[], headerFields: string[]) {
return headerFields.reduce<{ [key: string]: string }>(
(acc, header, index) => {
acc[header] = csvFields[index];
return acc;
},
{}
);
}

function fieldRank(token: any) {
Expand Down Expand Up @@ -147,6 +150,7 @@ export class Database<T extends Schema> {
}
// convert each of the where nodes into a range of field values.
const headers = await this.indexFile.indexHeaders();
const headerFields = headers.map((header) => header.fieldName);
const fieldRanges = await Promise.all(
(query.where ?? []).map(async ({ key, value, operation }) => {
const header = headers.find((header) => header.fieldName === key);
Expand All @@ -165,8 +169,11 @@ export class Database<T extends Schema> {
indexRecord.fieldStartByteOffset,
indexRecord.fieldStartByteOffset + indexRecord.fieldLength
);
console.log("data looks like: ", data);
const dataFieldValue = parseIgnoringSuffix(data, this.formatType);
const dataFieldValue = parseIgnoringSuffix(
data,
this.formatType,
headerFields
);
console.log(mid, dataFieldValue);
if (cmp(value, dataFieldValue) < 0) {
end = mid;
Expand All @@ -191,7 +198,8 @@ export class Database<T extends Schema> {
indexRecord.fieldStartByteOffset,
indexRecord.fieldStartByteOffset + indexRecord.fieldLength
),
this.formatType
this.formatType,
headerFields
);
if (cmp(value, dataFieldValue) < 0) {
end = mid;
Expand Down Expand Up @@ -237,7 +245,6 @@ export class Database<T extends Schema> {
}
}

console.log("Field ranges: ", fieldRanges);
// evaluate the field ranges in order.
for (const [key, [start, end]] of fieldRangesSorted) {
// check if the iteration order should be reversed.
Expand All @@ -257,7 +264,8 @@ export class Database<T extends Schema> {
dataRecord.startByteOffset,
dataRecord.endByteOffset
),
this.formatType
this.formatType,
headerFields
);

let dataFieldValue = parsedFieldValue;
Expand Down
Loading
Loading