This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit a6ed48dc43462006ee9cc26f0a7bd218d0e24502 Author: Andrea Cosentino <[email protected]> AuthorDate: Thu Apr 16 09:25:27 2020 +0200 CAMEL-14868 - Camel-AWS2-*: Where possible, give the possibility to the end user to pass an AWS Request pojo as body, aws2-msk list brokers --- .../camel/component/aws2/msk/MSK2Configuration.java | 13 +++++++++++++ .../apache/camel/component/aws2/msk/MSK2Producer.java | 18 +++++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Configuration.java b/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Configuration.java index fdc3b10..8cddee0 100644 --- a/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Configuration.java +++ b/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Configuration.java @@ -47,6 +47,8 @@ public class MSK2Configuration implements Cloneable { private Integer proxyPort; @UriParam private String region; + @UriParam(defaultValue = "false") + private boolean pojoRequest; public KafkaClient getMskClient() { return mskClient; @@ -137,6 +139,17 @@ public class MSK2Configuration implements Cloneable { public void setRegion(String region) { this.region = region; } + + public boolean isPojoRequest() { + return pojoRequest; + } + + /** + * If we want to use a POJO request as body or not + */ + public void setPojoRequest(boolean pojoRequest) { + this.pojoRequest = pojoRequest; + } // ************************************************* // diff --git a/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Producer.java b/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Producer.java index 449e8af..e6fc60e 100644 --- a/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Producer.java +++ 
b/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Producer.java @@ -18,6 +18,7 @@ package org.apache.camel.component.aws2.msk; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; import org.apache.camel.Message; import org.apache.camel.support.DefaultProducer; import org.apache.camel.util.ObjectHelper; @@ -95,7 +96,21 @@ public class MSK2Producer extends DefaultProducer { return (MSK2Endpoint)super.getEndpoint(); } - private void listClusters(KafkaClient mskClient, Exchange exchange) { + private void listClusters(KafkaClient mskClient, Exchange exchange) throws InvalidPayloadException { + if (getConfiguration().isPojoRequest()) { + Object payload = exchange.getIn().getMandatoryBody(); + if (payload instanceof ListClustersRequest) { + ListClustersResponse result; + try { + result = mskClient.listClusters((ListClustersRequest) payload); + } catch (AwsServiceException ase) { + LOG.trace("List Clusters command returned the error code {}", ase.awsErrorDetails().errorCode()); + throw ase; + } + Message message = getMessageForResponse(exchange); + message.setBody(result); + } + } else { ListClustersRequest.Builder builder = ListClustersRequest.builder(); if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.CLUSTERS_FILTER))) { String filter = exchange.getIn().getHeader(MSK2Constants.CLUSTERS_FILTER, String.class); @@ -110,6 +125,7 @@ public class MSK2Producer extends DefaultProducer { } Message message = getMessageForResponse(exchange); message.setBody(result); + } } private void createCluster(KafkaClient mskClient, Exchange exchange) {
