exceptionfactory commented on code in PR #9286:
URL: https://github.com/apache/nifi/pull/9286#discussion_r1779607458
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
Review Comment:
Recommend renaming this:
```suggestion
        final boolean multipartUploadRequired = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+    /*
+     * Sections of this code were derived from example code from the official AWS S3 documentation. Specifically this example:
+     * https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
Review Comment:
```suggestion
    private void copyMultipart(final AmazonS3Client s3, final AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
-            s3.copyObject(request);
+            if (!isMultiPart) {
Review Comment:
I recommend reversing the conditional so that it reads more simply as:
```
if (multipartUploadRequired) {
copyMultipart();
} else {
copy();
}
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | AmazonClientException e) {
Review Comment:
Recommend making this a general `Exception` catch.
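For illustration, a condensed sketch of how the broadened clause could look, reusing the abort and failure handling already shown in the hunk above (not an exact suggestion, since the surrounding lines may shift, and the inner try/catch around the abort is omitted here):
```
        } catch (final Exception e) {
            // Abort a partially completed multipart upload before routing to failure
            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
                s3.abortMultipartUpload(new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get()));
            }
            flowFile = extractExceptionDetails(e, session, flowFile);
            getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
            session.transfer(flowFile, REL_FAILURE);
        }
```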
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+    /*
+     * Sections of this code were derived from example code from the official AWS S3 documentation. Specifically this example:
+     * https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
Review Comment:
Recommend separating these arguments one per line for improved readability.
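For example, a sketch of the signature with one parameter per line (using the current parameter list; the rename suggested earlier would apply as well):
```
    private void largeFileCopy(final AmazonS3Client s3,
                               final AccessControlList acl,
                               final CannedAccessControlList cannedAccessControlList,
                               final String sourceBucket,
                               final String sourceKey,
                               final String destinationBucket,
                               final String destinationKey,
                               final AtomicReference<String> multipartIdRef,
                               final long contentLength,
                               final int retryLimit) {
```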
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+    /*
+     * Sections of this code were derived from example code from the official AWS S3 documentation. Specifically this example:
+     * https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String sourceKey,
+                               final String destinationBucket, final String destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength, final int retryLimit) {
+        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(destinationBucket,
+                destinationKey);
+        if (acl != null) {
+            initRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        InitiateMultipartUploadResult initResult = s3.initiateMultipartUpload(initRequest);
+
+        multipartIdRef.set(initResult.getUploadId());
+
+        long bytePosition = 0;
+        int partNumber = 1;
+        List<CopyPartResult> responses = new ArrayList<>();
+        while (bytePosition < contentLength) {
+            long lastByte = Math.min(bytePosition + MULTIPART_THRESHOLD - 1, contentLength - 1);
+
+            CopyPartRequest copyRequest = new CopyPartRequest()
Review Comment:
```suggestion
            final CopyPartRequest copyPartRequest = new CopyPartRequest()
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+    /*
+     * Sections of this code were derived from example code from the official AWS S3 documentation. Specifically this example:
+     * https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String sourceKey,
+                               final String destinationBucket, final String destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength, final int retryLimit) {
+        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(destinationBucket,
+                destinationKey);
+        if (acl != null) {
+            initRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        InitiateMultipartUploadResult initResult = s3.initiateMultipartUpload(initRequest);
Review Comment:
```suggestion
        final InitiateMultipartUploadResult initResult = s3.initiateMultipartUpload(initRequest);
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -69,11 +85,21 @@ public class CopyS3Object extends AbstractS3Processor {
             .defaultValue("${filename}-1")
             .build();
+    static final PropertyDescriptor MULTIPART_RETRIES = new PropertyDescriptor.Builder()
+            .name("Retry Attempt Limit")
+            .description("This configures the number of retries that will be attempted when a part upload request " +
+                    "on files larger than 5GB encounter a 503/Slow Down error.")
+            .defaultValue("3")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .required(true)
+            .build();
Review Comment:
For this initial change, I think it is better to use a hard-coded retry limit of 3. Standard NiFi retry handling can support retrying the entire operation, so an internal limit of 3 attempts for individual part requests seems reasonable. This keeps the processor simpler from a user perspective.
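For example, a sketch of the hard-coded limit (the constant name is illustrative, not from the PR):
```
    // Hypothetical constant replacing the Retry Attempt Limit property
    private static final int MULTIPART_RETRY_LIMIT = 3;
```
The multipart copy path would then pass MULTIPART_RETRY_LIMIT instead of reading the property in onTrigger.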
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
Review Comment:
These two log statements should be consolidated into one; I also recommend changing the level to a warning:
```suggestion
                } catch (final AmazonS3Exception s3e) {
                    getLogger().warn("Abort Multipart Upload failed for Bucket [{}] Key [{}]", destinationBucket, destinationKey, s3e);
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
Review Comment:
I recommend removing this check, since it could be handled through the GetObjectMetadata Processor. This introduces an additional request that may not be necessary in many cases.
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+    /*
+     * Sections of this code were derived from example code from the official AWS S3 documentation. Specifically this example:
+     * https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String sourceKey,
+                               final String destinationBucket, final String destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength, final int retryLimit) {
+        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(destinationBucket,
+                destinationKey);
+        if (acl != null) {
+            initRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        InitiateMultipartUploadResult initResult = s3.initiateMultipartUpload(initRequest);
+
+        multipartIdRef.set(initResult.getUploadId());
+
+        long bytePosition = 0;
+        int partNumber = 1;
+        List<CopyPartResult> responses = new ArrayList<>();
Review Comment:
```suggestion
        final List<CopyPartResult> copyPartResults = new ArrayList<>();
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
Review Comment:
```suggestion
                    final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+    /*
+     * Sections of this code were derived from example code from the official AWS S3 documentation. Specifically this example:
+     * https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String sourceKey,
+                               final String destinationBucket, final String destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength, final int retryLimit) {
+        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(destinationBucket,
+                destinationKey);
+        if (acl != null) {
+            initRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        InitiateMultipartUploadResult initResult = s3.initiateMultipartUpload(initRequest);
+
+        multipartIdRef.set(initResult.getUploadId());
+
+        long bytePosition = 0;
+        int partNumber = 1;
+        List<CopyPartResult> responses = new ArrayList<>();
+        while (bytePosition < contentLength) {
+            long lastByte = Math.min(bytePosition + MULTIPART_THRESHOLD - 1, contentLength - 1);
+
+            CopyPartRequest copyRequest = new CopyPartRequest()
+                    .withSourceBucketName(sourceBucket)
+                    .withSourceKey(sourceKey)
+                    .withDestinationBucketName(destinationBucket)
+                    .withDestinationKey(destinationKey)
+                    .withUploadId(initResult.getUploadId())
+                    .withFirstByte(bytePosition)
+                    .withLastByte(lastByte)
+                    .withPartNumber(partNumber++);
+            boolean partIsDone = false;
+            int retryIndex = 0;
+
+            while (!partIsDone) {
+                try {
+                    responses.add(s3.copyPart(copyRequest));
+                    partIsDone = true;
+                } catch (AmazonS3Exception e) {
+                    if (e.getStatusCode() == 503 && retryLimit > 0 && retryIndex < retryLimit) {
+                        retryIndex++;
+                    } else {
+                        throw e;
+                    }
+                }
+            }
+            bytePosition += MULTIPART_THRESHOLD;
+        }
+
+        CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
Review Comment:
```suggestion
        final CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+    /*
+     * Sections of this code were derived from example code from the official AWS S3 documentation. Specifically this example:
+     * https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String sourceKey,
+                               final String destinationBucket, final String destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength, final int retryLimit) {
+        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(destinationBucket,
+                destinationKey);
+        if (acl != null) {
+            initRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        InitiateMultipartUploadResult initResult = s3.initiateMultipartUpload(initRequest);
+
+        multipartIdRef.set(initResult.getUploadId());
+
+        long bytePosition = 0;
+        int partNumber = 1;
+        List<CopyPartResult> responses = new ArrayList<>();
+        while (bytePosition < contentLength) {
+            long lastByte = Math.min(bytePosition + MULTIPART_THRESHOLD - 1, contentLength - 1);
+
+            CopyPartRequest copyRequest = new CopyPartRequest()
+                    .withSourceBucketName(sourceBucket)
+                    .withSourceKey(sourceKey)
+                    .withDestinationBucketName(destinationBucket)
+                    .withDestinationKey(destinationKey)
+                    .withUploadId(initResult.getUploadId())
+                    .withFirstByte(bytePosition)
+                    .withLastByte(lastByte)
+                    .withPartNumber(partNumber++);
+            boolean partIsDone = false;
+            int retryIndex = 0;
+
+            while (!partIsDone) {
+                try {
+                    responses.add(s3.copyPart(copyRequest));
+                    partIsDone = true;
+                } catch (AmazonS3Exception e) {
+                    if (e.getStatusCode() == 503 && retryLimit > 0 && retryIndex < retryLimit) {
+                        retryIndex++;
+                    } else {
+                        throw e;
+                    }
+                }
+            }
+            bytePosition += MULTIPART_THRESHOLD;
+        }
+
+        CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
+                destinationBucket,
+                destinationKey,
+                initResult.getUploadId(),
+                responses.stream().map(response -> new PartETag(response.getPartNumber(), response.getETag()))
+                        .collect(Collectors.toList()));
+        s3.completeMultipartUpload(completeRequest);
+    }
+
+    private void smallFileCopy(final AmazonS3Client s3, final AccessControlList acl,
Review Comment:
```suggestion
    private void copyObject(final AmazonS3Client s3, final AccessControlList acl,
```
##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+    /*
+     * Sections of this code were derived from example code from the official AWS S3 documentation. Specifically this example:
+     * https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String sourceKey,
+                               final String destinationBucket, final String destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength, final int retryLimit) {
+        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(destinationBucket,
+                destinationKey);
+        if (acl != null) {
+            initRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        InitiateMultipartUploadResult initResult = s3.initiateMultipartUpload(initRequest);
+
+        multipartIdRef.set(initResult.getUploadId());
+
+        long bytePosition = 0;
+        int partNumber = 1;
+        List<CopyPartResult> responses = new ArrayList<>();
+        while (bytePosition < contentLength) {
+            long lastByte = Math.min(bytePosition + MULTIPART_THRESHOLD - 1, contentLength - 1);
+
+            CopyPartRequest copyRequest = new CopyPartRequest()
+                    .withSourceBucketName(sourceBucket)
+                    .withSourceKey(sourceKey)
+                    .withDestinationBucketName(destinationBucket)
+                    .withDestinationKey(destinationKey)
+                    .withUploadId(initResult.getUploadId())
+                    .withFirstByte(bytePosition)
+                    .withLastByte(lastByte)
+                    .withPartNumber(partNumber++);
+            boolean partIsDone = false;
Review Comment:
```suggestion
            boolean partRequestCompleted = false;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]