This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.18.x by this push:
new b2b00a39d28 CAMEL-18447 Camel-pubsub: AsyncTaskException: Asynchronous
task failed with real account (#8667)
b2b00a39d28 is described below
commit b2b00a39d28ca05ccaa99b55fd2641cc8048ee5b
Author: JiriOndrusek <[email protected]>
AuthorDate: Sat Nov 5 11:00:22 2022 +0100
CAMEL-18447 Camel-pubsub: AsyncTaskException: Asynchronous task failed with
real account (#8667)
---
.../camel/component/google/pubsub/GooglePubsubConsumer.java | 6 +++++-
.../component/google/pubsub/consumer/AcknowledgeSync.java | 10 ++++++----
2 files changed, 11 insertions(+), 5 deletions(-)
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index e639e400e22..ef3dc08a443 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -192,8 +192,12 @@ public class GooglePubsubConsumer extends DefaultConsumer {
}
if (endpoint.getAckMode() !=
GooglePubsubConstants.AckMode.NONE) {
+ //existing subscriber can not be propagated,
because it will be closed at the end of this block
+ //subscriber will be created at the moment of use
+ // (see
https://issues.apache.org/jira/browse/CAMEL-18447)
exchange.adapt(ExtendedExchange.class)
- .addOnCompletion(new
AcknowledgeSync(subscriber, subscriptionName));
+ .addOnCompletion(new AcknowledgeSync(
+ () ->
endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName));
}
try {
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
index df4498607d8..0c2fa0528f3 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.google.pubsub.consumer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.pubsub.v1.AcknowledgeRequest;
@@ -28,11 +29,12 @@ import org.apache.camel.spi.Synchronization;
public class AcknowledgeSync implements Synchronization {
- private final SubscriberStub subscriber;
+ //Supplier cannot be used because of thrown exception (Callback used
instead)
+ private final Callable<SubscriberStub> subscriberStubSupplier;
private final String subscriptionName;
- public AcknowledgeSync(SubscriberStub subscriber, String subscriptionName)
{
- this.subscriber = subscriber;
+ public AcknowledgeSync(Callable<SubscriberStub> subscriberStubSupplier,
String subscriptionName) {
+ this.subscriberStubSupplier = subscriberStubSupplier;
this.subscriptionName = subscriptionName;
}
@@ -41,7 +43,7 @@ public class AcknowledgeSync implements Synchronization {
AcknowledgeRequest ackRequest = AcknowledgeRequest.newBuilder()
.addAllAckIds(getAckIdList(exchange))
.setSubscription(subscriptionName).build();
- try {
+ try (SubscriberStub subscriber = subscriberStubSupplier.call()) {
subscriber.acknowledgeCallable().call(ackRequest);
} catch (Exception e) {
throw new RuntimeCamelException(e);