This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new d45136e6de6 CAMEL-21631 - Enable Checksum algorithm on S3 streaming upload producer (#16849)
d45136e6de6 is described below
commit d45136e6de61c09cca934ba4a13d8ff2a6f5b52d
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri Jan 17 11:53:07 2025 +0100
CAMEL-21631 - Enable Checksum algorithm on S3 streaming upload producer (#16849)
* CAMEL-21631 - Enable Checksum algorithm on S3 streaming upload producer
Signed-off-by: Andrea Cosentino <[email protected]>
* CAMEL-21631 - Enable Checksum algorithm on S3 streaming upload producer
Signed-off-by: Andrea Cosentino <[email protected]>
---------
Signed-off-by: Andrea Cosentino <[email protected]>
---
components/camel-aws/camel-aws2-s3/pom.xml | 5 +++++
.../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 14 +++++++++-----
2 files changed, 14 insertions(+), 5 deletions(-)
diff --git a/components/camel-aws/camel-aws2-s3/pom.xml b/components/camel-aws/camel-aws2-s3/pom.xml
index 6378926e0a6..d35fa75fd7f 100644
--- a/components/camel-aws/camel-aws2-s3/pom.xml
+++ b/components/camel-aws/camel-aws2-s3/pom.xml
@@ -45,6 +45,11 @@
<artifactId>s3</artifactId>
<version>${aws-java-sdk2-version}</version>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>checksums</artifactId>
+ <version>${aws-java-sdk2-version}</version>
+ </dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index 57c8459fb9c..0834341cf4a 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.BucketCannedACL;
+import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import
software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
@@ -53,6 +54,7 @@ import
software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
/**
@@ -68,6 +70,7 @@ public class AWS2S3StreamUploadProducer extends
DefaultProducer {
private final Lock lock = new ReentrantLock();
private transient String s3ProducerToString;
private ScheduledExecutorService timeoutCheckerExecutorService;
+ private ChecksumAlgorithm algorithm = ChecksumAlgorithm.CRC32;
public AWS2S3StreamUploadProducer(final Endpoint endpoint) {
super(endpoint);
@@ -194,7 +197,7 @@ public class AWS2S3StreamUploadProducer extends
DefaultProducer {
state.part, state.id);
CreateMultipartUploadRequest.Builder createMultipartUploadRequest
=
CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName())
- .key(state.dynamicKeyName);
+
.key(state.dynamicKeyName).checksumAlgorithm(algorithm);
String storageClass = AWS2S3Utils.determineStorageClass(exchange,
getConfiguration());
if (storageClass != null) {
@@ -300,14 +303,15 @@ public class AWS2S3StreamUploadProducer extends
DefaultProducer {
private void uploadPart(UploadState state) {
UploadPartRequest uploadRequest =
UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
.key(state.dynamicKeyName).uploadId(state.initResponse.uploadId())
- .partNumber(state.multipartIndex).build();
+
.partNumber(state.multipartIndex).checksumAlgorithm(algorithm).build();
LOG.trace("Uploading part {}, multipart {} at index {} for {}",
state.part, state.multipartIndex, state.index,
getConfiguration().getKeyName());
- String etag = getEndpoint().getS3Client()
- .uploadPart(uploadRequest,
RequestBody.fromBytes(state.buffer.toByteArray())).eTag();
- CompletedPart partUpload =
CompletedPart.builder().partNumber(state.multipartIndex).eTag(etag).build();
+ UploadPartResponse partResponse = getEndpoint().getS3Client()
+ .uploadPart(uploadRequest,
RequestBody.fromBytes(state.buffer.toByteArray()));
+ CompletedPart partUpload =
CompletedPart.builder().partNumber(state.multipartIndex)
+
.checksumCRC32(partResponse.checksumCRC32()).eTag(partResponse.eTag()).build();
state.completedParts.add(partUpload);
state.buffer.reset();
state.multipartIndex++;