From 677d2fd2a950a56ee6bc66839fdabbc1a5f19063 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 11 Jun 2024 20:11:30 +0100 Subject: [PATCH 1/4] PARQUET-2493. Reflection based use of hadoop WrappedIO class in sync with ongoing hadoop pr, commit 3d7dc340c9a1 Change-Id: I868df6afb373d57179c9cb9d90164e71b0571faf --- .../parquet/hadoop/util/HadoopInputFile.java | 3 +- .../hadoop/util/wrapped/io/BindingUtils.java | 94 ++++ .../util/wrapped/io/DynamicWrappedIO.java | 403 ++++++++++++++++++ .../parquet/hadoop/TestParquetFileWriter.java | 6 +- 4 files changed, 503 insertions(+), 3 deletions(-) create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/DynamicWrappedIO.java diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java index 48c79fa5eb..e9d9be8326 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.util.wrapped.io.DynamicWrappedIO; import org.apache.parquet.io.InputFile; import org.apache.parquet.io.SeekableInputStream; @@ -72,7 +73,7 @@ public long getLength() { @Override public SeekableInputStream newStream() throws IOException { - return HadoopStreams.wrap(fs.open(stat.getPath())); + return HadoopStreams.wrap(DynamicWrappedIO.openFile(fs, stat)); } @Override diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/BindingUtils.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/BindingUtils.java index 204bb5e43d..6bc910fe7b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/BindingUtils.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/BindingUtils.java @@ -18,6 +18,11 @@ package org.apache.parquet.hadoop.util.wrapped.io; +import static org.apache.parquet.Preconditions.checkState; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.function.Supplier; import org.apache.parquet.util.DynMethods; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +36,35 @@ final class BindingUtils { private BindingUtils() {} + /** + * Load a class by name. + * @param cl classloader to use. + * @param className classname + * @return the class or null if it could not be loaded. + */ + public static Class loadClass(ClassLoader cl, String className) { + try { + return cl.loadClass(className); + } catch (ClassNotFoundException e) { + LOG.debug("No class {}", className, e); + return null; + } + } + + /** + * Load a class by name. + * @param className classname + * @return the class or null if it could not be loaded. + */ + public static Class loadClass(String className) { + try { + return Class.forName(className); + } catch (ClassNotFoundException e) { + LOG.debug("No class {}", className, e); + return null; + } + } + /** * Get an invocation from the source class, which will be unavailable() if * the class is null or the method isn't found. @@ -65,6 +99,30 @@ static DynMethods.UnboundMethod loadInvocation( } } + /** + * Load a static method from the source class, which will be a noop() if + * the class is null or the method isn't found. 
+ * If the class and method are not found, then an {@code IllegalStateException} + * is raised on the basis that this means that the binding class is broken, + * rather than missing/out of date. + * + * @param return type + * @param source source. If null, the method is a no-op. + * @param returnType return type class (unused) + * @param name method name + * @param parameterTypes parameters + * + * @return the method or a no-op. + * @throws IllegalStateException if the method is not static. + */ + public static DynMethods.UnboundMethod loadStaticMethod( + Class source, Class returnType, String name, Class... parameterTypes) { + + final DynMethods.UnboundMethod method = loadInvocation(source, returnType, name, parameterTypes); + checkState(method.isStatic(), "Method is not static %s", method); + return method; + } + /** * Create a no-op method. * @@ -91,4 +149,40 @@ static boolean implemented(DynMethods.UnboundMethod... methods) { } return true; } + + /** + * Require a method to be available. + * @param method method to probe + * @throws UnsupportedOperationException if the method was not found. + */ + static void checkAvailable(DynMethods.UnboundMethod method) throws UnsupportedOperationException { + if (method.isNoop()) { + throw new UnsupportedOperationException("Unbound " + method); + } + } + + /** + * Is a method available? + * @param method method to probe + * @return true iff the method is found and loaded. + */ + static boolean available(DynMethods.UnboundMethod method) { + return !method.isNoop(); + } + + /** + * Invoke the supplier, catching any {@code UncheckedIOException} raised, + * extracting the inner IOException and rethrowing it. + * @param call call to invoke + * @return result + * @param type of result + * @throws IOException if the call raised an IOException wrapped by an UncheckedIOException. + */ + static T extractIOEs(Supplier call) throws IOException { + try { + return call.get(); + } catch (UncheckedIOException e) { + throw e.getCause(); + } + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/DynamicWrappedIO.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/DynamicWrappedIO.java new file mode 100644 index 0000000000..e455759e21 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/DynamicWrappedIO.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.parquet.hadoop.util.wrapped.io; + +import static org.apache.parquet.hadoop.util.wrapped.io.BindingUtils.available; +import static org.apache.parquet.hadoop.util.wrapped.io.BindingUtils.checkAvailable; +import static org.apache.parquet.hadoop.util.wrapped.io.BindingUtils.loadClass; +import static org.apache.parquet.hadoop.util.wrapped.io.BindingUtils.loadStaticMethod; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.util.DynMethods; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The wrapped IO methods in {@code org.apache.hadoop.io.wrappedio.WrappedIO}, + * dynamically loaded. + *
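+ * A minimal sketch of the probe-then-invoke pattern used throughout this class
+ * (callers must check availability first, since the bindings may be no-ops):
+ * <pre>{@code
+ *   if (DynamicWrappedIO.isAvailable()) {
+ *     DynamicWrappedIO wrapped = DynamicWrappedIO.instance();
+ *     // ...invoke wrapped bindings such as wrapped.fileSystem_openFile(...)
+ *   }
+ * }</pre>
+ *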
+ * This class is derived from {@code org.apache.hadoop.io.wrappedio.impl.DynamicWrappedIO}. + * If a bug is found here, check to see if it has been fixed in hadoop trunk branch. + * If not: please provide a patch for that project alongside one here. + */ +public final class DynamicWrappedIO { + + private static final Logger LOG = LoggerFactory.getLogger(DynamicWrappedIO.class); + + /** + * Classname of the wrapped IO class: {@value}. + */ + public static final String WRAPPED_IO_CLASSNAME = "org.apache.hadoop.io.wrappedio.WrappedIO"; + + /** + * Method name for openFile: {@value} + */ + public static final String FILESYSTEM_OPEN_FILE = "fileSystem_openFile"; + + /** + * Method name for bulk delete: {@value} + */ + public static final String BULKDELETE_DELETE = "bulkDelete_delete"; + + /** + * Method name for bulk delete: {@value} + */ + public static final String BULKDELETE_PAGESIZE = "bulkDelete_pageSize"; + + /** + * Method name for {@code byteBufferPositionedReadable}: {@value}. + */ + public static final String BYTE_BUFFER_POSITIONED_READABLE_READ_FULLY_AVAILABLE = + "byteBufferPositionedReadable_readFullyAvailable"; + + /** + * Method name for {@code byteBufferPositionedReadable}: {@value}. + */ + public static final String BYTE_BUFFER_POSITIONED_READABLE_READ_FULLY = "byteBufferPositionedReadable_readFully"; + + /** + * Method name for {@code PathCapabilities.hasPathCapability()}. + * {@value} + */ + public static final String PATH_CAPABILITIES_HAS_PATH_CAPABILITY = "pathCapabilities_hasPathCapability"; + + /** + * Method name for {@code StreamCapabilities.hasCapability()}. + * {@value} + */ + public static final String STREAM_CAPABILITIES_HAS_CAPABILITY = "streamCapabilities_hasCapability"; + + /** + * A singleton instance of the wrapper. + */ + private static final DynamicWrappedIO INSTANCE = new DynamicWrappedIO(); + + /** + * Read policy for parquet files: {@value}. + */ + public static final String PARQUET_READ_POLICIES = "parquet, columnar, vector, random"; + + /** + * Was wrapped IO loaded? + * In the hadoop codebase, this is true. + * But in other libraries it may not always be true...this + * field is used to assist copy-and-paste adoption. + */ + private final boolean loaded; + + /** + * Method binding. + * {@code WrappedIO.bulkDelete_delete(FileSystem, Path, Collection)}. + */ + private final DynMethods.UnboundMethod bulkDeleteDeleteMethod; + + /** + * Method binding. + * {@code WrappedIO.bulkDelete_pageSize(FileSystem, Path)}. + */ + private final DynMethods.UnboundMethod bulkDeletePageSizeMethod; + + /** + * Dynamic openFile() method. + * {@code WrappedIO.fileSystem_openFile(FileSystem, Path, String, FileStatus, Long, Map)}. + */ + private final DynMethods.UnboundMethod fileSystemOpenFileMethod; + + private final DynMethods.UnboundMethod pathCapabilitiesHasPathCapabilityMethod; + + private final DynMethods.UnboundMethod streamCapabilitiesHasCapabilityMethod; + + private final DynMethods.UnboundMethod byteBufferPositionedReadableReadFullyAvailableMethod; + + private final DynMethods.UnboundMethod byteBufferPositionedReadableReadFullyMethod; + + public DynamicWrappedIO() { + this(WRAPPED_IO_CLASSNAME); + } + + public DynamicWrappedIO(String classname) { + + // Wrapped IO class. 
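+ // Loaded by name via reflection so this library still links against Hadoop
+ // releases without WrappedIO; if the class is absent, every method binding
+ // below resolves to a no-op and callers are expected to probe availability
+ // and use the fallback code paths.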
+ Class wrappedClass = loadClass(classname); + + loaded = wrappedClass != null; + + // bulk delete APIs + bulkDeleteDeleteMethod = loadStaticMethod( + wrappedClass, List.class, BULKDELETE_DELETE, FileSystem.class, Path.class, Collection.class); + + bulkDeletePageSizeMethod = + loadStaticMethod(wrappedClass, Integer.class, BULKDELETE_PAGESIZE, FileSystem.class, Path.class); + + // load the openFile method + fileSystemOpenFileMethod = loadStaticMethod( + wrappedClass, + FSDataInputStream.class, + FILESYSTEM_OPEN_FILE, + FileSystem.class, + Path.class, + String.class, + FileStatus.class, + Long.class, + Map.class); + + // path and stream capabilities + pathCapabilitiesHasPathCapabilityMethod = loadStaticMethod( + wrappedClass, + Boolean.class, + PATH_CAPABILITIES_HAS_PATH_CAPABILITY, + FileSystem.class, + Path.class, + String.class); + + streamCapabilitiesHasCapabilityMethod = loadStaticMethod( + wrappedClass, Boolean.class, STREAM_CAPABILITIES_HAS_CAPABILITY, Object.class, String.class); + + // ByteBufferPositionedReadable + byteBufferPositionedReadableReadFullyAvailableMethod = loadStaticMethod( + wrappedClass, Void.class, BYTE_BUFFER_POSITIONED_READABLE_READ_FULLY_AVAILABLE, InputStream.class); + + byteBufferPositionedReadableReadFullyMethod = loadStaticMethod( + wrappedClass, + Void.class, + BYTE_BUFFER_POSITIONED_READABLE_READ_FULLY, + InputStream.class, + long.class, + ByteBuffer.class); + } + + /** + * Is the wrapped IO class loaded? + * @return true if the wrappedIO class was found and loaded. + */ + public boolean loaded() { + return loaded; + } + + /** + * Are the bulk delete methods available? + * @return true if the methods were found. + */ + public boolean bulkDelete_available() { + return available(bulkDeleteDeleteMethod); + } + + /** + * Get the maximum number of objects/files to delete in a single request. + * @param fileSystem filesystem + * @param path path to delete under. + * @return a number greater than or equal to zero. + * @throws UnsupportedOperationException bulk delete under that path is not supported. + * @throws IllegalArgumentException path not valid. + * @throws RuntimeException invocation failure. + */ + public int bulkDelete_pageSize(final FileSystem fileSystem, final Path path) { + checkAvailable(bulkDeletePageSizeMethod); + return bulkDeletePageSizeMethod.invoke(null, fileSystem, path); + } + + /** + * Delete a list of files/objects. + *
+ * <ul>
+ *   <li>Files must be under the path provided in {@code base}.</li>
+ *   <li>The size of the list must be equal to or less than the page size.</li>
+ *   <li>Directories are not supported; the outcome of attempting to delete
+ *   directories is undefined (ignored; undetected, listed as failures...).</li>
+ *   <li>The operation is not atomic.</li>
+ *   <li>The operation is treated as idempotent: network failures may
+ *   trigger resubmission of the request - any new objects created under a
+ *   path in the list may then be deleted.</li>
+ *   <li>There is no guarantee that any parent directories exist after this call.</li>
+ * </ul>
+ *
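+ * A minimal paged-use sketch; {@code fs}, {@code base} and a {@code List<Path> paths}
+ * are assumed to be supplied by the caller:
+ * <pre>{@code
+ *   DynamicWrappedIO wrapped = DynamicWrappedIO.instance();
+ *   if (wrapped.bulkDelete_available()) {
+ *     int pageSize = wrapped.bulkDelete_pageSize(fs, base);
+ *     List<Map.Entry<Path, String>> failures = wrapped.bulkDelete_delete(
+ *         fs, base, paths.subList(0, Math.min(pageSize, paths.size())));
+ *   }
+ * }</pre>
+ *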
+ * @param fs filesystem + * @param base path to delete under. + * @param paths list of paths which must be absolute and under the base path. + * @return a list of all the paths which couldn't be deleted for a reason other than "not found" and any associated error message. + * @throws UnsupportedOperationException bulk delete under that path is not supported. + * @throws IllegalArgumentException if a path argument is invalid. + * @throws RuntimeException for any failure. + */ + public List> bulkDelete_delete(FileSystem fs, Path base, Collection paths) { + checkAvailable(bulkDeleteDeleteMethod); + return bulkDeleteDeleteMethod.invoke(null, fs, base, paths); + } + + /** + * Is the {@link #fileSystem_openFile(FileSystem, Path, String, FileStatus, Long, Map)} + * method available. + * @return true if the optimized open file method can be invoked. + */ + public boolean fileSystem_openFile_available() { + return available(fileSystemOpenFileMethod); + } + + /** + * OpenFile assistant, easy reflection-based access to + * {@code FileSystem#openFile(Path)} and blocks + * awaiting the operation completion. + * @param fs filesystem + * @param path path + * @param policy read policy + * @param status optional file status + * @param length optional file length + * @param options nullable map of other options + * @return stream of the opened file + * @throws RuntimeException for any failure. + */ + public FSDataInputStream fileSystem_openFile( + final FileSystem fs, + final Path path, + final String policy, + @Nullable final FileStatus status, + @Nullable final Long length, + @Nullable final Map options) { + checkAvailable(fileSystemOpenFileMethod); + return fileSystemOpenFileMethod.invoke(null, fs, path, policy, status, length, options); + } + + /** + * Does a path have a given capability? + * Calls {@code PathCapabilities#hasPathCapability(Path, String)}, + * mapping IOExceptions to false. + * @param fs filesystem + * @param path path to query the capability of. + * @param capability non-null, non-empty string to query the path for support. + * @return true if the capability is supported + * under that part of the FS + * false if the method is not loaded or the path lacks the capability. + * @throws IllegalArgumentException invalid arguments + */ + public boolean pathCapabilities_hasPathCapability(Object fs, Path path, String capability) { + if (!available(pathCapabilitiesHasPathCapabilityMethod)) { + return false; + } + return pathCapabilitiesHasPathCapabilityMethod.invoke(null, fs, path, capability); + } + + /** + * Does an object implement {@code StreamCapabilities} and, if so, + * what is the result of the probe for the capability? + * Calls {@code StreamCapabilities#hasCapability(String)}, + * @param object object to probe + * @param capability capability string + * @return true iff the object implements StreamCapabilities and the capability is + * declared available. + */ + public boolean streamCapabilities_hasCapability(Object object, String capability) { + if (!available(streamCapabilitiesHasCapabilityMethod)) { + return false; + } + return streamCapabilitiesHasCapabilityMethod.invoke(null, object, capability); + } + + /** + * Are the ByteBufferPositionedReadable methods loaded? + * This does not check that a specific stream implements the API; + * use {@link #byteBufferPositionedReadable_readFullyAvailable(InputStream)}. + * @return true if the hadoop libraries have the method. 
+ */ + public boolean byteBufferPositionedReadable_available() { + return available(byteBufferPositionedReadableReadFullyAvailableMethod); + } + + /** + * Probe to see if the input stream is an instance of ByteBufferPositionedReadable. + * If the stream is an FSDataInputStream, the wrapped stream is checked. + * @param in input stream + * @return true if the API is available, the stream implements the interface + * (including the innermost wrapped stream) and that it declares the stream capability. + */ + public boolean byteBufferPositionedReadable_readFullyAvailable(InputStream in) { + return available(byteBufferPositionedReadableReadFullyAvailableMethod) + && (boolean) byteBufferPositionedReadableReadFullyAvailableMethod.invoke(null, in); + } + + /** + * Delegate to {@code ByteBufferPositionedReadable#read(long, ByteBuffer)}. + * @param in input stream + * @param position position within file + * @param buf the ByteBuffer to receive the results of the read operation. + * @throws UnsupportedOperationException if the input doesn't implement + * the interface or, if when invoked, it is raised. + * Note: that is the default behaviour of {@code FSDataInputStream#readFully(long, ByteBuffer)}. + */ + public void byteBufferPositionedReadable_readFully(InputStream in, long position, ByteBuffer buf) { + checkAvailable(byteBufferPositionedReadableReadFullyMethod); + byteBufferPositionedReadableReadFullyMethod.invoke(null, in, position, buf); + } + + /** + * Get the singleton instance. + * @return the instance + */ + public static DynamicWrappedIO instance() { + return INSTANCE; + } + + /** + * Is the wrapped IO class loaded? + * @return true if the instance is loaded. + */ + public static boolean isAvailable() { + return instance().loaded(); + } + + /** + * Open a parquet file. + *
+ * If the WrappedIO class is found, uses + * {@link #fileSystem_openFile(FileSystem, Path, String, FileStatus, Long, Map)} with + * {@link #PARQUET_READ_POLICIES} as the list of read policies and passing down + * the file status. + *
+ * If not, falls back to the classic {@code fs.open(Path)} call. + * @param fs filesystem + * @param status file status + * @throws IOException any IO failure. + */ + public static FSDataInputStream openFile(FileSystem fs, FileStatus status) throws IOException { + final DynamicWrappedIO instance = DynamicWrappedIO.instance(); + FSDataInputStream stream; + if (instance.fileSystem_openFile_available()) { + // use openfile for a higher performance read + // and the ability to set a read policy. + // This optimizes for cloud storage by saving on IO + // in open and choosing the range for GET requests. + // For other stores, it ultimately invokes the classic open(Path) + // call so is no more expensive than before. + LOG.debug("Opening file {} through fileSystem_openFile", status); + stream = instance.fileSystem_openFile(fs, status.getPath(), PARQUET_READ_POLICIES, status, null, null); + } else { + LOG.debug("Opening file {} through open()", status); + stream = fs.open(status.getPath()); + } + return stream; + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 32e2eac0b9..e90b7fc27a 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -98,6 +98,7 @@ import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.hadoop.util.HiddenFileFilter; +import org.apache.parquet.hadoop.util.wrapped.io.DynamicWrappedIO; import org.apache.parquet.internal.column.columnindex.BinaryTruncator; import org.apache.parquet.internal.column.columnindex.BoundaryOrder; import org.apache.parquet.internal.column.columnindex.ColumnIndex; @@ -647,10 +648,11 @@ public void testAlignmentWithPadding() throws Exception { w.end(new HashMap()); FileSystem fs = path.getFileSystem(conf); - long fileLen = fs.getFileStatus(path).getLen(); + final FileStatus stat = fs.getFileStatus(path); + long fileLen = stat.getLen(); long footerLen; - try (FSDataInputStream data = fs.open(path)) { + try (FSDataInputStream data = DynamicWrappedIO.openFile(fs, stat)) { data.seek(fileLen - 8); // 4-byte offset + "PAR1" footerLen = BytesUtils.readIntLittleEndian(data); } From 530d6c3d7a12f5f3eba455ccdfeaa21154dba460 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 12 Jun 2024 14:15:22 +0100 Subject: [PATCH 2/4] PARQUET-2493. 
pull read policy declaration into reader code Change-Id: I7de43d8426b56800c540a520f1fb7fef21ae60ba --- parquet-hadoop/pom.xml | 6 +++++- .../org/apache/parquet/hadoop/util/HadoopInputFile.java | 2 +- .../parquet/hadoop/util/wrapped/io/DynamicWrappedIO.java | 9 +++++---- .../org/apache/parquet/hadoop/TestParquetFileWriter.java | 2 +- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml index d12948317f..18f8e47394 100644 --- a/parquet-hadoop/pom.xml +++ b/parquet-hadoop/pom.xml @@ -160,7 +160,11 @@ zstd-jni ${zstd-jni.version} - + + com.google.code.findbugs + jsr305 + ${jsr305.version} + com.google.guava guava diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java index e9d9be8326..547d2dbf16 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java @@ -73,7 +73,7 @@ public long getLength() { @Override public SeekableInputStream newStream() throws IOException { - return HadoopStreams.wrap(DynamicWrappedIO.openFile(fs, stat)); + return HadoopStreams.wrap(DynamicWrappedIO.openFile(fs, stat, DynamicWrappedIO.PARQUET_READ_POLICIES)); } @Override diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/DynamicWrappedIO.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/DynamicWrappedIO.java index e455759e21..68cea38b50 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/DynamicWrappedIO.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/DynamicWrappedIO.java @@ -374,15 +374,16 @@ public static boolean isAvailable() { *
* If the WrappedIO class is found, uses * {@link #fileSystem_openFile(FileSystem, Path, String, FileStatus, Long, Map)} with - * {@link #PARQUET_READ_POLICIES} as the list of read policies and passing down + * the supplied list of read policies and passing down * the file status. *
* If not, falls back to the classic {@code fs.open(Path)} call. * @param fs filesystem * @param status file status + * @param policy read policy * @throws IOException any IO failure. */ - public static FSDataInputStream openFile(FileSystem fs, FileStatus status) throws IOException { + public static FSDataInputStream openFile(FileSystem fs, FileStatus status, String policy) throws IOException { final DynamicWrappedIO instance = DynamicWrappedIO.instance(); FSDataInputStream stream; if (instance.fileSystem_openFile_available()) { @@ -392,8 +393,8 @@ public static FSDataInputStream openFile(FileSystem fs, FileStatus status) throw // in open and choosing the range for GET requests. // For other stores, it ultimately invokes the classic open(Path) // call so is no more expensive than before. - LOG.debug("Opening file {} through fileSystem_openFile", status); - stream = instance.fileSystem_openFile(fs, status.getPath(), PARQUET_READ_POLICIES, status, null, null); + LOG.debug("Opening file {} through fileSystem_openFile() with policy {}", status, policy); + stream = instance.fileSystem_openFile(fs, status.getPath(), policy, status, null, null); } else { LOG.debug("Opening file {} through open()", status); stream = fs.open(status.getPath()); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index e90b7fc27a..5513761dde 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -652,7 +652,7 @@ public void testAlignmentWithPadding() throws Exception { long fileLen = stat.getLen(); long footerLen; - try (FSDataInputStream data = DynamicWrappedIO.openFile(fs, stat)) { + try (FSDataInputStream data = DynamicWrappedIO.openFile(fs, stat, DynamicWrappedIO.PARQUET_READ_POLICIES)) { data.seek(fileLen - 8); // 4-byte offset + "PAR1" footerLen = BytesUtils.readIntLittleEndian(data); } From 80dd0d83ecc3522df95047e85f414eb8661285f2 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 15 Jul 2024 21:01:42 +0100 Subject: [PATCH 3/4] Move all use of the dynamic methods into HadoopFileIO * eases future upgrades of hadoop dependencies. * updated uses of FileSystem.open() where the file read policy is known and/or exists()/getFileStatus() calls are executed immediately before. 
* use it in footer reading as well as file reading This looks like coverage of the core production use; ignoring CLI operations Change-Id: Id1c35619a04a500c7cccd131358b22eaa1e0f984 --- .../keytools/HadoopFSKeyMaterialStore.java | 3 +- .../parquet/hadoop/ParquetFileReader.java | 29 ++++- .../parquet/hadoop/util/HadoopFileIO.java | 101 ++++++++++++++++++ .../parquet/hadoop/util/HadoopInputFile.java | 33 +++++- .../util/wrapped/io/DynamicWrappedIO.java | 18 +++- .../parquet/hadoop/TestParquetFileWriter.java | 4 +- .../util/wrapped/io/TestVectorIoBridge.java | 3 +- 7 files changed, 176 insertions(+), 15 deletions(-) create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopFileIO.java diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/HadoopFSKeyMaterialStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/HadoopFSKeyMaterialStore.java index 22ec14c576..2c0cc85e30 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/HadoopFSKeyMaterialStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/keytools/HadoopFSKeyMaterialStore.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.crypto.ParquetCryptoRuntimeException; +import org.apache.parquet.hadoop.util.HadoopFileIO; public class HadoopFSKeyMaterialStore implements FileKeyMaterialStore { public static final String KEY_MATERIAL_FILE_PREFIX = "_KEY_MATERIAL_FOR_"; @@ -73,7 +74,7 @@ public String getKeyMaterial(String keyIDInFile) throws ParquetCryptoRuntimeExce } private void loadKeyMaterialMap() { - try (FSDataInputStream keyMaterialStream = hadoopFileSystem.open(keyMaterialFile)) { + try (FSDataInputStream keyMaterialStream = HadoopFileIO.openFile(hadoopFileSystem, keyMaterialFile, false)) { JsonNode keyMaterialJson = objectMapper.readTree(keyMaterialStream); keyMaterialMap = objectMapper.readValue(keyMaterialJson.traverse(), new TypeReference>() {}); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 1d8cce3d8c..911c05183b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -104,6 +104,7 @@ import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopFileIO; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.hadoop.util.counters.BenchmarkCounter; import org.apache.parquet.hadoop.util.wrapped.io.FutureIO; @@ -438,19 +439,37 @@ public static List