This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit d640513f47a6c0399bad0bc3b4176283a2f4efd1 Author: Andrea Cosentino <[email protected]> AuthorDate: Thu Apr 16 09:34:38 2020 +0200 CAMEL-14868 - Camel-AWS2-*: Where possible, give the possibility to the end user to pass an AWS Request pojo as body, aws2-msk fixed CS --- .../camel/component/aws2/msk/MSK2Producer.java | 168 ++++++++++----------- 1 file changed, 84 insertions(+), 84 deletions(-) diff --git a/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Producer.java b/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Producer.java index 92253dd..366be9e 100644 --- a/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Producer.java +++ b/components/camel-aws2-msk/src/main/java/org/apache/camel/component/aws2/msk/MSK2Producer.java @@ -102,7 +102,7 @@ public class MSK2Producer extends DefaultProducer { if (payload instanceof ListClustersRequest) { ListClustersResponse result; try { - result = mskClient.listClusters((ListClustersRequest) payload); + result = mskClient.listClusters((ListClustersRequest)payload); } catch (AwsServiceException ase) { LOG.trace("List Clusters command returned the error code {}", ase.awsErrorDetails().errorCode()); throw ase; @@ -111,20 +111,20 @@ public class MSK2Producer extends DefaultProducer { message.setBody(result); } } else { - ListClustersRequest.Builder builder = ListClustersRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.CLUSTERS_FILTER))) { - String filter = exchange.getIn().getHeader(MSK2Constants.CLUSTERS_FILTER, String.class); - builder.clusterNameFilter(filter); - } - ListClustersResponse result; - try { - result = mskClient.listClusters(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("List Clusters command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); + 
ListClustersRequest.Builder builder = ListClustersRequest.builder(); + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.CLUSTERS_FILTER))) { + String filter = exchange.getIn().getHeader(MSK2Constants.CLUSTERS_FILTER, String.class); + builder.clusterNameFilter(filter); + } + ListClustersResponse result; + try { + result = mskClient.listClusters(builder.build()); + } catch (AwsServiceException ase) { + LOG.trace("List Clusters command returned the error code {}", ase.awsErrorDetails().errorCode()); + throw ase; + } + Message message = getMessageForResponse(exchange); + message.setBody(result); } } @@ -134,7 +134,7 @@ public class MSK2Producer extends DefaultProducer { if (payload instanceof CreateClusterRequest) { CreateClusterResponse response; try { - response = mskClient.createCluster((CreateClusterRequest) payload); + response = mskClient.createCluster((CreateClusterRequest)payload); } catch (AwsServiceException ase) { LOG.trace("Create Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); throw ase; @@ -143,40 +143,40 @@ public class MSK2Producer extends DefaultProducer { message.setBody(response); } } else { - CreateClusterRequest.Builder builder = CreateClusterRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.CLUSTER_NAME))) { - String name = exchange.getIn().getHeader(MSK2Constants.CLUSTER_NAME, String.class); - builder.clusterName(name); - } else { - throw new IllegalArgumentException("Cluster Name must be specified"); - } - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.CLUSTER_KAFKA_VERSION))) { - String version = exchange.getIn().getHeader(MSK2Constants.CLUSTER_KAFKA_VERSION, String.class); - builder.kafkaVersion(version); - } else { - throw new IllegalArgumentException("Kafka Version must be specified"); - } - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.BROKER_NODES_NUMBER))) { - Integer nodesNumber = 
exchange.getIn().getHeader(MSK2Constants.BROKER_NODES_NUMBER, Integer.class); - builder.numberOfBrokerNodes(nodesNumber); - } else { - throw new IllegalArgumentException("Kafka Version must be specified"); - } - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.BROKER_NODES_GROUP_INFO))) { - BrokerNodeGroupInfo brokerNodesGroupInfo = exchange.getIn().getHeader(MSK2Constants.BROKER_NODES_GROUP_INFO, BrokerNodeGroupInfo.class); - builder.brokerNodeGroupInfo(brokerNodesGroupInfo); - } else { - throw new IllegalArgumentException("BrokerNodeGroupInfo must be specified"); - } - CreateClusterResponse response; - try { - response = mskClient.createCluster(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Create Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(response); + CreateClusterRequest.Builder builder = CreateClusterRequest.builder(); + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.CLUSTER_NAME))) { + String name = exchange.getIn().getHeader(MSK2Constants.CLUSTER_NAME, String.class); + builder.clusterName(name); + } else { + throw new IllegalArgumentException("Cluster Name must be specified"); + } + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.CLUSTER_KAFKA_VERSION))) { + String version = exchange.getIn().getHeader(MSK2Constants.CLUSTER_KAFKA_VERSION, String.class); + builder.kafkaVersion(version); + } else { + throw new IllegalArgumentException("Kafka Version must be specified"); + } + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.BROKER_NODES_NUMBER))) { + Integer nodesNumber = exchange.getIn().getHeader(MSK2Constants.BROKER_NODES_NUMBER, Integer.class); + builder.numberOfBrokerNodes(nodesNumber); + } else { + throw new IllegalArgumentException("Kafka Version must be specified"); + } + if 
(ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.BROKER_NODES_GROUP_INFO))) { + BrokerNodeGroupInfo brokerNodesGroupInfo = exchange.getIn().getHeader(MSK2Constants.BROKER_NODES_GROUP_INFO, BrokerNodeGroupInfo.class); + builder.brokerNodeGroupInfo(brokerNodesGroupInfo); + } else { + throw new IllegalArgumentException("BrokerNodeGroupInfo must be specified"); + } + CreateClusterResponse response; + try { + response = mskClient.createCluster(builder.build()); + } catch (AwsServiceException ase) { + LOG.trace("Create Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); + throw ase; + } + Message message = getMessageForResponse(exchange); + message.setBody(response); } } @@ -186,7 +186,7 @@ public class MSK2Producer extends DefaultProducer { if (payload instanceof DeleteClusterRequest) { DeleteClusterResponse result; try { - result = mskClient.deleteCluster((DeleteClusterRequest) payload); + result = mskClient.deleteCluster((DeleteClusterRequest)payload); } catch (AwsServiceException ase) { LOG.trace("Delete Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); throw ase; @@ -195,22 +195,22 @@ public class MSK2Producer extends DefaultProducer { message.setBody(result); } } else { - DeleteClusterRequest.Builder builder = DeleteClusterRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.CLUSTER_ARN))) { - String arn = exchange.getIn().getHeader(MSK2Constants.CLUSTER_ARN, String.class); - builder.clusterArn(arn); - } else { - throw new IllegalArgumentException("Cluster ARN must be specified"); - } - DeleteClusterResponse result; - try { - result = mskClient.deleteCluster(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Delete Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); + DeleteClusterRequest.Builder builder = 
DeleteClusterRequest.builder(); + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.CLUSTER_ARN))) { + String arn = exchange.getIn().getHeader(MSK2Constants.CLUSTER_ARN, String.class); + builder.clusterArn(arn); + } else { + throw new IllegalArgumentException("Cluster ARN must be specified"); + } + DeleteClusterResponse result; + try { + result = mskClient.deleteCluster(builder.build()); + } catch (AwsServiceException ase) { + LOG.trace("Delete Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); + throw ase; + } + Message message = getMessageForResponse(exchange); + message.setBody(result); } } @@ -220,7 +220,7 @@ public class MSK2Producer extends DefaultProducer { if (payload instanceof DescribeClusterRequest) { DescribeClusterResponse result; try { - result = mskClient.describeCluster((DescribeClusterRequest) payload); + result = mskClient.describeCluster((DescribeClusterRequest)payload); } catch (AwsServiceException ase) { LOG.trace("Delete Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); throw ase; @@ -229,22 +229,22 @@ public class MSK2Producer extends DefaultProducer { message.setBody(result); } } else { - DescribeClusterRequest.Builder builder = DescribeClusterRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.CLUSTER_ARN))) { - String arn = exchange.getIn().getHeader(MSK2Constants.CLUSTER_ARN, String.class); - builder.clusterArn(arn); - } else { - throw new IllegalArgumentException("Cluster ARN must be specified"); - } - DescribeClusterResponse result; - try { - result = mskClient.describeCluster(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Delete Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); + DescribeClusterRequest.Builder builder = DescribeClusterRequest.builder(); + if 
(ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSK2Constants.CLUSTER_ARN))) { + String arn = exchange.getIn().getHeader(MSK2Constants.CLUSTER_ARN, String.class); + builder.clusterArn(arn); + } else { + throw new IllegalArgumentException("Cluster ARN must be specified"); + } + DescribeClusterResponse result; + try { + result = mskClient.describeCluster(builder.build()); + } catch (AwsServiceException ase) { + LOG.trace("Delete Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); + throw ase; + } + Message message = getMessageForResponse(exchange); + message.setBody(result); } }
