This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 11559d4de8f [CAMEL-20728] add S3 multipart upload support for stream
producer in camel-aws2-s3 (#14062)
11559d4de8f is described below
commit 11559d4de8fcd114900fabe1e790c1a6dd1c3d74
Author: Benjamin BONNET <[email protected]>
AuthorDate: Tue May 7 10:36:46 2024 +0200
[CAMEL-20728] add S3 multipart upload support for stream producer in
camel-aws2-s3 (#14062)
* add S3 multipart upload support for stream producer
* clean useless condition
* review feedback on logging
---
.../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 72 +++++++++++----
.../integration/S3StreamUploadS3MultipartIT.java | 102 +++++++++++++++++++++
2 files changed, 154 insertions(+), 20 deletions(-)
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index 9fe8bb11042..8d23db6f1a5 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -127,21 +127,36 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
UploadState state = null;
int totalSize = 0;
byte[] b;
- while ((b = AWS2S3Utils.toByteArray(is,
getConfiguration().getBufferSize())).length > 0) {
+ int maxRead = (getConfiguration().isMultiPartUpload()
+ ? Math.toIntExact(getConfiguration().getPartSize()) :
getConfiguration().getBufferSize());
+ if (uploadAggregate != null) {
+ uploadAggregate.index++;
+ maxRead -= uploadAggregate.buffer.size();
+ }
+
+ while ((b = AWS2S3Utils.toByteArray(is, maxRead)).length
+ > 0) {
totalSize += b.length;
+ if (getConfiguration().isMultiPartUpload())
+ maxRead -= b.length;
synchronized (lock) {
// aggregate with previously received exchanges
if (ObjectHelper.isNotEmpty(uploadAggregate)) {
uploadAggregate.buffer.write(b);
- uploadAggregate.index++;
-
+ if (getConfiguration().isMultiPartUpload() &&
+ uploadAggregate.buffer.size() >=
getConfiguration().getPartSize()) {
+ uploadPart(uploadAggregate);
+ maxRead =
Math.toIntExact(getConfiguration().getPartSize());
+ continue;
+ }
if (uploadAggregate.buffer.size() >=
getConfiguration().getBatchSize()
- || uploadAggregate.index ==
getConfiguration().getBatchMessageNumber()) {
+ || (uploadAggregate.index >=
getConfiguration().getBatchMessageNumber()
+ && uploadAggregate.buffer.size() <
getConfiguration().getPartSize())) {
- uploadPart(uploadAggregate);
+ if (uploadAggregate.buffer.size() > 0)
+ uploadPart(uploadAggregate);
CompleteMultipartUploadResponse uploadResult =
completeUpload(uploadAggregate);
this.uploadAggregate = null;
-
Message message = getMessageForResponse(exchange);
message.setHeader(AWS2S3Constants.E_TAG,
uploadResult.eTag());
if (uploadResult.versionId() != null) {
@@ -151,11 +166,10 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
continue;
}
}
-
if (state == null) {
state = new UploadState();
} else {
- state.index++;
+ state.index = 1;
}
state.buffer.write(b);
@@ -201,17 +215,21 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
try {
if (totalSize >= getConfiguration().getBatchSize()
|| state.buffer.size() >=
getConfiguration().getBatchSize()
- || state.index ==
getConfiguration().getBatchMessageNumber()) {
+ || state.index >=
getConfiguration().getBatchMessageNumber()) {
uploadPart(state);
CompleteMultipartUploadResponse uploadResult =
completeUpload(state);
-
Message message = getMessageForResponse(exchange);
message.setHeader(AWS2S3Constants.E_TAG,
uploadResult.eTag());
if (uploadResult.versionId() != null) {
message.setHeader(AWS2S3Constants.VERSION_ID,
uploadResult.versionId());
}
state = null;
+ continue;
+ }
+ if (getConfiguration().isMultiPartUpload() &&
state.buffer.size() >= getConfiguration().getPartSize()) {
+ uploadPart(state);
+ maxRead =
Math.toIntExact(getConfiguration().getPartSize());
}
} catch (Exception e) {
@@ -244,29 +262,41 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
.uploadId(state.initResponse.uploadId())
.build();
- CompleteMultipartUploadResponse uploadResult =
getEndpoint().getS3Client().completeMultipartUpload(compRequest);
+ try {
+ final CompleteMultipartUploadResponse uploadResult
+ =
getEndpoint().getS3Client().completeMultipartUpload(compRequest);
- // Converting the index to String can cause extra overhead
- if (LOG.isInfoEnabled()) {
- LOG.info("Completed upload for the part {} with etag {} at index
{}", part, uploadResult.eTag(),
- state.index);
+ // Converting the index to String can cause extra overhead
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Completed upload for the part {}, multipart {} with
etag {} at index {}", part, state.multipartIndex,
+ uploadResult.eTag(),
+ state.index);
+ }
+ part.getAndIncrement();
+ return uploadResult;
+ } catch (Exception e) {
+ LOG.warn("Error completing multipart upload - Multipart upload
will be aborted", e);
+ getEndpoint().getS3Client()
+
.abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName())
+
.key(state.dynamicKeyName).uploadId(state.initResponse.uploadId()).build());
+ throw e;
}
- return uploadResult;
}
private void uploadPart(UploadState state) {
UploadPartRequest uploadRequest =
UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
.key(state.dynamicKeyName).uploadId(state.initResponse.uploadId())
- .partNumber(state.index).build();
+ .partNumber(state.multipartIndex).build();
- LOG.trace("Uploading part {} at index {} for {}", state.part,
state.index, getConfiguration().getKeyName());
+ LOG.trace("Uploading part {}, multipart {} at index {} for {}",
state.part, state.multipartIndex, state.index,
+ getConfiguration().getKeyName());
String etag = getEndpoint().getS3Client()
.uploadPart(uploadRequest,
RequestBody.fromBytes(state.buffer.toByteArray())).eTag();
- CompletedPart partUpload =
CompletedPart.builder().partNumber(state.index).eTag(etag).build();
+ CompletedPart partUpload =
CompletedPart.builder().partNumber(state.multipartIndex).eTag(etag).build();
state.completedParts.add(partUpload);
state.buffer.reset();
- part.getAndIncrement();
+ state.multipartIndex++;
}
private String fileNameToUpload(
@@ -360,6 +390,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
private class UploadState {
int index;
+ int multipartIndex;
int part;
List<CompletedPart> completedParts = new ArrayList<>();
ByteArrayOutputStream buffer;
@@ -369,6 +400,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
UploadState() {
index = 1;
+ multipartIndex = 1;
part = AWS2S3StreamUploadProducer.this.part.get();
buffer = new ByteArrayOutputStream();
}
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadS3MultipartIT.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadS3MultipartIT.java
new file mode 100644
index 00000000000..c78817420ba
--- /dev/null
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadS3MultipartIT.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.s3.integration;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.s3.AWS2S3Constants;
+import org.apache.camel.component.aws2.s3.AWS2S3Operations;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Sends ten copies of {@code src/test/resources/empty.bin} through a streaming-upload endpoint configured with
+ * {@code multiPartUpload=true} and checks that they end up as a single S3 object whose size is the sum of the ten
+ * uploaded bodies.
+ */
+public class S3StreamUploadS3MultipartIT extends Aws2S3Base {
+
+    private static final String TEST_FILE = "src/test/resources/empty.bin";
+
+    @EndpointInject
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void sendIn() throws Exception {
+        result.expectedMessageCount(10);
+
+        // Every exchange carries the same key header and file body, so one processor is reused for all sends.
+        final Processor fileBody = exchange -> {
+            exchange.getIn().setHeader(AWS2S3Constants.KEY, "empty.bin");
+            exchange.getIn().setBody(new File(TEST_FILE));
+        };
+        for (int i = 0; i < 10; i++) {
+            template.send("direct:stream1", fileBody);
+        }
+
+        // ProducerTemplate.send() does not throw when routing fails; it only records the exception on the
+        // returned exchange. Asserting the mock is what proves all ten exchanges got past the S3 producer.
+        result.assertIsSatisfied();
+
+        Exchange ex = template.request("direct:listObjects",
+                exchange -> exchange.getIn().setHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.listObjects));
+
+        @SuppressWarnings("unchecked")
+        List<S3Object> resp = ex.getMessage().getBody(List.class);
+        // the ten streamed bodies must end up in exactly one object...
+        assertEquals(1, resp.size());
+        // ...holding all ten copies of empty.bin (file size: 5,242,880 bytes)
+        assertEquals(10 * Files.size(Paths.get(TEST_FILE)),
+                resp.stream().mapToLong(S3Object::size).sum());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // With multiPartUpload=true the streaming producer sizes its reads by partSize rather than
+                // bufferSize, and uploads a part each time the aggregated buffer reaches partSize bytes.
+                String awsEndpoint1
+                        = "aws2-s3://mycamel-1?autoCreateBucket=true" +
+                          "&streamingUploadMode=true" +
+                          "&keyName=fileTest.txt" +
+                          "&batchMessageNumber=10" +
+                          "&batchSize=1000000000" +
+                          "&namingStrategy=random" +
+                          "&multiPartUpload=true" +
+                          "&bufferSize=0" +
+                          "&partSize=10000000";
+
+                from("direct:stream1").to(awsEndpoint1).to("mock:result");
+
+                String awsEndpoint = "aws2-s3://mycamel-1?autoCreateBucket=true";
+
+                from("direct:listObjects").to(awsEndpoint);
+            }
+        };
+    }
+}