Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HadoopInputFile to pass down FileStatus when opening file #2955

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

steveloughran
Copy link
Contributor

Rationale for this change

  • Saves overhead of HTTP head request when opening a file
  • tells the hadoop FS client that the file being opened is parquet, and should use the first recognized policy of "parquet, columnar, vector, random". These can disable prefetch and limit ranges requested to those optimal for columns.

What changes are included in this PR?

1,. Uses reflection to load reflection-friendly bindings to the enhanced openFile method of apache/hadoop#6686 . Although openFile() has been present since Hadoop 3.3.0, because parquet still builds against hadoop 2.x reflection is required.

Are these changes tested?

Existing tests have been modified.
apache/hadoop#6686

Are there any user-facing changes?

no

Closes #${#2915}

in sync with ongoing hadoop pr, commit 3d7dc340c9a1

Change-Id: I868df6afb373d57179c9cb9d90164e71b0571faf
Change-Id: I7de43d8426b56800c540a520f1fb7fef21ae60ba
@steveloughran steveloughran marked this pull request as draft July 15, 2024 19:00
* eases future upgrades of hadoop dependencies.
* updated uses of FileSystem.open() where the file read policy
  is known and/or exists()/getFileStatus() calls are executed
  immediately before.
* use it in footer reading as well as file reading

This looks like coverage of the core production use;
ignoring CLI operations

Change-Id: Id1c35619a04a500c7cccd131358b22eaa1e0f984
Got signature wrong.

Change-Id: I2923fa0eb11b4cf779eb7b7fc79dcc7917d14db1
/**
* Read policy for parquet files: {@value}.
*/
public static final String PARQUET_READ_POLICIES = "parquet, columnar, vector, random";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment here on what effect this has? It's not immediately obvious why this would be better than a sequential read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do. key thing: it tells all prefetching/caching/range limiting logic what you will have to do, so avoids inefficiencies such as: aborting reads against s3, wasted prefetch on abfs etc.

The parquet option is to say 'do what you need in terms of footer prefetch/cache'; google gcs connector does this but not so explicitly

https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystem/fsdatainputstreambuilder.html

@@ -160,7 +160,11 @@
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
</dependency>

<dependency>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to revisit why this got in

try {
commonFileStatus = fileSystem.getFileStatus(filePath);
} catch (FileNotFoundException e) {
// file does not exist

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a DEBUG log here if we are not throwing the exception.

*
* @throws IOException failure to open the file.
*/
public static FSDataInputStream openFile(final FileSystem fileSystem, final Path path, final boolean randomIO)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The readPolicies method's parameter randomIO could be confusing. It might be more clear if renamed to useRandomIO or isRandomIO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or we could take a full string list of read policies, which is what is really happening underneath. Hadoop 3.4.1 explicitly adds "parquet" as an input format to tell the FS to optimise for that (footer caching, assume random IO everywhere else...)

* false if the method is not loaded or the path lacks the capability.
* @throws IllegalArgumentException invalid arguments
*/
public boolean pathCapabilities_hasPathCapability(Object fs, Path path, String capability) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method name can be shortened.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at the design, we have pulled in a lot of interfaces and their methods, so the naming is designed to isolate them both. It's a bit clunky but it is intended to show where operations come from

Class<?> source, Class<? extends T> returnType, String name, Class<?>... parameterTypes) {

final DynMethods.UnboundMethod method = loadInvocation(source, returnType, name, parameterTypes);
checkState(method.isStatic(), "Method is not static %s", method);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to add class name as well in the log ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

*/
public List<Map.Entry<Path, String>> bulkDelete_delete(FileSystem fs, Path base, Collection<Path> paths) {
checkAvailable(bulkDeleteDeleteMethod);
return bulkDeleteDeleteMethod.invoke(null, fs, base, paths);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be renamed to bulkDeleteMethod ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. in the actual new api we've got an interface_method split, but here it looks clunky in the actual invocation

* This class is derived from {@code org.apache.hadoop.io.wrappedio.impl.DynamicWrappedIO}.
* If a bug is found here, check to see if it has been fixed in hadoop trunk branch.
* If not: please provide a patch for that project alongside one here.
*/

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generic nit :- found the variable and method names longer in the entire file

* Note: that is the default behaviour of {@code FSDataInputStream#readFully(long, ByteBuffer)}.
*/
public void byteBufferPositionedReadable_readFully(InputStream in, long position, ByteBuffer buf) {
checkAvailable(byteBufferPositionedReadableReadFullyMethod);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be combined into a single statement like the previous method

long.class,
ByteBuffer.class);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also a suggestion that we can cache the Method objects after they are first loaded. This can avoid repeated lookups using reflection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice idea!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants