Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add 2.1 read path #2968

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ rand = { version = "0.8.3", features = ["small_rng"] }
rangemap = { version = "1.0" }
rayon = "1.10"
roaring = "0.10.1"
rstest = "0.19.0"
rustc_version = "0.4"
serde = { version = "^1" }
serde_json = { version = "1" }
Expand Down
5 changes: 5 additions & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,13 @@ message MiniBlockLayout {
ArrayEncoding value_compression = 3;
}

// Layout for a page that carries no encoded value buffers.
// NOTE(review): presumably marks a page whose values are all null,
// per the message name — confirm against the 2.1 writer.
message AllNullLayout {

}

// Selects which layout was used to encode a page.
message PageLayout {
// Exactly one layout variant is set per page.
oneof layout {
MiniBlockLayout mini_block_layout = 1;
AllNullLayout all_null_layout = 2;
}
}
222 changes: 222 additions & 0 deletions python/python/benchmarks/test_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors
from pathlib import Path

import pyarrow as pa
import pytest
from lance.file import LanceFileReader, LanceFileWriter
from lance.tracing import trace_to_chrome

# Emit a Chrome-tracing log of the benchmark run; load /tmp/trace.json in
# chrome://tracing (or Perfetto) to inspect it.
trace_to_chrome(level="debug", file="/tmp/trace.json")

# Total rows written to each benchmark file.
NUM_ROWS = 10_000_000
# Number of evenly spaced rows fetched by the take_rows benchmarks.
ROWS_TO_SAMPLE = 10


@pytest.mark.parametrize(
    "version",
    ["2.0", "2.1"],
    ids=["2_0", "2_1"],
)
@pytest.mark.benchmark(group="scan_single_column")
def test_scan_integer(tmp_path: Path, benchmark, version):
    """Benchmark a full scan of a single uint64 column at the given file version."""
    schema = pa.schema([pa.field("values", pa.uint64(), True)])
    path = str(tmp_path / "file.lance")

    def batches():
        # Stream sequential values [0, NUM_ROWS) in 10k-row record batches.
        offset = 0
        while offset < NUM_ROWS:
            count = min(NUM_ROWS - offset, 10000)
            chunk = pa.table({"values": pa.array(range(offset, offset + count))})
            yield chunk.to_batches()[0]
            offset += count

    with LanceFileWriter(path, schema, version=version) as writer:
        for record_batch in batches():
            writer.write_batch(record_batch)

    def read_all():
        # Open the reader inside the timed region, matching the write path above.
        return LanceFileReader(path).read_all(batch_size=16 * 1024).to_table()

    table = benchmark.pedantic(read_all, rounds=1, iterations=1)

    assert table.num_rows == NUM_ROWS


@pytest.mark.parametrize(
    "version",
    ["2.0", "2.1"],
    ids=["2_0", "2_1"],
)
@pytest.mark.benchmark(group="scan_single_column")
def test_scan_nullable_integer(tmp_path: Path, benchmark, version):
    """Benchmark a full scan of a uint64 column where every other value is null."""
    schema = pa.schema([pa.field("values", pa.uint64(), True)])
    path = str(tmp_path / "file.lance")

    def batches():
        # Stream 10k-row batches; even indices are null, odd indices keep i.
        offset = 0
        while offset < NUM_ROWS:
            count = min(NUM_ROWS - offset, 10000)
            vals = pa.array(
                [i if i % 2 else None for i in range(offset, offset + count)]
            )
            yield pa.table({"values": vals}).to_batches()[0]
            offset += count

    with LanceFileWriter(path, schema, version=version) as writer:
        for record_batch in batches():
            writer.write_batch(record_batch)

    def read_all():
        return LanceFileReader(path).read_all(batch_size=16 * 1024).to_table()

    table = benchmark.pedantic(read_all, rounds=1, iterations=1)

    assert table.num_rows == NUM_ROWS


@pytest.mark.benchmark(group="scan_single_column")
def test_scan_nested_integer(tmp_path: Path, benchmark):
    """Benchmark a full scan of a doubly nested struct column (2.1 only).

    Values cycle through four nullability shapes: null row, null outer
    struct, null inner value, and a populated inner value.
    """

    def make_value(i: int):
        # Dispatch on i % 4 to cover every nesting/null combination.
        shape = i % 4
        if shape == 0:
            return None
        if shape == 1:
            return {"outer": None}
        if shape == 2:
            return {"outer": {"inner": None}}
        return {"outer": {"inner": i}}

    inner_field = pa.field("inner", pa.uint64(), True)
    outer_field = pa.field("outer", pa.struct([inner_field]), True)
    dtype = pa.struct([outer_field])
    schema = pa.schema([pa.field("values", dtype, True)])
    path = str(tmp_path / "file.lance")

    def batches():
        offset = 0
        while offset < NUM_ROWS:
            count = min(NUM_ROWS - offset, 10000)
            vals = pa.array([make_value(i) for i in range(offset, offset + count)])
            yield pa.table({"values": vals}).to_batches()[0]
            offset += count

    with LanceFileWriter(path, schema, version="2.1") as writer:
        for record_batch in batches():
            writer.write_batch(record_batch)

    def read_all():
        return LanceFileReader(path).read_all(batch_size=16 * 1024).to_table()

    table = benchmark.pedantic(read_all, rounds=1, iterations=1)

    assert table.num_rows == NUM_ROWS


