This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 359f01442438c5de5f89cd42364aa9e8329a3cbd
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri Apr 17 08:41:49 2020 +0200

    CAMEL-14618 - Camel-aws-s3: Add an option to consumer to be able to move the consumed files to another bucket, added test
---
 .../camel/component/aws2/s3/AWS2S3Consumer.java | 19 +++++++----------
 .../s3/integration/S3ConsumerIntegrationTest.java | 24 +++++++++++++++++++---
 2 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
index 0fa3fda..b99fe70 100644
--- a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
+++ b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
@@ -215,16 +215,7 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer {
 */
 protected void processCommit(Exchange exchange) {
 try {
- if (getConfiguration().isDeleteAfterRead()) {
- String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class);
- String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class);
-
- LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);
-
- getAmazonS3Client().deleteObject(DeleteObjectRequest.builder().bucket(getConfiguration().getBucketName()).key(key).build());
-
- LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key);
- } else if (getConfiguration().isMoveAfterRead()) {
+ if (getConfiguration().isMoveAfterRead()) {
 String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class);
 String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class);
@@ -233,7 +224,11 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer {
 getAmazonS3Client().copyObject(CopyObjectRequest.builder().destinationKey(key).destinationBucket(getConfiguration().getDestinationBucket()).copySource(bucketName + "/" + key).build());
 LOG.trace("Moved object from bucket {} with key {} to bucket {}...", bucketName, key, getConfiguration().getDestinationBucket());
-
+ }
+ if (getConfiguration().isDeleteAfterRead()) {
+ String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class);
+ String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class);
+
 LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);
 getAmazonS3Client().deleteObject(DeleteObjectRequest.builder().bucket(getConfiguration().getBucketName()).key(key).build());
@@ -241,7 +236,7 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer {
 LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key);
 }
 } catch (AwsServiceException e) {
- getExceptionHandler().handleException("Error occurred during deleting object. This exception is ignored.", exchange, e);
+ getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.", exchange, e);
 }
 }
diff --git a/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java b/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java
index 7a17893..b13669a 100644
--- a/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java
+++ b/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java
@@ -46,7 +46,7 @@ public class S3ConsumerIntegrationTest extends CamelTestSupport {
 @Test
 public void sendIn() throws Exception {
- result.expectedMessageCount(1);
+ result.expectedMessageCount(3);
 template.send("direct:putObject", new Processor() {
@@ -56,8 +56,26 @@ public class S3ConsumerIntegrationTest extends CamelTestSupport {
 exchange.getIn().setBody("Test");
 }
 });
+
+ template.send("direct:putObject", new Processor() {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(AWS2S3Constants.KEY, "test1.txt");
+ exchange.getIn().setBody("Test1");
+ }
+ });
+
+ template.send("direct:putObject", new Processor() {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(AWS2S3Constants.KEY, "test2.txt");
+ exchange.getIn().setBody("Test2");
+ }
+ });
- Thread.sleep(5000);
+ Thread.sleep(10000);
 assertMockEndpointsSatisfied();
 }
@@ -70,7 +88,7 @@ public class S3ConsumerIntegrationTest extends CamelTestSupport {
 from("direct:putObject").startupOrder(1).to(awsEndpoint).to("mock:result");
- from("aws2-s3://mycamel?moveAfterRead=true&deleteAfterRead=false&destinationBucket=camel-kafka-connector").startupOrder(2).log("${header.key}");
+ from("aws2-s3://mycamel?moveAfterRead=true&destinationBucket=camel-kafka-connector").startupOrder(2).log("${body}");
 }
 };
