Skip to content

Commit

Permalink
fix: variable file fragments (#2148)
Browse files Browse the repository at this point in the history
The previous fix (#2130) assumed that all fragments had the same number of data
files. That assumption does not hold in general, so I've updated the fix and the
test accordingly.
  • Loading branch information
wjones127 authored Apr 3, 2024
1 parent 70aa8b1 commit 3999ac6
Show file tree
Hide file tree
Showing 26 changed files with 34 additions and 24 deletions.
4 changes: 2 additions & 2 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4229,15 +4229,15 @@ mod tests {
.downcast_ref::<Int64Array>()
.unwrap()
.values(),
&[0, 4]
&[0, 4, 8, 12]
);
assert_eq!(
data["c"]
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.values(),
&[0, 5]
&[0, 5, 10, 15]
);
}

Expand Down
34 changes: 17 additions & 17 deletions rust/lance/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,39 +217,39 @@ fn fix_schema(manifest: &mut Manifest) -> Result<()> {

let mut field_id_seed = manifest.max_field_id() + 1;
let mut seen_fields = HashSet::new();
// This map is for transforming the data files' field lists
let mut file_field_mapping: HashMap<(usize, usize), i32> = HashMap::new();
// This map is for transforming the schema
let mut old_field_id_mapping: HashMap<i32, i32> = HashMap::new();
let field_ids = if let Some(fragment) = manifest.fragments.first() {
fragment
.files
.iter()
.enumerate()
.flat_map(|(f_pos, f)| std::iter::repeat(f_pos).zip(f.fields.iter().enumerate()))
fragment.files.iter().flat_map(|file| file.fields.iter())
} else {
return Ok(());
};
for (file_pos, (field_pos, field_id)) in field_ids {
for field_id in field_ids {
if !seen_fields.insert(field_id) {
let new_field_id = field_id_seed;
old_field_id_mapping.insert(*field_id, field_id_seed);
field_id_seed += 1;

file_field_mapping.insert((file_pos, field_pos), new_field_id);
old_field_id_mapping.insert(*field_id, new_field_id);
}
}

if file_field_mapping.is_empty() {
if old_field_id_mapping.is_empty() {
return Ok(());
}

let mut fragments = manifest.fragments.as_ref().clone();

// Apply mapping to fragment files list
for fragment in fragments.iter_mut() {
for ((file_pos, field_pos), new_field_id) in &file_field_mapping {
fragment.files[*file_pos].fields[*field_pos] = *new_field_id;
for fragment in fragments.iter_mut().rev() {
let mut seen_fields = HashSet::new();
for field_id in fragment
.files
.iter_mut()
.rev()
.flat_map(|file| file.fields.iter_mut())
{
if let Some(new_field_id) = old_field_id_mapping.get(field_id) {
if seen_fields.insert(*field_id) {
*field_id = *new_field_id;
}
}
}
}

Expand Down
Binary file modified test_data/v0.10.5/corrupt_schema/_latest.manifest
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
$bd1e5a74-b8f9-4f94-9cc0-93b3f0dd9be0�8
$dffa23f0-c357-4935-a9c1-e286099b5533�8
x ���������*int6408
y ���������*int6408
Binary file not shown.
Binary file modified test_data/v0.10.5/corrupt_schema/_versions/1.manifest
Binary file not shown.
Binary file modified test_data/v0.10.5/corrupt_schema/_versions/2.manifest
Binary file not shown.
Binary file modified test_data/v0.10.5/corrupt_schema/_versions/3.manifest
Binary file not shown.
Binary file modified test_data/v0.10.5/corrupt_schema/_versions/4.manifest
Binary file not shown.
Binary file modified test_data/v0.10.5/corrupt_schema/_versions/5.manifest
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
18 changes: 14 additions & 4 deletions test_data/v0.10.5/datagen.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import shutil

import lance
import pyarrow as pa

Expand All @@ -6,18 +8,26 @@

name = "corrupt_schema"

shutil.rmtree(name, ignore_errors=True)

tab = pa.table({"x": range(2)})
dataset = lance.write_dataset(tab, name, mode="create")

dataset.add_columns({"y": "x * 2", "z": "x * 3"})
dataset.add_columns({"a": "-x"})

new_data = pa.table({"x": range(2, 4), "y": [4, 6], "z": [6, 9], "a": [-2, -3]})
dataset = lance.write_dataset(new_data, name, mode="append")
dataset.drop_columns(["a", "z"])
dataset.add_columns({"b": "x * 4", "c": "x * 5"})

# This is the bug: b and c will show data from z and a.
assert dataset.to_table() == pa.table({
"x": range(2),
"y": [0, 2],
"b": [0, 3],
"c": [0, -1],
"x": range(4),
"y": [0, 2, 4, 6],
"b": [0, 3, 6, 9],
"c": [0, -1, -2, -3],
})

fragment_sizes = { len(frag.data_files()) for frag in dataset.get_fragments() }
assert fragment_sizes == {4, 2}

0 comments on commit 3999ac6

Please sign in to comment.