Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Filter out marker files from glob scan #1999

Merged
merged 3 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/daft-io/src/object_store_glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const SCHEME_SUFFIX_LEN: usize = "://".len();
/// the `glob` utility can only be used with POSIX-style paths.
const GLOB_DELIMITER: &str = "/";

// NOTE: We use the following suffixes to filter out Spark marker files
const MARKER_SUFFIXES: [&str; 4] = ["_metadata", "_common_metadata", ".crc", "_success"];

#[derive(Clone)]
pub(crate) struct GlobState {
// Current path in dirtree and glob_fragments
Expand Down Expand Up @@ -309,6 +312,14 @@ fn _should_return(fm: &FileMetadata) -> bool {
{
false
}
// Do not return Spark marker files
FileType::File
if MARKER_SUFFIXES
.iter()
.any(|suffix| fm.filepath.to_lowercase().ends_with(suffix)) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for _metadata, _common_metadata, and _success this may be bug prone.
Imagine you have a parquet file called image_metadata. this would filter that file out. Instead you want to find an exact match of filename where filename is basepath / filename for those 3.

{
false
}
// Return all other File entries
FileType::File => true,
// Globbing does not return Directory results
Expand Down
22 changes: 22 additions & 0 deletions tests/integration/io/parquet/test_reads_s3_minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,25 @@ def test_minio_parquet_read_no_files(minio_io_config):

with pytest.raises(FileNotFoundError, match="Glob path had no matches:"):
daft.read_parquet("s3://data-engineering-prod/foo/**.parquet", io_config=minio_io_config)


@pytest.mark.integration()
def test_minio_parquet_ignore_marker_files(minio_io_config):
bucket_name = "data-engineering-prod"
with minio_create_bucket(minio_io_config, bucket_name=bucket_name) as fs:
target_paths = [
f"s3://{bucket_name}/Y",
f"s3://{bucket_name}/Z",
]
data = {"x": [1, 2, 3, 4]}
df = daft.from_pydict(data)
for path in target_paths:
df.write_parquet(path, io_config=minio_io_config)

marker_files = ["_metadata", "_SUCCESS", "_common_metadata", "a.crc"]
for marker in marker_files:
fs.touch(f"s3://{bucket_name}/Y/{marker}")
fs.touch(f"s3://{bucket_name}/Z/{marker}")

read = daft.read_parquet(f"s3://{bucket_name}/**", io_config=minio_io_config)
assert read.to_pydict() == {"x": [1, 2, 3, 4] * 2}
Loading