Propagate gcs-connector options to GcsUtil #32769
Conversation
try {
  // Check if gcs-connector-hadoop is loaded into classpath
  Class.forName("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration");
  Configuration config = new Configuration();
  return GoogleCloudStorageReadOptions.builder()
      .setFastFailOnNotFound(
          GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FAST_FAIL_ON_NOT_FOUND_ENABLE
              .get(config, config::getBoolean))
      .setSupportGzipEncoding(
          GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE
              .get(config, config::getBoolean))
      .setInplaceSeekLimit(
          GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.get(
              config, config::getLong))
      .setFadvise(
          GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FADVISE.get(
              config, config::getEnum))
      .setMinRangeRequestSize(
          GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.get(
              config, config::getInt))
      .setGrpcChecksumsEnabled(
          GoogleHadoopFileSystemConfiguration.GCS_GRPC_CHECKSUMS_ENABLE.get(
              config, config::getBoolean))
      .setGrpcReadTimeoutMillis(
          GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_TIMEOUT_MS.get(
              config, config::getLong))
      .setGrpcReadMessageTimeoutMillis(
          GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_MESSAGE_TIMEOUT_MS.get(
              config, config::getLong))
      .setGrpcReadMetadataTimeoutMillis(
          GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_METADATA_TIMEOUT_MS.get(
              config, config::getLong))
      .setGrpcReadZeroCopyEnabled(
          GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_ZEROCOPY_ENABLE.get(
              config, config::getBoolean))
      .setTraceLogEnabled(
          GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_ENABLE.get(
              config, config::getBoolean))
      .setTraceLogTimeThreshold(
          GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_TIME_THRESHOLD_MS.get(
              config, config::getLong))
      .build();
} catch (ClassNotFoundException e) {
Copy-pasted from here: https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/v2.2.25/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java#L656-L677
I think we could make a case to make that method public in a future release so we're not pulling in Hadoop explicitly here.
Or, I could omit this if/else branch entirely and always return GoogleCloudStorageReadOptions.DEFAULT, and leave it up to the user to supply a GoogleCloudStorageReadOptions instance (thus passing the Hadoop dependency down to the user-end).
Or, I could omit this if/else branch entirely and always return GoogleCloudStorageReadOptions.DEFAULT, and leave it up to the user to supply a GoogleCloudStorageReadOptions instance (thus passing the Hadoop dependency down to the user-end).
I think this would be preferable to avoid having to pull in the other packages but asking for some other Beam maintainers more familiar with dep management etc to take a look as well.
Agree with @scwhittle. It is also easier for maintainers and if new options are added, we don't need to change the code to support that.
Sounds good! I removed the Hadoop dep and this parsing block; this leaves it up to the user to pass in a GoogleCloudStorageReadOptions constructed however they prefer 👍
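For reference, the optional-dependency detection that the original diff used (and that this thread removed) follows a common pattern: probe for a class with `Class.forName` and fall back to defaults when it is absent. A minimal, self-contained sketch of just that pattern — `ProbeDemo` and `isOnClasspath` are illustrative names, not Beam APIs:

```java
public class ProbeDemo {
  // Returns true if the named class can be loaded from the current classpath.
  static boolean isOnClasspath(String className) {
    try {
      Class.forName(className);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // java.util.List ships with the JDK, so this prints true.
    System.out.println(isOnClasspath("java.util.List"));
    // The gcs-connector class is only present when that optional dependency
    // is on the classpath; on a bare JVM this prints false.
    System.out.println(
        isOnClasspath("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration"));
  }
}
```

The caller can then branch on the probe result: build options from Hadoop configuration when the connector is present, or return `GoogleCloudStorageReadOptions.DEFAULT` otherwise.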
classesInPackage("com.google.auth"),
classesInPackage("com.fasterxml.jackson.annotation"),
classesInPackage("com.google.cloud.hadoop.gcsio"),
classesInPackage("com.google.common.collect"), // via GoogleCloudStorageReadOptions
this is unfortunate 😓 GoogleCloudStorageReadOptions has one property that's strictly an ImmutableSet: https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/v2.2.25/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java#L519
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment.

assign set of reviewers

Assigning reviewers. If you would like to opt out of this review, comment accordingly. R: @damondouglas for label java.
The PR bot will only process comments in the main thread (not review comments).

assigned reviewers since at first glance, the GHA failures did not look related/might be transient? could be wrong though.

bumping this PR -- cc @scwhittle since I saw you recently made changes to GcsUtil?
googleCloudStorageOptions =
    GoogleCloudStorageOptions.builder()
        .setAppName("Beam")
        .setReadChannelOptions(this.googleCloudStorageReadOptions)
This seems like a bug in the GoogleCloudStorageImpl.open method that it doesn't use these options, but you can remove the separate member variable and use googleCloudStorageOptions.getReadChannelOptions() in open below.
yeah GoogleCloudStorageImpl is set up in a slightly strange way -- it's constructed with a GoogleCloudStorageOptions arg, but open also accepts a separate GoogleCloudStorageReadOptions argument, while ignoring the former instance variable. Dropping the separate member variable here makes sense, will do 👍
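The fix discussed in this thread — dropping the separate member and deriving read options from the stored options object, so the two can never disagree — can be illustrated with a self-contained stand-in. The classes below are deliberately hypothetical simplifications, not the real gcsio API:

```java
public class OptionsDemo {
  // Simplified stand-in for GoogleCloudStorageReadOptions.
  static class ReadOptions {
    final boolean fastFailOnNotFound;
    ReadOptions(boolean fastFailOnNotFound) { this.fastFailOnNotFound = fastFailOnNotFound; }
  }

  // Simplified stand-in for GoogleCloudStorageOptions, which carries the read options.
  static class StorageOptions {
    private final ReadOptions readChannelOptions;
    StorageOptions(ReadOptions readChannelOptions) { this.readChannelOptions = readChannelOptions; }
    ReadOptions getReadChannelOptions() { return readChannelOptions; }
  }

  static class Storage {
    private final StorageOptions options;
    // No separate ReadOptions member: open-time behavior is always derived
    // from the single stored options object.
    Storage(StorageOptions options) { this.options = options; }

    boolean openUsesFastFail() {
      return options.getReadChannelOptions().fastFailOnNotFound;
    }
  }

  public static void main(String[] args) {
    Storage storage = new Storage(new StorageOptions(new ReadOptions(false)));
    System.out.println(storage.openUsesFastFail()); // prints false
  }
}
```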
Reminder, please take a look at this pr: @damondouglas

R: @shunping (XQ suggested you to help review this)

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment.

ack. will take a look today

@shunping, whenever you have a chance I'd appreciate any feedback on this!
sdks/java/extensions/google-cloud-platform-core/src/test/resources/test-hadoop-conf.xml
shunping left a comment:
Thanks for adding this option. I left some minor comments, and overall it looks good.
After you finish revising, could you please sync to the current HEAD so the tests are re-triggered? Thanks!
57c0b50 to 661a826 (Compare)
Running the failed precommit test again, though the failure seems unrelated to the code change here.
classesInPackage("com.google.api.services.storage"),
classesInPackage("com.google.auth"),
classesInPackage("com.fasterxml.jackson.annotation"),
classesInPackage("com.google.cloud.hadoop.gcsio"),
Looks like this breaks a test:
https://ge.apache.org/s/5ubkbvlvrjzgi/console-log/task/:sdks:java:extensions:google-cloud-platform-core:test?anchor=5&page=1
thanks for tracking that down! Pushed a fix. One of the precommit tests is still failing though:

org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarnessTest > testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers FAILED
    java.lang.AssertionError at GrpcCleanupRule.java:201

Not sure if/how this could be related to my PR.

Run Java PreCommit
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

@@ Coverage Diff @@
##             master   #32769   +/-   ##
=========================================
  Coverage     57.39%   57.39%
  Complexity     1474     1474
=========================================
  Files           970      970
  Lines        154426   154426
  Branches       1076     1076
=========================================
  Hits          88637    88637
  Misses        63585    63585
  Partials       2204     2204

☔ View full report in Codecov by Sentry.
Context: I was reading GCS Parquet files via SplittableDoFn and noticed that ReadableFile#openSeekable does not propagate any of the gcs-connector options specified in my core-site.xml file. Particularly, I wanted to turn off fs.gs.inputstream.fast.fail.on.not.found.enable, which is redundant in a SDF with default empty-match treatment, and tweak fs.gs.inputstream.fadvise. It looks like these GoogleCloudStorageReadOptions options need to be set explicitly in GcsUtil, and passed to any GoogleCloudStorage#open calls (see reference).

The big downside of this PR is, of course, pulling in Hadoop :( The alternative is to copy-paste all the Configuration keys manually into GcsUtil, which seems harder to maintain. Or, I could omit the GcsReadOptionsFactory factory logic entirely and leave it 100% up to the user to construct GoogleCloudStorageReadOptions instances.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.