This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new cf199094598 CAMEL-21009 - Camel-AWS2-Kinesis: CloudWatchAsyncClient
and DynamoDbAsyncClient parameters are ignored from KCL Consumer (#14927)
cf199094598 is described below
commit cf19909459848ffd2b0ebb9d5a9221cb99e53c85
Author: Andrea Cosentino <[email protected]>
AuthorDate: Wed Jul 24 14:54:16 2024 +0200
CAMEL-21009 - Camel-AWS2-Kinesis: CloudWatchAsyncClient and
DynamoDbAsyncClient parameters are ignored from KCL Consumer (#14927)
* CAMEL-21009 - Camel-AWS2-Kinesis: CloudWatchAsyncClient and
DynamoDbAsyncClient parameters are ignored from KCL Consumer
Signed-off-by: Andrea Cosentino <[email protected]>
* CAMEL-21009 - Camel-AWS2-Kinesis: CloudWatchAsyncClient and
DynamoDbAsyncClient parameters are ignored from KCL Consumer
Signed-off-by: Andrea Cosentino <[email protected]>
---------
Signed-off-by: Andrea Cosentino <[email protected]>
---
.../camel/component/aws2/kinesis/KclKinesis2Consumer.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer.java
index d8c3549edb1..63ce2844440 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer.java
@@ -81,7 +81,7 @@ public class KclKinesis2Consumer extends DefaultConsumer {
protected void doStart() throws Exception {
super.doStart();
LOG.debug("Starting KCL Consumer");
- DynamoDbAsyncClient dynamoByAsyncClient = null;
+ DynamoDbAsyncClient dynamoDbAsyncClient = null;
CloudWatchAsyncClient cloudWatchAsyncClient = null;
KinesisAsyncClient kinesisAsyncClient = getEndpoint().getAsyncClient();
Kinesis2Configuration configuration = getEndpoint().getConfiguration();
@@ -105,8 +105,10 @@ public class KclKinesis2Consumer extends DefaultConsumer {
if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
clientBuilder = clientBuilder.region(Region.of(configuration.getRegion()));
}
- dynamoByAsyncClient
+ dynamoDbAsyncClient
= clientBuilder.build();
+ } else {
+ dynamoDbAsyncClient = getEndpoint().getConfiguration().getDynamoDbAsyncClient();
}
if (ObjectHelper.isEmpty(getEndpoint().getConfiguration().getCloudWatchAsyncClient())) {
CloudWatchAsyncClientBuilder clientBuilder = CloudWatchAsyncClient.builder();
@@ -129,10 +131,12 @@ public class KclKinesis2Consumer extends DefaultConsumer {
clientBuilder = clientBuilder.region(Region.of(configuration.getRegion()));
}
cloudWatchAsyncClient = clientBuilder.build();
+ } else {
+ cloudWatchAsyncClient = getEndpoint().getConfiguration().getCloudWatchAsyncClient();
}
this.executor = this.getEndpoint().createExecutor();
this.executor.submit(new KclKinesisConsumingTask(
- configuration.getStreamName(), kinesisAsyncClient, dynamoByAsyncClient, cloudWatchAsyncClient));
+ configuration.getStreamName(), kinesisAsyncClient, dynamoDbAsyncClient, cloudWatchAsyncClient));
}
@Override