steveloughran commented on code in PR #7295:
URL: https://github.com/apache/hadoop/pull/7295#discussion_r1923552755
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java:
##########
@@ -24,28 +24,29 @@
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
Review Comment:
Keep this in the "not apache, not java" import block. (That block often contains some Hadoop Preconditions or org.apache.hadoop.thirdparty imports: traces of the "get off Guava" work.)
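An illustrative grouping built only from imports that appear in this PR's hunks, assuming the usual layout where java.* imports (none shown here) come first, then third-party libraries, then org.apache. The block comments are for illustration and are not taken from the Hadoop style guide.

```java
// "not apache, not java" block: third-party libraries, including the analytics accelerator
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

// Apache block
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
```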
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -1934,14 +1902,6 @@ private FSDataInputStream executeOpen(
true,
inputStreamStats);
- if (this.analyticsAcceleratorEnabled) {
Review Comment:
Nit: we don't use the `this.` prefix except in the constructor or, where needed, in setter/builder methods, i.e. where it is needed to distinguish fields from parameters. Nobody should be using variables whose names conflict with fields, so the plain field name is unambiguous everywhere else.
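A small illustration of the convention, using a hypothetical `StreamOptions` class (not part of hadoop-aws): `this.` appears only where a parameter shadows a field.

```java
final class StreamOptions {
  private int bufferSize;
  private boolean enabled;

  // Constructor: the parameters shadow the fields, so `this.` is required.
  StreamOptions(int bufferSize, boolean enabled) {
    this.bufferSize = bufferSize;
    this.enabled = enabled;
  }

  // Builder-style setter: the parameter shadows the field again.
  StreamOptions withBufferSize(int bufferSize) {
    this.bufferSize = bufferSize;
    return this;
  }

  // No shadowing here, so the bare field names are unambiguous.
  boolean isUsable() {
    return enabled && bufferSize > 0;
  }
}
```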
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java:
##########
@@ -123,7 +123,7 @@ public void testInvalidConfigurationThrows() {
ConnectorConfiguration connectorConfiguration =
new ConnectorConfiguration(conf,
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
- assertThrows("S3ASeekableStream illegal configuration does not throw",
+ assertThrows("S3ASeekableInputStream illegal configuration does not throw",
Review Comment:
Use `LambdaTestUtils.intercept()` here, with the closure
* closing any stream it creates, so that if the call didn't throw, we clean up
* returning `stream.toString()`
The return value of the closure is used in the exception text when nothing is thrown, so it is where diagnostics should go.
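A standalone sketch of that pattern. To keep it runnable without hadoop-common's test jar, `InterceptSketch.intercept()` is a hypothetical, simplified stand-in for `LambdaTestUtils.intercept()`: it returns the expected exception, and otherwise fails with the closure's result in the message. `FakeStream` is a hypothetical stream whose constructor rejects a bad setting. In the real test you would call `LambdaTestUtils.intercept()` directly.

```java
import java.io.Closeable;
import java.util.concurrent.Callable;

// Hypothetical stream: the constructor validates its configuration.
final class FakeStream implements Closeable {
  static int openCount = 0;  // tracks streams that were opened but not closed
  private final int prefetch;

  FakeStream(int prefetch) {
    if (prefetch < 0) {
      throw new IllegalArgumentException("prefetch < 0: " + prefetch);
    }
    this.prefetch = prefetch;
    openCount++;
  }

  @Override
  public void close() {
    openCount--;
  }

  @Override
  public String toString() {
    return "FakeStream{prefetch=" + prefetch + "}";
  }
}

final class InterceptSketch {
  // Simplified stand-in for LambdaTestUtils.intercept() (hypothetical, not the Hadoop code).
  static <T, E extends Exception> E intercept(Class<E> clazz, Callable<T> eval)
      throws Exception {
    try {
      T result = eval.call();
      // Nothing was thrown: put the closure's result into the failure message.
      throw new AssertionError("Expected " + clazz.getName()
          + " but the closure returned: " + result);
    } catch (Exception e) {
      if (clazz.isInstance(e)) {
        return clazz.cast(e);
      }
      throw e;  // an unexpected exception type propagates unchanged
    }
  }

  static IllegalArgumentException checkInvalidConfigThrows(int prefetch) throws Exception {
    return intercept(IllegalArgumentException.class, () -> {
      // If construction unexpectedly succeeds, try-with-resources closes the stream,
      // and its toString() becomes the diagnostics in the failure message.
      try (FakeStream stream = new FakeStream(prefetch)) {
        return stream.toString();
      }
    });
  }
}
```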
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableInputStream.java:
##########
@@ -30,20 +30,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FSInputStream;
-
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
-public class S3ASeekableStream extends ObjectInputStream implements StreamCapabilities {
+public class S3ASeekableInputStream extends ObjectInputStream implements StreamCapabilities {
Review Comment:
* Move to org.apache.hadoop.fs.s3a.impl.streams.
* I'd prefer a better name, such as AnalyticsStream; "seekable input stream" isn't clear enough about what it delivers.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java:
##########
@@ -94,10 +94,10 @@ public void testConnectorFrameworkConfigurable(boolean useCrtClient) {
S3SeekableInputStreamConfiguration configuration =
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
- assertSame("S3ASeekableStream configuration is not set to expected value",
+ assertSame("S3ASeekableInputStream configuration is not set to expected value",
PrefetchMode.ALL,
configuration.getLogicalIOConfiguration().getPrefetchingMode());
- assertEquals("S3ASeekableStream configuration is not set to expected value",
+ assertEquals("S3ASeekableInputStream configuration is not set to expected value",
Review Comment:
New tests should use AssertJ assertions unless there's a good reason not to (mainly, maintenance of old test suites).
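A sketch of the first assertion above rewritten with AssertJ; it needs AssertJ on the test classpath. `PrefetchMode` here is a hypothetical stand-in enum so the snippet compiles on its own; in the real test the actual value would be `configuration.getLogicalIOConfiguration().getPrefetchingMode()`.

```java
import org.assertj.core.api.Assertions;

class PrefetchModeAssertionSketch {
  // Hypothetical stand-in for the analytics accelerator's PrefetchMode enum.
  enum PrefetchMode { OFF, ALL }

  static void assertPrefetchAll(PrefetchMode actual) {
    // describedAs() carries the context that the JUnit message string used to hold;
    // isSameAs() keeps the reference-equality check of the original assertSame().
    Assertions.assertThat(actual)
        .describedAs("prefetch mode in S3SeekableInputStreamConfiguration")
        .isSameAs(PrefetchMode.ALL);
  }
}
```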
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java:
##########
@@ -230,7 +232,23 @@ public class S3AStoreImpl
@Override
protected void serviceInit(final Configuration conf) throws Exception {
- objectInputStreamFactory = createStreamFactory(conf);
+ if(conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT)) {
Review Comment:
Can you comment on this in that PR and ask for the `ClientManager` to be passed in?
This will have to come after serviceInit, as it won't exist until then.
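A minimal sketch of that lifecycle, assuming the factory only reads configuration during init and is handed its client source afterwards instead of building its own S3AsyncClient. `AsyncClientSource`, `AnalyticsStreamFactorySketch`, `init()`, `bind()` and `openStream()` are hypothetical names, not the hadoop-aws API, and a `String` stands in for a real client.

```java
import java.util.Objects;

// Hypothetical stand-in for ClientManager: owns the shared async client.
interface AsyncClientSource {
  String asyncClient();  // a String stands in for a real S3AsyncClient
}

// Hypothetical factory: init() reads settings only; the client source is bound later.
final class AnalyticsStreamFactorySketch {
  private boolean enabled;
  private AsyncClientSource clientSource;  // null until bind() is called

  void init(boolean analyticsEnabled) {
    // Configuration only: no S3 client is created here.
    enabled = analyticsEnabled;
  }

  void bind(AsyncClientSource source) {
    clientSource = Objects.requireNonNull(source, "client source");
  }

  String openStream(String key) {
    if (!enabled) {
      throw new IllegalStateException("analytics accelerator disabled");
    }
    if (clientSource == null) {
      throw new IllegalStateException("factory used before bind()");
    }
    // Every stream reuses the client owned by the client source.
    return "stream(" + key + ") via " + clientSource.asyncClient();
  }
}
```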
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java:
##########
@@ -230,7 +232,23 @@ public class S3AStoreImpl
@Override
protected void serviceInit(final Configuration conf) throws Exception {
- objectInputStreamFactory = createStreamFactory(conf);
+ if(conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT)) {
+ boolean analyticsAcceleratorCRTEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
+ ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
+ final S3AsyncClient s3AsyncClient;
+ LOG.info("Using S3SeekableInputStream");
+ if(analyticsAcceleratorCRTEnabled) {
+ LOG.info("Using S3 CRT client for analytics accelerator S3");
+ s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
Review Comment:
OK for the WIP, but it will be culled later.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.