exceptionfactory commented on code in PR #9286:
URL: https://github.com/apache/nifi/pull/9286#discussion_r1779607458


##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > 
MULTIPART_THRESHOLD;

Review Comment:
   Recommend renaming this:
   ```suggestion
           final boolean multipartUploadRequired = 
metadataResult.getContentLength() > MULTIPART_THRESHOLD;
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > 
MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new 
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = 
context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, 
retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, 
getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | 
AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(destinationBucket, destinationKey, 
multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to 
bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key 
[{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
 
+    /*
+     * Sections of this code were derived from example code from the official 
AWS S3 documentation. Specifically this example:
+     * 
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final 
AccessControlList acl, final CannedAccessControlList cannedAccessControlList,

Review Comment:
   ```suggestion
       private void copyMultipart(final AmazonS3Client s3, final 
AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > 
MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new 
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (!isMultiPart) {

Review Comment:
   I recommend reversing the conditional so that it reads more simply as:
   ```
   if (multipartUploadRequired) {
       copyMultipart();
   } else {
       copy();
   }
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > 
MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new 
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = 
context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, 
retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, 
getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | 
AmazonClientException e) {

Review Comment:
   Recommend making this a general `Exception` catch.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > 
MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new 
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = 
context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, 
retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, 
getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | 
AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(destinationBucket, destinationKey, 
multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to 
bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key 
[{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
 
+    /*
+     * Sections of this code were derived from example code from the official 
AWS S3 documentation. Specifically this example:
+     * 
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final 
AccessControlList acl, final CannedAccessControlList cannedAccessControlList,

Review Comment:
   Recommend separating these arguments one per line for improved readability.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > 
MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new 
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = 
context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, 
retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, 
getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | 
AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(destinationBucket, destinationKey, 
multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to 
bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key 
[{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
 
+    /*
+     * Sections of this code were derived from example code from the official 
AWS S3 documentation. Specifically this example:
+     * 
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final 
AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String 
sourceKey,
+                               final String destinationBucket, final String 
destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength, final int retryLimit) 
{
+        InitiateMultipartUploadRequest initRequest = new 
InitiateMultipartUploadRequest(destinationBucket,
+                destinationKey);
+        if (acl != null) {
+            initRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        InitiateMultipartUploadResult initResult = 
s3.initiateMultipartUpload(initRequest);
+
+        multipartIdRef.set(initResult.getUploadId());
+
+        long bytePosition = 0;
+        int partNumber = 1;
+        List<CopyPartResult> responses = new ArrayList<>();
+        while (bytePosition < contentLength) {
+            long lastByte = Math.min(bytePosition + MULTIPART_THRESHOLD - 1, 
contentLength - 1);
+
+            CopyPartRequest copyRequest = new CopyPartRequest()

Review Comment:
   ```suggestion
               final CopyPartRequest copyPartRequest = new CopyPartRequest()
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > 
MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new 
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = 
context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, 
retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, 
getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | 
AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(destinationBucket, destinationKey, 
multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to 
bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key 
[{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
 
+    /*
+     * Sections of this code were derived from example code from the official 
AWS S3 documentation. Specifically this example:
+     * 
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final 
AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String 
sourceKey,
+                               final String destinationBucket, final String 
destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength, final int retryLimit) 
{
+        InitiateMultipartUploadRequest initRequest = new 
InitiateMultipartUploadRequest(destinationBucket,
+                destinationKey);
+        if (acl != null) {
+            initRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        InitiateMultipartUploadResult initResult = 
s3.initiateMultipartUpload(initRequest);

Review Comment:
   ```suggestion
           final InitiateMultipartUploadResult initResult = 
s3.initiateMultipartUpload(initRequest);
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -69,11 +85,21 @@ public class CopyS3Object extends AbstractS3Processor {
             .defaultValue("${filename}-1")
             .build();
 
+    static final PropertyDescriptor MULTIPART_RETRIES = new 
PropertyDescriptor.Builder()
+            .name("Retry Attempt Limit")
+            .description("This configures the number of retries that will be 
attempted when a part upload request " +
+                    "on files larger than 5GB encounter a 503/Slow Down 
error.")
+            .defaultValue("3")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .required(true)
+            .build();

Review Comment:
   For this initial change, I think it is better to set a hard-coded retry of 
3. Standard NiFi retry handling can support retrying the entire process, so 
having an internal retry attempt of 3 for individual part operations seems 
reasonable. This keeps the processor simpler from a user perspective.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > 
MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new 
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = 
context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, 
retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, 
getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | 
AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(destinationBucket, destinationKey, 
multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to 
bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);

Review Comment:
   This should be consolidated into a single log statement; I also recommend changing it to a warning:
   ```suggestion
                   } catch (final AmazonS3Exception s3e) {
                       getLogger().warn("Abort Multipart Upload failed for 
Bucket [{}] Key [{}]", destinationBucket, destinationKey, s3e);
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }

Review Comment:
   I recommend removing this check, since it could be handled through the 
GetObjectMetadata Processor. This introduces another request which may not be 
necessary in many cases.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > 
MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new 
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = 
context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, 
retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, 
getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | 
AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(destinationBucket, destinationKey, 
multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to 
bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key 
[{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
 
+    /*
+     * Sections of this code were derived from example code from the official 
AWS S3 documentation. Specifically this example:
+     * 
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final 
AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String 
sourceKey,
+                               final String destinationBucket, final String 
destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength, final int retryLimit) 
{
+        InitiateMultipartUploadRequest initRequest = new 
InitiateMultipartUploadRequest(destinationBucket,
+                destinationKey);
+        if (acl != null) {
+            initRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        InitiateMultipartUploadResult initResult = 
s3.initiateMultipartUpload(initRequest);
+
+        multipartIdRef.set(initResult.getUploadId());
+
+        long bytePosition = 0;
+        int partNumber = 1;
+        List<CopyPartResult> responses = new ArrayList<>();

Review Comment:
   ```suggestion
           final List<CopyPartResult> copyPartResults = new ArrayList<>();
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > 
MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new 
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = 
context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, 
retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, 
getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | 
AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(destinationBucket, destinationKey, 
multipartIdRef.get());

Review Comment:
   ```suggestion
                       final AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(destinationBucket, destinationKey, 
multipartIdRef.get());
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > 
MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new 
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = 
context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, 
retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, 
getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | 
AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(destinationBucket, destinationKey, 
multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to 
bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key 
[{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
 
+    /*
+     * Sections of this code were derived from example code from the official 
AWS S3 documentation. Specifically this example:
+     * 
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final 
AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String 
sourceKey,
+                               final String destinationBucket, final String 
destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength, final int retryLimit) 
{
+        InitiateMultipartUploadRequest initRequest = new 
InitiateMultipartUploadRequest(destinationBucket,
+                destinationKey);
+        if (acl != null) {
+            initRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        InitiateMultipartUploadResult initResult = 
s3.initiateMultipartUpload(initRequest);
+
+        multipartIdRef.set(initResult.getUploadId());
+
+        long bytePosition = 0;
+        int partNumber = 1;
+        List<CopyPartResult> responses = new ArrayList<>();
+        while (bytePosition < contentLength) {
+            long lastByte = Math.min(bytePosition + MULTIPART_THRESHOLD - 1, 
contentLength - 1);
+
+            CopyPartRequest copyRequest = new CopyPartRequest()
+                    .withSourceBucketName(sourceBucket)
+                    .withSourceKey(sourceKey)
+                    .withDestinationBucketName(destinationBucket)
+                    .withDestinationKey(destinationKey)
+                    .withUploadId(initResult.getUploadId())
+                    .withFirstByte(bytePosition)
+                    .withLastByte(lastByte)
+                    .withPartNumber(partNumber++);
+            boolean partIsDone = false;
+            int retryIndex = 0;
+
+            while (!partIsDone) {
+                try {
+                    responses.add(s3.copyPart(copyRequest));
+                    partIsDone = true;
+                } catch (AmazonS3Exception e) {
+                    if (e.getStatusCode() == 503 && retryLimit > 0 && 
retryIndex < retryLimit) {
+                        retryIndex++;
+                    } else {
+                        throw e;
+                    }
+                }
+            }
+            bytePosition += MULTIPART_THRESHOLD;
+        }
+
+        CompleteMultipartUploadRequest completeRequest = new 
CompleteMultipartUploadRequest(

Review Comment:
   ```suggestion
           final CompleteMultipartUploadRequest completeRequest = new 
CompleteMultipartUploadRequest(
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > 
MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new 
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = 
context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, 
retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, 
getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | 
AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(destinationBucket, destinationKey, 
multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to 
bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key 
[{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
 
+    /*
+     * Sections of this code were derived from example code from the official 
AWS S3 documentation. Specifically this example:
+     * 
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final 
AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String 
sourceKey,
+                               final String destinationBucket, final String 
destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength, final int retryLimit) 
{
+        InitiateMultipartUploadRequest initRequest = new 
InitiateMultipartUploadRequest(destinationBucket,
+                destinationKey);
+        if (acl != null) {
+            initRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        InitiateMultipartUploadResult initResult = 
s3.initiateMultipartUpload(initRequest);
+
+        multipartIdRef.set(initResult.getUploadId());
+
+        long bytePosition = 0;
+        int partNumber = 1;
+        List<CopyPartResult> responses = new ArrayList<>();
+        while (bytePosition < contentLength) {
+            long lastByte = Math.min(bytePosition + MULTIPART_THRESHOLD - 1, 
contentLength - 1);
+
+            CopyPartRequest copyRequest = new CopyPartRequest()
+                    .withSourceBucketName(sourceBucket)
+                    .withSourceKey(sourceKey)
+                    .withDestinationBucketName(destinationBucket)
+                    .withDestinationKey(destinationKey)
+                    .withUploadId(initResult.getUploadId())
+                    .withFirstByte(bytePosition)
+                    .withLastByte(lastByte)
+                    .withPartNumber(partNumber++);
+            boolean partIsDone = false;
+            int retryIndex = 0;
+
+            while (!partIsDone) {
+                try {
+                    responses.add(s3.copyPart(copyRequest));
+                    partIsDone = true;
+                } catch (AmazonS3Exception e) {
+                    if (e.getStatusCode() == 503 && retryLimit > 0 && 
retryIndex < retryLimit) {
+                        retryIndex++;
+                    } else {
+                        throw e;
+                    }
+                }
+            }
+            bytePosition += MULTIPART_THRESHOLD;
+        }
+
+        CompleteMultipartUploadRequest completeRequest = new 
CompleteMultipartUploadRequest(
+                destinationBucket,
+                destinationKey,
+                initResult.getUploadId(),
+                responses.stream().map(response -> new 
PartETag(response.getPartNumber(), response.getETag()))
+                        .collect(Collectors.toList()));
+        s3.completeMultipartUpload(completeRequest);
+    }
+
+    private void smallFileCopy(final AmazonS3Client s3, final 
AccessControlList acl,

Review Comment:
   ```suggestion
       private void copyObject(final AmazonS3Client s3, final AccessControlList 
acl,
   ```



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -119,28 +145,127 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+        final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+        final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+
+        if (metadataResult == null) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        final long contentLength = metadataResult.getContentLength();
+        final boolean isMultiPart = metadataResult.getContentLength() > 
MULTIPART_THRESHOLD;
+
         try {
-            final CopyObjectRequest request = new 
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (!isMultiPart) {
+                smallFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket, destinationKey);
+            } else {
+                final int retryLimit = 
context.getProperty(MULTIPART_RETRIES).asInteger();
+                largeFileCopy(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength, 
retryLimit);
+            }
             session.getProvenanceReporter().send(flowFile, 
getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final ProcessException | IllegalArgumentException | 
AmazonClientException e) {
+            if (isMultiPart && !StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(destinationBucket, destinationKey, 
multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (AmazonS3Exception ignored) {
+                    getLogger().error("Failed to cleanup the partial upload to 
bucket {} and key {}", destinationBucket, destinationKey);
+                    getLogger().error("Abort exception", ignored);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key 
[{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
 
+    /*
+     * Sections of this code were derived from example code from the official 
AWS S3 documentation. Specifically this example:
+     * 
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void largeFileCopy(final AmazonS3Client s3, final 
AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String 
sourceKey,
+                               final String destinationBucket, final String 
destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength, final int retryLimit) 
{
+        InitiateMultipartUploadRequest initRequest = new 
InitiateMultipartUploadRequest(destinationBucket,
+                destinationKey);
+        if (acl != null) {
+            initRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        InitiateMultipartUploadResult initResult = 
s3.initiateMultipartUpload(initRequest);
+
+        multipartIdRef.set(initResult.getUploadId());
+
+        long bytePosition = 0;
+        int partNumber = 1;
+        List<CopyPartResult> responses = new ArrayList<>();
+        while (bytePosition < contentLength) {
+            long lastByte = Math.min(bytePosition + MULTIPART_THRESHOLD - 1, 
contentLength - 1);
+
+            CopyPartRequest copyRequest = new CopyPartRequest()
+                    .withSourceBucketName(sourceBucket)
+                    .withSourceKey(sourceKey)
+                    .withDestinationBucketName(destinationBucket)
+                    .withDestinationKey(destinationKey)
+                    .withUploadId(initResult.getUploadId())
+                    .withFirstByte(bytePosition)
+                    .withLastByte(lastByte)
+                    .withPartNumber(partNumber++);
+            boolean partIsDone = false;

Review Comment:
   ```suggestion
               boolean partRequestCompleted = false;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to