@pytest.mark.parametrize(
    "version",
    ["2.0", "2.1"],
    ids=["2_0", "2_1"],
)
@pytest.mark.benchmark(group="sample_single_column")
def test_sample_integer(tmp_path: Path, benchmark, version):
    """Benchmark random-access reads (take_rows) of a uint64 column.

    Writes NUM_ROWS sequential values, then repeatedly fetches
    ROWS_TO_SAMPLE evenly spaced rows and measures the take path.
    """
    schema = pa.schema([pa.field("values", pa.uint64(), True)])

    def gen_data():
        # Stream the data in 10k-row batches to keep peak memory low.
        remaining = NUM_ROWS
        offset = 0
        while remaining > 0:
            to_take = min(remaining, 10000)
            values = pa.array(range(offset, offset + to_take))
            yield pa.table({"values": values}).to_batches()[0]
            remaining -= to_take
            offset += to_take

    with LanceFileWriter(
        str(tmp_path / "file.lance"), schema, version=version
    ) as writer:
        for batch in gen_data():
            writer.write_batch(batch)

    reader = LanceFileReader(str(tmp_path / "file.lance"))
    # ROWS_TO_SAMPLE evenly spaced row indices spanning the whole file.
    indices = list(range(0, NUM_ROWS, NUM_ROWS // ROWS_TO_SAMPLE))

    def sample():
        return reader.take_rows(indices).to_table()

    result = benchmark.pedantic(sample, rounds=30, iterations=1)

    # take_rows returns only the requested rows, not the whole file
    # (the original assertion against NUM_ROWS could never hold).
    assert result.num_rows == len(indices)


@pytest.mark.benchmark(group="sample_single_column")
def test_sample_nested_integer(tmp_path: Path, benchmark):
    """Benchmark random-access reads (take_rows) of a nested struct column.

    Writes NUM_ROWS values cycling through four nullability shapes, then
    repeatedly fetches ROWS_TO_SAMPLE evenly spaced rows (2.1 only).
    """

    def get_val(i: int):
        # Cycle through: null row, null outer, null inner, populated inner.
        if i % 4 == 0:
            return None
        elif i % 4 == 1:
            return {"outer": None}
        elif i % 4 == 2:
            return {"outer": {"inner": None}}
        else:
            return {"outer": {"inner": i}}

    dtype = pa.struct(
        [pa.field("outer", pa.struct([pa.field("inner", pa.uint64(), True)]), True)]
    )
    schema = pa.schema(
        [
            pa.field(
                "values",
                dtype,
                True,
            )
        ]
    )

    def gen_data():
        # Stream the data in 10k-row batches to keep peak memory low.
        remaining = NUM_ROWS
        offset = 0
        while remaining > 0:
            to_take = min(remaining, 10000)
            values = pa.array([get_val(i) for i in range(offset, offset + to_take)])
            yield pa.table({"values": values}).to_batches()[0]
            remaining -= to_take
            offset += to_take

    with LanceFileWriter(str(tmp_path / "file.lance"), schema, version="2.1") as writer:
        for batch in gen_data():
            writer.write_batch(batch)

    reader = LanceFileReader(str(tmp_path / "file.lance"))
    # ROWS_TO_SAMPLE evenly spaced row indices spanning the whole file.
    indices = list(range(0, NUM_ROWS, NUM_ROWS // ROWS_TO_SAMPLE))

    def sample():
        return reader.take_rows(indices).to_table()

    result = benchmark.pedantic(sample, rounds=30, iterations=1)

    # take_rows returns only the requested rows, not the whole file
    # (the original assertion against NUM_ROWS could never hold).
    assert result.num_rows == len(indices)
16 changes: 7 additions & 9 deletions python/python/tests/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,13 @@ def test_version(tmp_path):
assert metadata.major_version == 0
assert metadata.minor_version == 3

# TODO: Temporarily disabled until read path for 2.1 is added
#
# path = tmp_path / "foo2.lance"
# with LanceFileWriter(str(path), schema, version="2.1") as writer:
# writer.write_batch(pa.table({"a": [1, 2, 3]}))
# reader = LanceFileReader(str(path))
# metadata = reader.metadata()
# assert metadata.major_version == 2
# assert metadata.minor_version == 1
path = tmp_path / "foo2.lance"
with LanceFileWriter(str(path), schema, version="2.1") as writer:
writer.write_batch(pa.table({"a": [1, 2, 3]}))
reader = LanceFileReader(str(path))
metadata = reader.metadata()
assert metadata.major_version == 2
assert metadata.minor_version == 1


def test_take(tmp_path):
Expand Down
7 changes: 4 additions & 3 deletions python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use bytes::Bytes;
use futures::stream::StreamExt;
use lance::io::{ObjectStore, RecordBatchStream};
use lance_core::cache::FileMetadataCache;
use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression};
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_file::{
v2::{
reader::{BufferDescriptor, CachedFileMetadata, FileReader},
reader::{BufferDescriptor, CachedFileMetadata, FileReader, FileReaderOptions},
writer::{FileWriter, FileWriterOptions},
},
version::LanceFileVersion,
Expand Down Expand Up @@ -335,8 +335,9 @@ impl LanceFileReader {
let inner = FileReader::try_open(
file,
None,
Arc::<DecoderMiddlewareChain>::default(),
Arc::<DecoderPlugins>::default(),
&FileMetadataCache::no_cache(),
FileReaderOptions::default(),
)
.await
.infer_error()?;
Expand Down
1 change: 1 addition & 0 deletions rust/lance-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ lazy_static::lazy_static! {
]);
pub static ref BLOB_DESC_FIELD: ArrowField =
ArrowField::new("description", DataType::Struct(BLOB_DESC_FIELDS.clone()), false);
pub static ref BLOB_DESC_LANCE_FIELD: Field = Field::try_from(&*BLOB_DESC_FIELD).unwrap();
}

/// LogicalType is a string presentation of arrow type.
Expand Down
Loading
Loading