Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HadoopInputFile to pass down FileStatus when opening file #2955

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion parquet-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,11 @@
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
</dependency>

<dependency>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to revisit why this got in

<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>${jsr305.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
import org.apache.parquet.hadoop.util.HadoopFileIO;

public class HadoopFSKeyMaterialStore implements FileKeyMaterialStore {
public static final String KEY_MATERIAL_FILE_PREFIX = "_KEY_MATERIAL_FOR_";
Expand Down Expand Up @@ -73,7 +74,7 @@ public String getKeyMaterial(String keyIDInFile) throws ParquetCryptoRuntimeExce
}

private void loadKeyMaterialMap() {
try (FSDataInputStream keyMaterialStream = hadoopFileSystem.open(keyMaterialFile)) {
try (FSDataInputStream keyMaterialStream = HadoopFileIO.openFile(hadoopFileSystem, keyMaterialFile, false)) {
JsonNode keyMaterialJson = objectMapper.readTree(keyMaterialStream);
keyMaterialMap =
objectMapper.readValue(keyMaterialJson.traverse(), new TypeReference<Map<String, String>>() {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopFileIO;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.hadoop.util.wrapped.io.FutureIO;
Expand Down Expand Up @@ -438,19 +439,37 @@ public static List<Footer> readSummaryFile(Configuration configuration, FileStat
return footersFromSummaryFile(parent, mergedFooters);
}

/**
* Read the summary metadata for a path, first looking for {@code basePath/_common_metadata}
* if {@code skipRowGroups} is true.
* If row groups are not to be skipped, or the common file was not found, look for {@code basePath/_metadata},
* @param configuration configuration to use
* @param basePath parent path
* @param skipRowGroups should row groups be excluded from the summary
* @return the metadata or null if no summary was found.
* @throws IOException failure to load a summary.
*/
static ParquetMetadata readSummaryMetadata(Configuration configuration, Path basePath, boolean skipRowGroups)
throws IOException {
Path metadataFile = new Path(basePath, PARQUET_METADATA_FILE);
Path commonMetaDataFile = new Path(basePath, PARQUET_COMMON_METADATA_FILE);
FileSystem fileSystem = basePath.getFileSystem(configuration);
if (skipRowGroups && fileSystem.exists(commonMetaDataFile)) {
if (skipRowGroups) {
// reading the summary file that does not contain the row groups
LOG.info("reading summary file: {}", commonMetaDataFile);
return readFooter(configuration, commonMetaDataFile, filter(skipRowGroups));
} else if (fileSystem.exists(metadataFile)) {
// fetch the file status to save another probe when opening the file
FileStatus commonFileStatus = HadoopFileIO.getFileStatusOrNull(fileSystem, commonMetaDataFile);
if (commonFileStatus != null) {
LOG.info("reading summary file: {}", commonMetaDataFile);
return readFooter(configuration, commonFileStatus, filter(true));
}
}
// row groups required of the common medata data not found: try to read the file specific metadata;
FileStatus metadataFileStatus = HadoopFileIO.getFileStatusOrNull(fileSystem, metadataFile);
if (metadataFileStatus != null) {
LOG.info("reading summary file: {}", metadataFile);
return readFooter(configuration, metadataFile, filter(skipRowGroups));
return readFooter(configuration, metadataFileStatus, filter(skipRowGroups));
} else {
// no metadata files found
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.parquet.hadoop.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.wrapped.io.DynamicWrappedIO;

/**
* Class for enhanced FileIO, calling {@link DynamicWrappedIO} as appropriate.
* If/when Parquet sets a baseline release with the relevant methods directly
* invocable, this class is where changes would need to be made.
*/
public class HadoopFileIO {

/**
* Get the status of file; if the file is not found downgrade to null.
* @param fileSystem FileSystem to use
* @param filePath file to load
* @return the status or null
* @throws IOException IO failure other than FileNotFoundException
*/
public static FileStatus getFileStatusOrNull(final FileSystem fileSystem, final Path filePath) throws IOException {
final FileStatus commonFileStatus;
try {
commonFileStatus = fileSystem.getFileStatus(filePath);
} catch (FileNotFoundException e) {
// file does not exist

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a DEBUG log here if we are not throwing the exception.

return null;
}
return commonFileStatus;
}

/**
* Open a file for reading using a read policy appropriate for the purpose,
* passing in a status object containing filename, length and possibly more
* <p>
* Note that for filesystems with lazy IO, existence checks may be delayed until
* the first read() operation.
*
* @param fileSystem FileSystem to use
* @param status file status
* @param randomIO is the file parquet file to be read with random IO?
*
* @return an input stream.
*
* @throws IOException failure to open the file.
*/
public static FSDataInputStream openFile(
final FileSystem fileSystem, final FileStatus status, final boolean randomIO) throws IOException {
return DynamicWrappedIO.openFile(fileSystem, status.getPath(), status, readPolicies(randomIO));
}

/**
* Open a file for reading using a read policy appropriate for the purpose.
* <p>
* Note that for filesystems with lazy IO, existence checks may be delayed until
* the first read() operation.
*
* @param fileSystem FileSystem to use
* @param path path to file
* @param randomIO is the file parquet file to be read with random IO?
*
* @return an input stream.
*
* @throws IOException failure to open the file.
*/
public static FSDataInputStream openFile(final FileSystem fileSystem, final Path path, final boolean randomIO)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The readPolicies method's parameter randomIO could be confusing. It might be more clear if renamed to useRandomIO or isRandomIO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or we could take a full string list of read policies, which is what is really happening underneath. Hadoop 3.4.1 explicitly adds "parquet" as an input format to tell the FS to optimise for that (footer caching, assume random IO everywhere else...)

throws IOException {
return DynamicWrappedIO.openFile(fileSystem, path, null, readPolicies(randomIO));
}

/**
* Choose the read policies for the desired purpose.
* @param randomIO is the file parquet file to be read with random IO?
* @return appropriate policy string
*/
private static String readPolicies(final boolean randomIO) {
return randomIO ? DynamicWrappedIO.PARQUET_READ_POLICIES : DynamicWrappedIO.SEQUENTIAL_READ_POLICIES;
}

/**
* Does a path have a given capability?
* Calls {@code PathCapabilities#hasPathCapability(Path, String)},
* mapping IOExceptions to false.
* @param fs filesystem
* @param path path to query the capability of.
* @param capability non-null, non-empty string to query the path for support.
* @return true if the capability is supported
* under that part of the FS
* false if the method is not loaded or the path lacks the capability.
* @throws IllegalArgumentException invalid arguments
*/
public boolean hasPathCapability(Object fs, Path path, String capability) {
return DynamicWrappedIO.instance().pathCapabilities_hasPathCapability(fs, path, capability);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,37 @@
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

/**
* Create a Hadoop input file.
*/
public class HadoopInputFile implements InputFile {

private final FileSystem fs;
private final FileStatus stat;
private final Configuration conf;

/**
* Open from a path.
* <p>
* Includes a {@code getFileStatus()} call to fill in {@link #stat}.
* @param path path of the file
* @param conf configuration.
* @return the input file
* @throws IOException IO failure creating the FS or retrieving the FileStatus
*/
public static HadoopInputFile fromPath(Path path, Configuration conf) throws IOException {
FileSystem fs = path.getFileSystem(conf);
return new HadoopInputFile(fs, fs.getFileStatus(path), conf);
}

/**
* Create from path raising an RTE on any IO failure.
* See {@link #fromPath(Path, Configuration)}
* @param path path of the file
* @param conf configuration.
* @return the input file
* @throws RuntimeException IO failure creating the FS or retrieving the FileStatus
*/
public static HadoopInputFile fromPathUnchecked(Path path, Configuration conf) {
try {
return fromPath(path, conf);
Expand All @@ -46,6 +66,16 @@ public static HadoopInputFile fromPathUnchecked(Path path, Configuration conf) {
}
}

/**
* Create from a file status; determing both the path and the file length.
* <p>
* When opening a file from an object store, this may eliminate the overhead
* of a HEAD request when later opening the file.
* @param stat status of the file
* @param conf configuration
* @return the input file
* @throws IOException IO failure creating the FS or retrieving the FileStatus
*/
public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf) throws IOException {
FileSystem fs = stat.getPath().getFileSystem(conf);
return new HadoopInputFile(fs, stat, conf);
Expand All @@ -72,7 +102,7 @@ public long getLength() {

@Override
public SeekableInputStream newStream() throws IOException {
return HadoopStreams.wrap(fs.open(stat.getPath()));
return HadoopStreams.wrap(HadoopFileIO.openFile(fs, stat, true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

package org.apache.parquet.hadoop.util.wrapped.io;

import static org.apache.parquet.Preconditions.checkState;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;
import org.apache.parquet.util.DynMethods;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,6 +36,35 @@ final class BindingUtils {

private BindingUtils() {}

/**
* Load a class by name.
* @param cl classloader to use.
* @param className classname
* @return the class or null if it could not be loaded.
*/
public static Class<?> loadClass(ClassLoader cl, String className) {
try {
return cl.loadClass(className);
} catch (ClassNotFoundException e) {
LOG.debug("No class {}", className, e);
return null;
}
}

/**
* Load a class by name.
* @param className classname
* @return the class or null if it could not be loaded.
*/
public static Class<?> loadClass(String className) {
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
LOG.debug("No class {}", className, e);
return null;
}
}

/**
* Get an invocation from the source class, which will be unavailable() if
* the class is null or the method isn't found.
Expand Down Expand Up @@ -65,6 +99,30 @@ static <T> DynMethods.UnboundMethod loadInvocation(
}
}

/**
* Load a static method from the source class, which will be a noop() if
* the class is null or the method isn't found.
* If the class and method are not found, then an {@code IllegalStateException}
* is raised on the basis that this means that the binding class is broken,
* rather than missing/out of date.
*
* @param <T> return type
* @param source source. If null, the method is a no-op.
* @param returnType return type class (unused)
* @param name method name
* @param parameterTypes parameters
*
* @return the method or a no-op.
* @throws IllegalStateException if the method is not static.
*/
public static <T> DynMethods.UnboundMethod loadStaticMethod(
Class<?> source, Class<? extends T> returnType, String name, Class<?>... parameterTypes) {

final DynMethods.UnboundMethod method = loadInvocation(source, returnType, name, parameterTypes);
checkState(method.isStatic(), "Method is not static %s", method);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to add class name as well in the log ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

return method;
}

/**
* Create a no-op method.
*
Expand All @@ -91,4 +149,40 @@ static boolean implemented(DynMethods.UnboundMethod... methods) {
}
return true;
}

/**
* Require a method to be available.
* @param method method to probe
* @throws UnsupportedOperationException if the method was not found.
*/
static void checkAvailable(DynMethods.UnboundMethod method) throws UnsupportedOperationException {
if (method.isNoop()) {
throw new UnsupportedOperationException("Unbound " + method);
}
}

/**
* Is a method available?
* @param method method to probe
* @return true iff the method is found and loaded.
*/
static boolean available(DynMethods.UnboundMethod method) {
return !method.isNoop();
}

/**
* Invoke the supplier, catching any {@code UncheckedIOException} raised,
* extracting the inner IOException and rethrowing it.
* @param call call to invoke
* @return result
* @param <T> type of result
* @throws IOException if the call raised an IOException wrapped by an UncheckedIOException.
*/
static <T> T extractIOEs(Supplier<T> call) throws IOException {
try {
return call.get();
} catch (UncheckedIOException e) {
throw e.getCause();
}
}
}
Loading