This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 0e5e24c CAMEL-17185 Add an option for customizing retryable PubSub
server errors (#6413)
0e5e24c is described below
commit 0e5e24c9454057b9d1947db265690802b06d1e6e
Author: vpaturet <[email protected]>
AuthorDate: Sat Nov 13 09:41:12 2021 +0100
CAMEL-17185 Add an option for customizing retryable PubSub server errors
(#6413)
* Add an option for customizing retryable PubSub server errors
* Expose the option for customizing retryable code as a string
---
.../pubsub/GooglePubsubComponentConfigurer.java | 6 +++++
.../component/google/pubsub/google-pubsub.json | 1 +
.../google/pubsub/GooglePubsubComponent.java | 29 ++++++++++++++++++++++
.../dsl/GooglePubsubComponentBuilderFactory.java | 18 ++++++++++++++
4 files changed, 54 insertions(+)
diff --git
a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubComponentConfigurer.java
b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubComponentConfigurer.java
index e2b37f5..6543efb 100644
---
a/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubComponentConfigurer.java
+++
b/components/camel-google/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubComponentConfigurer.java
@@ -37,6 +37,8 @@ public class GooglePubsubComponentConfigurer extends
PropertyConfigurerSupport i
case "publisherTerminationTimeout":
target.setPublisherTerminationTimeout(property(camelContext, int.class,
value)); return true;
case "serviceaccountkey":
case "serviceAccountKey":
target.setServiceAccountKey(property(camelContext, java.lang.String.class,
value)); return true;
+ case "synchronouspullretryablecodes":
+ case "synchronousPullRetryableCodes":
target.setSynchronousPullRetryableCodes(property(camelContext,
java.lang.String.class, value)); return true;
default: return false;
}
}
@@ -60,6 +62,8 @@ public class GooglePubsubComponentConfigurer extends
PropertyConfigurerSupport i
case "publisherTerminationTimeout": return int.class;
case "serviceaccountkey":
case "serviceAccountKey": return java.lang.String.class;
+ case "synchronouspullretryablecodes":
+ case "synchronousPullRetryableCodes": return java.lang.String.class;
default: return null;
}
}
@@ -84,6 +88,8 @@ public class GooglePubsubComponentConfigurer extends
PropertyConfigurerSupport i
case "publisherTerminationTimeout": return
target.getPublisherTerminationTimeout();
case "serviceaccountkey":
case "serviceAccountKey": return target.getServiceAccountKey();
+ case "synchronouspullretryablecodes":
+ case "synchronousPullRetryableCodes": return
target.getSynchronousPullRetryableCodes();
default: return null;
}
}
diff --git
a/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
b/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
index 12d1223..aaba5a0 100644
---
a/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
+++
b/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
@@ -26,6 +26,7 @@
"endpoint": { "kind": "property", "displayName": "Endpoint", "group":
"common", "label": "common", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Endpoint to use with local Pub\/Sub emulator." },
"serviceAccountKey": { "kind": "property", "displayName": "Service Account
Key", "group": "common", "label": "common", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "description": "The Service account key that can be
used as credentials for the PubSub publisher\/subscriber. It can be loaded by
default from classpath, but you can prefix with classpath:, file:, or http: to
load the resource from different [...]
"bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error
Handler", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Allows for bridging the
consumer to the Camel routing Error Handler, which mean any exceptions occurred
while the consumer is trying to pickup incoming messages, or the likes, will
now be processed as a me [...]
+ "synchronousPullRetryableCodes": { "kind": "property", "displayName":
"Synchronous Pull Retryable Codes", "group": "consumer", "label": "consumer",
"required": false, "type": "string", "javaType": "java.lang.String",
"deprecated": false, "autowired": false, "secret": false, "description":
"Comma-separated list of additional retryable error codes for synchronous pull.
By default the PubSub client library retries ABORTED, UNAVAILABLE, UNKNOWN" },
"lazyStartProducer": { "kind": "property", "displayName": "Lazy Start
Producer", "group": "producer", "label": "producer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Whether the producer
should be started lazy (on the first message). By starting lazy you can use
this to allow CamelContext and routes to startup in situations where a producer
may otherwise fail during star [...]
"publisherCacheSize": { "kind": "property", "displayName": "Publisher
Cache Size", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "description": "Maximum number of producers to cache. This
could be increased if you have producers for lots of different topics." },
"publisherCacheTimeout": { "kind": "property", "displayName": "Publisher
Cache Timeout", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "description": "How many milliseconds should each producer
stay alive in the cache." },
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
index 1f9b69f..7a49337 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
@@ -17,15 +17,20 @@
package org.apache.camel.component.google.pubsub;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
@@ -87,6 +92,11 @@ public class GooglePubsubComponent extends DefaultComponent {
description = "How many milliseconds should a producer be
allowed to terminate.")
private int publisherTerminationTimeout = 60000;
+ @Metadata(
+ label = "consumer",
+ description = "Comma-separated list of additional retryable
error codes for synchronous pull. By default the PubSub client library retries
ABORTED, UNAVAILABLE, UNKNOWN")
+ private String synchronousPullRetryableCodes;
+
private RemovalListener<String, Publisher> removalListener = removal -> {
Publisher publisher = removal.getValue();
if (publisher == null) {
@@ -183,6 +193,17 @@ public class GooglePubsubComponent extends
DefaultComponent {
SubscriberStubSettings.Builder builder =
SubscriberStubSettings.newBuilder().setTransportChannelProvider(
SubscriberStubSettings.defaultGrpcTransportProviderBuilder().build());
+ if (synchronousPullRetryableCodes != null) {
+ // retrieve the default retryable codes and add the ones specified
as a component option
+ Set<StatusCode.Code> retryableCodes = new
HashSet<>(builder.pullSettings().getRetryableCodes());
+ Set<StatusCode.Code> customRetryableCodes =
Stream.of(synchronousPullRetryableCodes.split(","))
+ .map(String::trim)
+ .map(StatusCode.Code::valueOf)
+ .collect(Collectors.toSet());
+ retryableCodes.addAll(customRetryableCodes);
+ builder.pullSettings().setRetryableCodes(retryableCodes);
+ }
+
if (StringHelper.trimToNull(endpoint) != null) {
ManagedChannel channel =
ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
TransportChannelProvider channelProvider
@@ -254,4 +275,12 @@ public class GooglePubsubComponent extends
DefaultComponent {
public void setServiceAccountKey(String serviceAccountKey) {
this.serviceAccountKey = serviceAccountKey;
}
+
+ public String getSynchronousPullRetryableCodes() {
+ return synchronousPullRetryableCodes;
+ }
+
+ public void setSynchronousPullRetryableCodes(String
synchronousPullRetryableCodes) {
+ this.synchronousPullRetryableCodes = synchronousPullRetryableCodes;
+ }
}
diff --git
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/GooglePubsubComponentBuilderFactory.java
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/GooglePubsubComponentBuilderFactory.java
index 9f3b79b..c3daf2e 100644
---
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/GooglePubsubComponentBuilderFactory.java
+++
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/GooglePubsubComponentBuilderFactory.java
@@ -121,6 +121,23 @@ public interface GooglePubsubComponentBuilderFactory {
return this;
}
/**
+ * Comma-separated list of additional retryable error codes for
+ * synchronous pull. By default the PubSub client library retries
+ * ABORTED, UNAVAILABLE, UNKNOWN.
+ *
+ * The option is a: <code>java.lang.String</code> type.
+ *
+ * Group: consumer
+ *
+ * @param synchronousPullRetryableCodes the value to set
+ * @return the dsl builder
+ */
+ default GooglePubsubComponentBuilder synchronousPullRetryableCodes(
+ java.lang.String synchronousPullRetryableCodes) {
+ doSetProperty("synchronousPullRetryableCodes",
synchronousPullRetryableCodes);
+ return this;
+ }
+ /**
* Whether the producer should be started lazy (on the first message).
* By starting lazy you can use this to allow CamelContext and routes
to
* startup in situations where a producer may otherwise fail during
@@ -232,6 +249,7 @@ public interface GooglePubsubComponentBuilderFactory {
case "endpoint": ((GooglePubsubComponent)
component).setEndpoint((java.lang.String) value); return true;
case "serviceAccountKey": ((GooglePubsubComponent)
component).setServiceAccountKey((java.lang.String) value); return true;
case "bridgeErrorHandler": ((GooglePubsubComponent)
component).setBridgeErrorHandler((boolean) value); return true;
+ case "synchronousPullRetryableCodes": ((GooglePubsubComponent)
component).setSynchronousPullRetryableCodes((java.lang.String) value); return
true;
case "lazyStartProducer": ((GooglePubsubComponent)
component).setLazyStartProducer((boolean) value); return true;
case "publisherCacheSize": ((GooglePubsubComponent)
component).setPublisherCacheSize((int) value); return true;
case "publisherCacheTimeout": ((GooglePubsubComponent)
component).setPublisherCacheTimeout((int) value); return true;