gaborgsomogyi commented on code in PR #28136:
URL: https://github.com/apache/flink/pull/28136#discussion_r3259552729
##########
flink-filesystems/flink-s3-fs-native/README.md:
##########
@@ -327,6 +327,53 @@ When enabled, file uploads automatically use TransferManager for:
 - Better utilization of available bandwidth
 - Lower heap requirements for write operations
 
+## AWS Common Runtime (CRT) Support
+
+The filesystem optionally supports the [AWS Common Runtime (CRT)](https://github.com/awslabs/aws-crt-java) HTTP transport
+for higher throughput on large S3 workloads.
+
+When enabled, the CRT transport replaces:
+- **Sync client**: Apache HTTP Client → `AwsCrtHttpClient`
+- **Async client**: Netty NIO → `S3AsyncClient.crtBuilder()` (with built-in multipart acceleration)
+
+### Prerequisites
+
+The `aws-crt` artifact contains JNI-linked native libraries whose C-side `FindClass` paths are
+hardcoded, making Maven shade relocation incompatible. Therefore **CRT JARs are not bundled** in
+the fat JAR and must be placed manually.
+
+### Setup (step-by-step)
+
+1. Download `aws-crt-client-<version>.jar` (`groupId: software.amazon.awssdk`) and
+   `aws-crt-<version>.jar` (`groupId: software.amazon.awssdk.crt`) for the same AWS SDK version
+   used by this module (check `fs.s3.aws.sdk.version` in `pom.xml`).
+
+2. Place both JARs in the Flink plugin directory alongside `flink-s3-fs-native.jar`:
+
+   ```bash
+   cp aws-crt-client-<version>.jar $FLINK_HOME/plugins/s3-fs-native/
+   cp aws-crt-<version>.jar $FLINK_HOME/plugins/s3-fs-native/
+   ```
+
+3. Enable CRT in your Flink configuration (`conf/config.yaml`):
+
+   ```yaml
+   s3.crt.enabled: true
+   ```
+
+4. Optionally tune the soft throughput target (default: 10.0 Gbps):

Review Comment:
I would remove the optional steps, since we plan to add several new flags.
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -326,6 +345,11 @@ public static class Builder {
         // Custom credentials provider class names (comma-separated)
         @Nullable private String credentialsProviderClasses;
 
+        // CRT configuration
+        private boolean useCrt = false;
+        private double crtTargetThroughputGbps = 10.0;
+        private long crtMinPartSizeInBytes = 5L << 20; // 5MB

Review Comment:
The hardcoded `5L << 20` duplicates knowledge already encoded in `PART_UPLOAD_MIN_SIZE.defaultValue()`. If the builder is ever used outside the factory, this default will silently diverge from the config. Suggest deriving it from the config option instead:

```java
private long crtMinPartSizeInBytes =
        NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE.defaultValue();
```

That way there is a single source of truth, and the comment explaining the magic number is no longer needed.

##########
flink-filesystems/flink-s3-fs-native/README.md:
##########
@@ -327,6 +327,53 @@ When enabled, file uploads automatically use TransferManager for:
 - Better utilization of available bandwidth
 - Lower heap requirements for write operations
 
+## AWS Common Runtime (CRT) Support
+
+The filesystem optionally supports the [AWS Common Runtime (CRT)](https://github.com/awslabs/aws-crt-java) HTTP transport
+for higher throughput on large S3 workloads.
+
+When enabled, the CRT transport replaces:
+- **Sync client**: Apache HTTP Client → `AwsCrtHttpClient`
+- **Async client**: Netty NIO → `S3AsyncClient.crtBuilder()` (with built-in multipart acceleration)
+
+### Prerequisites
+
+The `aws-crt` artifact contains JNI-linked native libraries whose C-side `FindClass` paths are
+hardcoded, making Maven shade relocation incompatible. Therefore **CRT JARs are not bundled** in
+the fat JAR and must be placed manually.
+
+### Setup (step-by-step)
+
+1. Download `aws-crt-client-<version>.jar` (`groupId: software.amazon.awssdk`) and
+   `aws-crt-<version>.jar` (`groupId: software.amazon.awssdk.crt`) for the same AWS SDK version
+   used by this module (check `fs.s3.aws.sdk.version` in `pom.xml`).

Review Comment:
The version guidance here is misleading: `software.amazon.awssdk.crt` (`aws-crt`) uses a completely independent versioning scheme (e.g. `0.33.x`) that does not align with the AWS SDK version (`2.x.y`). Telling users to download "the same AWS SDK version" will lead them to the wrong artifact. Two suggestions:

1. Pin the tested `aws-crt` version explicitly, or explain that the correct version can be resolved from `aws-crt-client-<version>.pom` → its `<dependency>` on `software.amazon.awssdk.crt:aws-crt`.
2. Consider shipping a small helper script (e.g. `tools/download-crt-jars.sh`) that uses `mvn dependency:get` or `curl` against Maven Central to fetch the exact compatible JARs and place them in the right directory. This would significantly reduce the friction of the manual setup step.

##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -476,30 +515,60 @@ S3ClientProvider build() {
                         .credentialsProvider(credentialsProvider)
                         .region(awsRegion)
                         .serviceConfiguration(s3Config)
-                        .httpClientBuilder(httpClientBuilder)
                         .overrideConfiguration(overrideConfig);
+            if (useCrt) {
+                clientBuilder.httpClientBuilder(
+                        AwsCrtHttpClient.builder()
+                                .maxConcurrency(maxConnections)
+                                .connectionTimeout(connectionTimeout)
+                                .connectionMaxIdleTime(connectionMaxIdleTime));

Review Comment:
The CRT sync client omits `socketTimeout`. The Apache path sets `.socketTimeout(socketTimeout)` to bound how long a stalled read can block; `AwsCrtHttpClient.Builder` exposes `.readTimeout(Duration)` for exactly this purpose. Without it, a slow connection can hang indefinitely.
```java
AwsCrtHttpClient.builder()
        .maxConcurrency(maxConnections)
        .connectionTimeout(connectionTimeout)
        .connectionMaxIdleTime(connectionMaxIdleTime)
        .readTimeout(socketTimeout)
```

##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -460,6 +482,9 @@ public FileSystem create(URI fsUri) throws IOException {
                         .maxRetries(config.get(MAX_RETRIES))
                         .credentialsProviderClasses(credentialsProviderClasses)
                         .encryptionConfig(encryptionConfig)
+                        .useCrt(config.get(CRT_ENABLED))
+                        .crtTargetThroughputGbps(config.get(CRT_TARGET_THROUGHPUT_GBPS))
+                        .crtMinPartSizeInBytes(config.get(PART_UPLOAD_MIN_SIZE))

Review Comment:
There is no validation that `crtTargetThroughputGbps > 0`. Other connection-related values (`maxConnections`, `s3minPartSize`, etc.) are guarded with `Preconditions.checkArgument` a few lines above. Passing `0` or a negative value to the CRT builder will produce an opaque SDK exception. Suggest adding:

```java
Preconditions.checkArgument(
        config.get(CRT_TARGET_THROUGHPUT_GBPS) > 0,
        "'%s' must be positive, but was %s",
        CRT_TARGET_THROUGHPUT_GBPS.key(),
        config.get(CRT_TARGET_THROUGHPUT_GBPS));
```

##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java:
##########
@@ -203,6 +203,27 @@ void testEmptyProviderStringThrows() {
                 .hasMessageContaining("no valid provider class names");
     }
 
+    @Test
+    void testCrtDisabledByDefault() {
+        S3ClientProvider provider =
+                S3ClientProvider.builder().endpoint(DUMMY_ENDPOINT).region(DUMMY_REGION).build();
+        assertThat(provider.isUseCrt()).isFalse();
+    }
+
+    @Test
+    void testCrtFlagIsRecorded() {
+        S3ClientProvider provider =
+                S3ClientProvider.builder()
+                        .endpoint(DUMMY_ENDPOINT)
+                        .region(DUMMY_REGION)
+                        .useCrt(true)
+                        .crtTargetThroughputGbps(20.0)
+                        .build();
+
+        assertThat(provider.isUseCrt()).isTrue();
+        assertThat(provider.getCrtTargetThroughputGbps()).isEqualTo(20.0);

Review Comment:
This test only asserts that the builder stored the flag values; it does not verify that the CRT branch was actually taken during client construction. The `if (useCrt)` block could be silently deleted and this test would still pass. Consider adding a type-level assertion (e.g. via reflection on the `s3Client` or `asyncClient` field) to confirm that the constructed clients are actually CRT-backed when `useCrt=true`.

##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -476,30 +515,60 @@ S3ClientProvider build() {
                         .credentialsProvider(credentialsProvider)
                         .region(awsRegion)
                         .serviceConfiguration(s3Config)
-                        .httpClientBuilder(httpClientBuilder)
                         .overrideConfiguration(overrideConfig);
+            if (useCrt) {
+                clientBuilder.httpClientBuilder(
+                        AwsCrtHttpClient.builder()
+                                .maxConcurrency(maxConnections)
+                                .connectionTimeout(connectionTimeout)
+                                .connectionMaxIdleTime(connectionMaxIdleTime));
+            } else {
+                clientBuilder.httpClientBuilder(httpClientBuilder);
+            }
             if (endpointUri != null) {
                 clientBuilder.endpointOverride(endpointUri);
             }
             S3Client s3Client = clientBuilder.build();
-            S3AsyncClientBuilder asyncClientBuilder =
-                    S3AsyncClient.builder()
-                            .credentialsProvider(credentialsProvider)
-                            .region(awsRegion)
-                            .serviceConfiguration(s3Config)
-                            .httpClientBuilder(
-                                    NettyNioAsyncHttpClient.builder()
-                                            .maxConcurrency(maxConnections)
-                                            .connectionTimeout(connectionTimeout)
-                                            .readTimeout(socketTimeout)
-                                            .connectionAcquisitionTimeout(connectionTimeout))
-                            .overrideConfiguration(overrideConfig);
-            if (endpointUri != null) {
-                asyncClientBuilder.endpointOverride(endpointUri);
+            S3AsyncClient asyncClient;
+            if (useCrt) {
+                S3CrtAsyncClientBuilder crtAsyncBuilder =
+                        S3AsyncClient.crtBuilder()
+                                .credentialsProvider(credentialsProvider)
+                                .region(awsRegion)
+                                .forcePathStyle(pathStyleAccess)
+                                .checksumValidationEnabled(checksumValidation)
+                                .retryConfiguration(
+                                        S3CrtRetryConfiguration.builder()
+                                                .numRetries(maxRetries)
+                                                .build())
+                                .maxConcurrency(maxConnections)
+                                .targetThroughputInGbps((double) crtTargetThroughputGbps)
+                                .minimumPartSizeInBytes(crtMinPartSizeInBytes);
+                if (endpointUri != null) {
+                    crtAsyncBuilder.endpointOverride(endpointUri);
+                }
+                asyncClient = crtAsyncBuilder.build();

Review Comment:
The non-CRT async path applies `s3Config` (which carries `chunkedEncodingEnabled(chunkedEncoding)`), but `S3CrtAsyncClientBuilder` has no equivalent setter, so the `chunkedEncoding` setting is silently dropped here. If CRT always manages wire encoding internally and the option is intentionally inapplicable, it is worth adding a short comment (or a note in the `s3.chunked-encoding.enabled` config-option description) so that operators are not surprised when that setting has no effect in CRT mode.

##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -476,30 +515,60 @@ S3ClientProvider build() {
                         .credentialsProvider(credentialsProvider)
                         .region(awsRegion)
                         .serviceConfiguration(s3Config)
-                        .httpClientBuilder(httpClientBuilder)
                         .overrideConfiguration(overrideConfig);
+            if (useCrt) {
+                clientBuilder.httpClientBuilder(
+                        AwsCrtHttpClient.builder()
+                                .maxConcurrency(maxConnections)
+                                .connectionTimeout(connectionTimeout)
+                                .connectionMaxIdleTime(connectionMaxIdleTime));
+            } else {
+                clientBuilder.httpClientBuilder(httpClientBuilder);
+            }

Review Comment:
Two related issues with the `build()` method:

1. `ApacheHttpClient.Builder httpClientBuilder` (a few lines above) is always constructed, even in the CRT path where it is never used. It should move inside the `else` branch.
2. `build()` is now ~120 lines, with credential setup, two interleaved CRT/non-CRT transport branches, and object wiring all mixed together. Consider extracting two private helpers:

```java
private S3Client buildSyncClient(...) { /* CRT vs Apache */ }

private S3AsyncClient buildAsyncClient(...) { /* CRT vs Netty */ }
```

This brings `build()` down to ~50 lines of orchestration and makes each transport path independently readable and testable.

##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java:
##########
@@ -203,6 +203,27 @@ void testEmptyProviderStringThrows() {
                 .hasMessageContaining("no valid provider class names");
     }
 
+    @Test
+    void testCrtDisabledByDefault() {
+        S3ClientProvider provider =
+                S3ClientProvider.builder().endpoint(DUMMY_ENDPOINT).region(DUMMY_REGION).build();
+        assertThat(provider.isUseCrt()).isFalse();
+    }
+
+    @Test
+    void testCrtFlagIsRecorded() {
+        S3ClientProvider provider =
+                S3ClientProvider.builder()
+                        .endpoint(DUMMY_ENDPOINT)
+                        .region(DUMMY_REGION)
+                        .useCrt(true)
+                        .crtTargetThroughputGbps(20.0)
+                        .build();
+
+        assertThat(provider.isUseCrt()).isTrue();
+        assertThat(provider.getCrtTargetThroughputGbps()).isEqualTo(20.0);
+    }
+
     @SuppressWarnings("unchecked")
     private static List<AwsCredentialsProvider> extractChain(AwsCredentialsProvider provider)

Review Comment:
There is no test covering the case where `s3.crt.enabled=true` is configured but the CRT JARs are absent from the classpath. Given the manual installation requirement, this is one of the most likely operator mistakes, and without a test the friendly error path (if added) has no coverage. Consider a test that removes or mocks the CRT classes in the classloader and asserts that an `IllegalStateException` with an actionable message is thrown.
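One way to set up the test suggested in this comment is to probe for the CRT classes through a classloader that cannot see them. The following is a minimal sketch under stated assumptions: `CrtClasspathProbe` and `requireCrtOnClasspath` are hypothetical names, not part of the PR, and the probed class names (`software.amazon.awssdk.http.crt.AwsCrtHttpClient` from `aws-crt-client`, `software.amazon.awssdk.crt.CRT` from `aws-crt`) are assumed to be the unrelocated names the plugin classloader would see. An empty `URLClassLoader` with the bootstrap loader as parent stands in for a plugin directory without the CRT JARs.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

public final class CrtClasspathProbe {

    // Assumed (unrelocated) class names: AwsCrtHttpClient ships in aws-crt-client,
    // CRT ships in aws-crt. Adjust them if the module relocates SDK packages.
    static final List<String> CRT_CLASSES =
            List.of(
                    "software.amazon.awssdk.http.crt.AwsCrtHttpClient",
                    "software.amazon.awssdk.crt.CRT");

    private CrtClasspathProbe() {}

    // Hypothetical helper (not part of the PR): fail fast with an actionable message
    // instead of surfacing a NoClassDefFoundError during client construction.
    public static void requireCrtOnClasspath(ClassLoader loader) {
        for (String className : CRT_CLASSES) {
            try {
                // initialize = false: only checks that the class can be loaded;
                // static initializers (which may load native code) are not run.
                Class.forName(className, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                throw new IllegalStateException(
                        "CRT transport requested (s3.crt.enabled=true) but "
                                + className
                                + " is not on the classpath. Place the aws-crt-client and"
                                + " aws-crt JARs in $FLINK_HOME/plugins/s3-fs-native/.",
                        e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // No URLs and the bootstrap loader (null) as parent: no AWS SDK class is visible,
        // which mimics a plugin directory without the CRT JARs.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            requireCrtOnClasspath(isolated);
            System.out.println("CRT classes found");
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Probing once before building any client keeps the actionable message in one place; wrapping each CRT branch in its own `catch (NoClassDefFoundError e)` is an equally valid alternative.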
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -476,30 +515,60 @@ S3ClientProvider build() {
                         .credentialsProvider(credentialsProvider)
                         .region(awsRegion)
                         .serviceConfiguration(s3Config)
-                        .httpClientBuilder(httpClientBuilder)
                         .overrideConfiguration(overrideConfig);
+            if (useCrt) {
+                clientBuilder.httpClientBuilder(
+                        AwsCrtHttpClient.builder()
+                                .maxConcurrency(maxConnections)
+                                .connectionTimeout(connectionTimeout)
+                                .connectionMaxIdleTime(connectionMaxIdleTime));
+            } else {
+                clientBuilder.httpClientBuilder(httpClientBuilder);
+            }

Review Comment:
If `s3.crt.enabled: true` is set but the CRT JARs are missing from the plugin directory, the failure will be a raw `NoClassDefFoundError` (for `AwsCrtHttpClient`, or for one of the `aws-crt` classes behind `S3AsyncClient.crtBuilder()`), which is very hard to diagnose given the manual installation requirement. Consider guarding the CRT branches with a try/catch and re-throwing a clear error:

```java
try {
    clientBuilder.httpClientBuilder(AwsCrtHttpClient.builder()...);
} catch (NoClassDefFoundError e) {
    throw new IllegalStateException(
            "CRT transport requested (s3.crt.enabled=true) but aws-crt-client and aws-crt JARs "
                    + "are not on the classpath. Place them in $FLINK_HOME/plugins/s3-fs-native/.",
            e);
}
```

##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -476,30 +515,60 @@ S3ClientProvider build() {
                         .credentialsProvider(credentialsProvider)
                         .region(awsRegion)
                         .serviceConfiguration(s3Config)
-                        .httpClientBuilder(httpClientBuilder)
                         .overrideConfiguration(overrideConfig);
+            if (useCrt) {
+                clientBuilder.httpClientBuilder(
+                        AwsCrtHttpClient.builder()
+                                .maxConcurrency(maxConnections)
+                                .connectionTimeout(connectionTimeout)
+                                .connectionMaxIdleTime(connectionMaxIdleTime));
+            } else {
+                clientBuilder.httpClientBuilder(httpClientBuilder);
+            }
             if (endpointUri != null) {
                 clientBuilder.endpointOverride(endpointUri);
             }
             S3Client s3Client = clientBuilder.build();
-            S3AsyncClientBuilder asyncClientBuilder =
-                    S3AsyncClient.builder()
-                            .credentialsProvider(credentialsProvider)
-                            .region(awsRegion)
-                            .serviceConfiguration(s3Config)
-                            .httpClientBuilder(
-                                    NettyNioAsyncHttpClient.builder()
-                                            .maxConcurrency(maxConnections)
-                                            .connectionTimeout(connectionTimeout)
-                                            .readTimeout(socketTimeout)
-                                            .connectionAcquisitionTimeout(connectionTimeout))
-                            .overrideConfiguration(overrideConfig);
-            if (endpointUri != null) {
-                asyncClientBuilder.endpointOverride(endpointUri);
+            S3AsyncClient asyncClient;
+            if (useCrt) {
+                S3CrtAsyncClientBuilder crtAsyncBuilder =
+                        S3AsyncClient.crtBuilder()
+                                .credentialsProvider(credentialsProvider)
+                                .region(awsRegion)
+                                .forcePathStyle(pathStyleAccess)
+                                .checksumValidationEnabled(checksumValidation)
+                                .retryConfiguration(
+                                        S3CrtRetryConfiguration.builder()
+                                                .numRetries(maxRetries)
+                                                .build())
+                                .maxConcurrency(maxConnections)
+                                .targetThroughputInGbps((double) crtTargetThroughputGbps)

Review Comment:
Redundant cast: `crtTargetThroughputGbps` is already declared as `double`, so `(double)` is a no-op.
Just pass the field directly:

```java
.targetThroughputInGbps(crtTargetThroughputGbps)
```
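Because the value handed to `targetThroughputInGbps` comes straight from the configuration, the positivity check suggested for `NativeS3FileSystemFactory` earlier in this thread is what keeps `0` or negative values away from the CRT builder. Below is a minimal plain-Java sketch of that check. It assumes a hypothetical `checkPositive` helper in place of Flink's `Preconditions.checkArgument`, so the example needs no Flink dependency, and a hypothetical key string, since the key behind `CRT_TARGET_THROUGHPUT_GBPS` is not shown in the excerpt.

```java
public final class CrtThroughputValidation {

    // Hypothetical key used only for this example; the PR excerpt does not show the real
    // key behind CRT_TARGET_THROUGHPUT_GBPS.
    static final String KEY = "s3.crt.target-throughput-gbps";

    private CrtThroughputValidation() {}

    // Stand-in for Preconditions.checkArgument(value > 0, "'%s' must be positive, but was %s", ...).
    public static double checkPositive(String key, double value) {
        // Comparisons with NaN are always false, so NaN is rejected together with 0 and negatives.
        if (!(value > 0)) {
            throw new IllegalArgumentException(
                    String.format("'%s' must be positive, but was %s", key, value));
        }
        return value;
    }

    public static void main(String[] args) {
        for (double candidate : new double[] {10.0, 0.0, -1.0, Double.NaN}) {
            try {
                double accepted = checkPositive(KEY, candidate);
                System.out.println(accepted + " accepted");
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage());
            }
        }
    }
}
```

The suggested `config.get(CRT_TARGET_THROUGHPUT_GBPS) > 0` condition behaves the same way for `NaN`, so the factory-side check needs no extra special case for it.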
