This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 0b6fa7794de Salesforce component: removed initial replay id check
(#18396)
0b6fa7794de is described below
commit 0b6fa7794def4bcc7f35214454794fe9cd4e1427
Author: Lorenzo Benvenuti <[email protected]>
AuthorDate: Thu Jul 3 19:07:10 2025 +0200
Salesforce component: removed initial replay id check (#18396)
* CAMEL-22173 - camel-salesforce: avoid performing a dedicated call to
check replay id in pub/sub consumer
* CAMEL-22173 - camel-salesforce: addressed PR comments
---------
Co-authored-by: Lorenzo Benvenuti <[email protected]>
---
.../camel/catalog/components/salesforce.json | 41 +++++----
.../salesforce/SalesforceEndpointConfigurer.java | 6 ++
.../salesforce/SalesforceEndpointUriFactory.java | 3 +-
.../camel/component/salesforce/salesforce.json | 41 +++++----
.../salesforce/InvalidReplayIdException.java | 32 +++++++
.../component/salesforce/PubSubApiConsumer.java | 6 +-
.../component/salesforce/SalesforceComponent.java | 1 +
.../component/salesforce/SalesforceEndpoint.java | 15 +++
.../internal/client/PubSubApiClient.java | 94 +++++--------------
.../component/salesforce/PubSubApiManualIT.java | 83 +++++++++++++++++
.../camel/component/salesforce/PubSubApiTest.java | 102 +++++++++++++++++++--
.../SendInvalidReplayIdErrorPubSubServer.java | 89 ++++++++++++++++++
12 files changed, 392 insertions(+), 121 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json
index 3c0c4859bc7..5c338d7e0f1 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json
@@ -202,25 +202,26 @@
"sObjectSearch": { "index": 45, "kind": "parameter", "displayName":
"SObject Search", "group": "common", "label": "", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Salesforce SOSL search
string" },
"streamQueryResult": { "index": 46, "kind": "parameter", "displayName":
"Stream query result", "group": "common", "label": "", "required": false,
"type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "false",
"configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "If true, streams SOQL
query result and transparently handles sub [...]
"updateTopic": { "index": 47, "kind": "parameter", "displayName": "Update
Topic", "group": "common", "label": "", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Whether to update an
existing Push Topic when using the Streaming API, defaults to false" },
- "pubSubBatchSize": { "index": 48, "kind": "parameter", "displayName": "Pub
Sub Batch Size", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 100, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Max number of events to
receive in a batch from the Pub\/Sub API." },
- "pubSubDeserializeType": { "index": 49, "kind": "parameter",
"displayName": "Pub Sub Deserialize Type", "group": "consumer", "label":
"consumer", "required": false, "type": "object", "javaType":
"org.apache.camel.component.salesforce.PubSubDeserializeType", "enum": [
"AVRO", "SPECIFIC_RECORD", "GENERIC_RECORD", "POJO", "JSON" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "AVRO",
"configurationClass": "org.apache.camel.component.salesforce.SalesforceEndp
[...]
- "pubSubPojoClass": { "index": 50, "kind": "parameter", "displayName": "Pub
Sub Pojo Class", "group": "consumer", "label": "consumer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Fully qualified class
name to deserialize Pub\/Sub API event to." },
- "pubSubReplayId": { "index": 51, "kind": "parameter", "displayName": "Pub
Sub Replay Id", "group": "consumer", "label": "consumer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The replayId value to use
when subscribing to the Pub\/Sub API." },
- "replayId": { "index": 52, "kind": "parameter", "displayName": "Replay
Id", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "description": "The replayId value to use when
subscribing to the Streaming API." },
- "replayPreset": { "index": 53, "kind": "parameter", "displayName": "Replay
Preset", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType": "com.salesforce.eventbus.protobuf.ReplayPreset", "enum":
[ "LATEST", "EARLIEST", "CUSTOM" ], "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "LATEST", "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "descript [...]
- "bridgeErrorHandler": { "index": 54, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
- "exceptionHandler": { "index": 55, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
- "exchangePattern": { "index": 56, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
- "allOrNone": { "index": 57, "kind": "parameter", "displayName": "All Or
None", "group": "producer", "label": "producer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Composite API option to
indicate to rollback all records if any are not successful." },
- "apexUrl": { "index": 58, "kind": "parameter", "displayName": "Apex Url",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "APEX method URL" },
- "compositeMethod": { "index": 59, "kind": "parameter", "displayName":
"Composite Method", "group": "producer", "label": "producer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Composite (raw) method."
},
- "eventName": { "index": 60, "kind": "parameter", "displayName": "Event
Name", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Name of Platform Event,
Change Data Capture Event, custom event, etc." },
- "eventSchemaFormat": { "index": 61, "kind": "parameter", "displayName":
"Event Schema Format", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum",
"enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "descr [...]
- "eventSchemaId": { "index": 62, "kind": "parameter", "displayName": "Event
Schema Id", "group": "producer", "label": "producer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "The ID of the event
schema." },
- "rawHttpHeaders": { "index": 63, "kind": "parameter", "displayName": "Raw
Http Headers", "group": "producer", "label": "producer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Comma separated list of
message headers to include as HTTP parameters for Raw operation." },
- "rawMethod": { "index": 64, "kind": "parameter", "displayName": "Raw
Method", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "HTTP method to use for
the Raw operation" },
- "rawPath": { "index": 65, "kind": "parameter", "displayName": "Raw Path",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "The portion of the
endpoint URL after the domain name. E.g.,
'\/services\/data\/v52.0\/sobjects\/Accou [...]
- "rawQueryParameters": { "index": 66, "kind": "parameter", "displayName":
"Raw Query Parameters", "group": "producer", "label": "producer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Comma separated list of
message headers to include as query parameters for Raw [...]
- "lazyStartProducer": { "index": 67, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "fallbackToLatestReplayId": { "index": 48, "kind": "parameter",
"displayName": "Fallback To Latest Replay Id", "group": "consumer", "label":
"consumer", "required": false, "type": "boolean", "javaType": "boolean",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
false, "description": "Whether the pub\/sub consumer needs to fallback to the
latest replay id when the provided id is not valid. If set to false, the
component will keep retrying; in order to treat t [...]
+ "pubSubBatchSize": { "index": 49, "kind": "parameter", "displayName": "Pub
Sub Batch Size", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 100, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Max number of events to
receive in a batch from the Pub\/Sub API." },
+ "pubSubDeserializeType": { "index": 50, "kind": "parameter",
"displayName": "Pub Sub Deserialize Type", "group": "consumer", "label":
"consumer", "required": false, "type": "object", "javaType":
"org.apache.camel.component.salesforce.PubSubDeserializeType", "enum": [
"AVRO", "SPECIFIC_RECORD", "GENERIC_RECORD", "POJO", "JSON" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "AVRO",
"configurationClass": "org.apache.camel.component.salesforce.SalesforceEndp
[...]
+ "pubSubPojoClass": { "index": 51, "kind": "parameter", "displayName": "Pub
Sub Pojo Class", "group": "consumer", "label": "consumer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Fully qualified class
name to deserialize Pub\/Sub API event to." },
+ "pubSubReplayId": { "index": 52, "kind": "parameter", "displayName": "Pub
Sub Replay Id", "group": "consumer", "label": "consumer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The replayId value to use
when subscribing to the Pub\/Sub API." },
+ "replayId": { "index": 53, "kind": "parameter", "displayName": "Replay
Id", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "description": "The replayId value to use when
subscribing to the Streaming API." },
+ "replayPreset": { "index": 54, "kind": "parameter", "displayName": "Replay
Preset", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType": "com.salesforce.eventbus.protobuf.ReplayPreset", "enum":
[ "LATEST", "EARLIEST", "CUSTOM" ], "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "LATEST", "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "descript [...]
+ "bridgeErrorHandler": { "index": 55, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
+ "exceptionHandler": { "index": 56, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
+ "exchangePattern": { "index": 57, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
+ "allOrNone": { "index": 58, "kind": "parameter", "displayName": "All Or
None", "group": "producer", "label": "producer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Composite API option to
indicate to rollback all records if any are not successful." },
+ "apexUrl": { "index": 59, "kind": "parameter", "displayName": "Apex Url",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "APEX method URL" },
+ "compositeMethod": { "index": 60, "kind": "parameter", "displayName":
"Composite Method", "group": "producer", "label": "producer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Composite (raw) method."
},
+ "eventName": { "index": 61, "kind": "parameter", "displayName": "Event
Name", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Name of Platform Event,
Change Data Capture Event, custom event, etc." },
+ "eventSchemaFormat": { "index": 62, "kind": "parameter", "displayName":
"Event Schema Format", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum",
"enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "descr [...]
+ "eventSchemaId": { "index": 63, "kind": "parameter", "displayName": "Event
Schema Id", "group": "producer", "label": "producer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "The ID of the event
schema." },
+ "rawHttpHeaders": { "index": 64, "kind": "parameter", "displayName": "Raw
Http Headers", "group": "producer", "label": "producer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Comma separated list of
message headers to include as HTTP parameters for Raw operation." },
+ "rawMethod": { "index": 65, "kind": "parameter", "displayName": "Raw
Method", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "HTTP method to use for
the Raw operation" },
+ "rawPath": { "index": 66, "kind": "parameter", "displayName": "Raw Path",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "The portion of the
endpoint URL after the domain name. E.g.,
'\/services\/data\/v52.0\/sobjects\/Accou [...]
+ "rawQueryParameters": { "index": 67, "kind": "parameter", "displayName":
"Raw Query Parameters", "group": "producer", "label": "producer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Comma separated list of
message headers to include as query parameters for Raw [...]
+ "lazyStartProducer": { "index": 68, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
}
}
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
index 1a636d7886b..ee9dcea1f95 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
@@ -57,6 +57,8 @@ public class SalesforceEndpointConfigurer extends
PropertyConfigurerSupport impl
case "exchangePattern":
target.setExchangePattern(property(camelContext,
org.apache.camel.ExchangePattern.class, value)); return true;
case "fallbackreplayid":
case "fallBackReplayId":
target.getConfiguration().setFallBackReplayId(property(camelContext,
java.lang.Long.class, value)); return true;
+ case "fallbacktolatestreplayid":
+ case "fallbackToLatestReplayId":
target.setFallbackToLatestReplayId(property(camelContext, boolean.class,
value)); return true;
case "format":
target.getConfiguration().setFormat(property(camelContext,
org.apache.camel.component.salesforce.internal.PayloadFormat.class, value));
return true;
case "httpclient":
case "httpClient":
target.getConfiguration().setHttpClient(property(camelContext,
org.apache.camel.component.salesforce.SalesforceHttpClient.class, value));
return true;
@@ -193,6 +195,8 @@ public class SalesforceEndpointConfigurer extends
PropertyConfigurerSupport impl
case "exchangePattern": return org.apache.camel.ExchangePattern.class;
case "fallbackreplayid":
case "fallBackReplayId": return java.lang.Long.class;
+ case "fallbacktolatestreplayid":
+ case "fallbackToLatestReplayId": return boolean.class;
case "format": return
org.apache.camel.component.salesforce.internal.PayloadFormat.class;
case "httpclient":
case "httpClient": return
org.apache.camel.component.salesforce.SalesforceHttpClient.class;
@@ -330,6 +334,8 @@ public class SalesforceEndpointConfigurer extends
PropertyConfigurerSupport impl
case "exchangePattern": return target.getExchangePattern();
case "fallbackreplayid":
case "fallBackReplayId": return
target.getConfiguration().getFallBackReplayId();
+ case "fallbacktolatestreplayid":
+ case "fallbackToLatestReplayId": return
target.isFallbackToLatestReplayId();
case "format": return target.getConfiguration().getFormat();
case "httpclient":
case "httpClient": return target.getConfiguration().getHttpClient();
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
index c347be8472e..a9d53e081ae 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
@@ -23,7 +23,7 @@ public class SalesforceEndpointUriFactory extends
org.apache.camel.support.compo
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(68);
+ Set<String> props = new HashSet<>(69);
props.add("allOrNone");
props.add("apexMethod");
props.add("apexQueryParams");
@@ -41,6 +41,7 @@ public class SalesforceEndpointUriFactory extends
org.apache.camel.support.compo
props.add("exceptionHandler");
props.add("exchangePattern");
props.add("fallBackReplayId");
+ props.add("fallbackToLatestReplayId");
props.add("format");
props.add("httpClient");
props.add("includeDetails");
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
index 3c0c4859bc7..5c338d7e0f1 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
+++
b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
@@ -202,25 +202,26 @@
"sObjectSearch": { "index": 45, "kind": "parameter", "displayName":
"SObject Search", "group": "common", "label": "", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Salesforce SOSL search
string" },
"streamQueryResult": { "index": 46, "kind": "parameter", "displayName":
"Stream query result", "group": "common", "label": "", "required": false,
"type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "false",
"configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "If true, streams SOQL
query result and transparently handles sub [...]
"updateTopic": { "index": 47, "kind": "parameter", "displayName": "Update
Topic", "group": "common", "label": "", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Whether to update an
existing Push Topic when using the Streaming API, defaults to false" },
- "pubSubBatchSize": { "index": 48, "kind": "parameter", "displayName": "Pub
Sub Batch Size", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 100, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Max number of events to
receive in a batch from the Pub\/Sub API." },
- "pubSubDeserializeType": { "index": 49, "kind": "parameter",
"displayName": "Pub Sub Deserialize Type", "group": "consumer", "label":
"consumer", "required": false, "type": "object", "javaType":
"org.apache.camel.component.salesforce.PubSubDeserializeType", "enum": [
"AVRO", "SPECIFIC_RECORD", "GENERIC_RECORD", "POJO", "JSON" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "AVRO",
"configurationClass": "org.apache.camel.component.salesforce.SalesforceEndp
[...]
- "pubSubPojoClass": { "index": 50, "kind": "parameter", "displayName": "Pub
Sub Pojo Class", "group": "consumer", "label": "consumer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Fully qualified class
name to deserialize Pub\/Sub API event to." },
- "pubSubReplayId": { "index": 51, "kind": "parameter", "displayName": "Pub
Sub Replay Id", "group": "consumer", "label": "consumer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The replayId value to use
when subscribing to the Pub\/Sub API." },
- "replayId": { "index": 52, "kind": "parameter", "displayName": "Replay
Id", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "description": "The replayId value to use when
subscribing to the Streaming API." },
- "replayPreset": { "index": 53, "kind": "parameter", "displayName": "Replay
Preset", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType": "com.salesforce.eventbus.protobuf.ReplayPreset", "enum":
[ "LATEST", "EARLIEST", "CUSTOM" ], "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "LATEST", "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "descript [...]
- "bridgeErrorHandler": { "index": 54, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
- "exceptionHandler": { "index": 55, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
- "exchangePattern": { "index": 56, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
- "allOrNone": { "index": 57, "kind": "parameter", "displayName": "All Or
None", "group": "producer", "label": "producer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Composite API option to
indicate to rollback all records if any are not successful." },
- "apexUrl": { "index": 58, "kind": "parameter", "displayName": "Apex Url",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "APEX method URL" },
- "compositeMethod": { "index": 59, "kind": "parameter", "displayName":
"Composite Method", "group": "producer", "label": "producer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Composite (raw) method."
},
- "eventName": { "index": 60, "kind": "parameter", "displayName": "Event
Name", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Name of Platform Event,
Change Data Capture Event, custom event, etc." },
- "eventSchemaFormat": { "index": 61, "kind": "parameter", "displayName":
"Event Schema Format", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum",
"enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "descr [...]
- "eventSchemaId": { "index": 62, "kind": "parameter", "displayName": "Event
Schema Id", "group": "producer", "label": "producer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "The ID of the event
schema." },
- "rawHttpHeaders": { "index": 63, "kind": "parameter", "displayName": "Raw
Http Headers", "group": "producer", "label": "producer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Comma separated list of
message headers to include as HTTP parameters for Raw operation." },
- "rawMethod": { "index": 64, "kind": "parameter", "displayName": "Raw
Method", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "HTTP method to use for
the Raw operation" },
- "rawPath": { "index": 65, "kind": "parameter", "displayName": "Raw Path",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "The portion of the
endpoint URL after the domain name. E.g.,
'\/services\/data\/v52.0\/sobjects\/Accou [...]
- "rawQueryParameters": { "index": 66, "kind": "parameter", "displayName":
"Raw Query Parameters", "group": "producer", "label": "producer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Comma separated list of
message headers to include as query parameters for Raw [...]
- "lazyStartProducer": { "index": 67, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "fallbackToLatestReplayId": { "index": 48, "kind": "parameter",
"displayName": "Fallback To Latest Replay Id", "group": "consumer", "label":
"consumer", "required": false, "type": "boolean", "javaType": "boolean",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
false, "description": "Whether the pub\/sub consumer needs to fallback to the
latest replay id when the provided id is not valid. If set to false, the
component will keep retrying; in order to treat t [...]
+ "pubSubBatchSize": { "index": 49, "kind": "parameter", "displayName": "Pub
Sub Batch Size", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 100, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Max number of events to
receive in a batch from the Pub\/Sub API." },
+ "pubSubDeserializeType": { "index": 50, "kind": "parameter",
"displayName": "Pub Sub Deserialize Type", "group": "consumer", "label":
"consumer", "required": false, "type": "object", "javaType":
"org.apache.camel.component.salesforce.PubSubDeserializeType", "enum": [
"AVRO", "SPECIFIC_RECORD", "GENERIC_RECORD", "POJO", "JSON" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "AVRO",
"configurationClass": "org.apache.camel.component.salesforce.SalesforceEndp
[...]
+ "pubSubPojoClass": { "index": 51, "kind": "parameter", "displayName": "Pub
Sub Pojo Class", "group": "consumer", "label": "consumer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Fully qualified class
name to deserialize Pub\/Sub API event to." },
+ "pubSubReplayId": { "index": 52, "kind": "parameter", "displayName": "Pub
Sub Replay Id", "group": "consumer", "label": "consumer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The replayId value to use
when subscribing to the Pub\/Sub API." },
+ "replayId": { "index": 53, "kind": "parameter", "displayName": "Replay
Id", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "description": "The replayId value to use when
subscribing to the Streaming API." },
+ "replayPreset": { "index": 54, "kind": "parameter", "displayName": "Replay
Preset", "group": "consumer", "label": "consumer", "required": false, "type":
"object", "javaType": "com.salesforce.eventbus.protobuf.ReplayPreset", "enum":
[ "LATEST", "EARLIEST", "CUSTOM" ], "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "LATEST", "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "descript [...]
+ "bridgeErrorHandler": { "index": 55, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
+ "exceptionHandler": { "index": 56, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
+ "exchangePattern": { "index": 57, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
+ "allOrNone": { "index": 58, "kind": "parameter", "displayName": "All Or
None", "group": "producer", "label": "producer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Composite API option to
indicate to rollback all records if any are not successful." },
+ "apexUrl": { "index": 59, "kind": "parameter", "displayName": "Apex Url",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "APEX method URL" },
+ "compositeMethod": { "index": 60, "kind": "parameter", "displayName":
"Composite Method", "group": "producer", "label": "producer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Composite (raw) method."
},
+ "eventName": { "index": 61, "kind": "parameter", "displayName": "Event
Name", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Name of Platform Event,
Change Data Capture Event, custom event, etc." },
+ "eventSchemaFormat": { "index": 62, "kind": "parameter", "displayName":
"Event Schema Format", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum",
"enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "descr [...]
+ "eventSchemaId": { "index": 63, "kind": "parameter", "displayName": "Event
Schema Id", "group": "producer", "label": "producer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "The ID of the event
schema." },
+ "rawHttpHeaders": { "index": 64, "kind": "parameter", "displayName": "Raw
Http Headers", "group": "producer", "label": "producer", "required": false,
"type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Comma separated list of
message headers to include as HTTP parameters for Raw operation." },
+ "rawMethod": { "index": 65, "kind": "parameter", "displayName": "Raw
Method", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "HTTP method to use for
the Raw operation" },
+ "rawPath": { "index": 66, "kind": "parameter", "displayName": "Raw Path",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "The portion of the
endpoint URL after the domain name. E.g.,
'\/services\/data\/v52.0\/sobjects\/Accou [...]
+ "rawQueryParameters": { "index": 67, "kind": "parameter", "displayName":
"Raw Query Parameters", "group": "producer", "label": "producer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.salesforce.SalesforceEndpointConfig",
"configurationField": "configuration", "description": "Comma separated list of
message headers to include as query parameters for Raw [...]
+ "lazyStartProducer": { "index": 68, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
}
}
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/InvalidReplayIdException.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/InvalidReplayIdException.java
new file mode 100644
index 00000000000..a41f41d9a64
--- /dev/null
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/InvalidReplayIdException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+public class InvalidReplayIdException extends RuntimeException {
+
+ private final String replayId;
+
+ public InvalidReplayIdException(String message, String replayId) {
+ super(message);
+ this.replayId = replayId;
+ }
+
+ public String getReplayId() {
+ return replayId;
+ }
+
+}
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
index c6a4b1c639d..5b34620aaed 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
@@ -37,7 +37,7 @@ public class PubSubApiConsumer extends DefaultConsumer {
private final String topic;
private final ReplayPreset initialReplayPreset;
private String initialReplayId;
- private int initialReplayIdTimeout;
+ private boolean fallbackToLatestReplayId;
private final SalesforceEndpoint endpoint;
private final int batchSize;
@@ -54,7 +54,7 @@ public class PubSubApiConsumer extends DefaultConsumer {
this.topic = endpoint.getTopicName();
this.initialReplayPreset =
endpoint.getConfiguration().getReplayPreset();
this.initialReplayId = endpoint.getPubSubReplayId();
- this.initialReplayIdTimeout =
endpoint.getComponent().getInitialReplyIdTimeout();
+ this.fallbackToLatestReplayId = endpoint.isFallbackToLatestReplayId();
if (initialReplayPreset == ReplayPreset.CUSTOM && initialReplayId ==
null) {
throw new IllegalArgumentException("pubSubReplayId option is
required if ReplayPreset is CUSTOM.");
}
@@ -94,7 +94,7 @@ public class PubSubApiConsumer extends DefaultConsumer {
this.pubSubClient.setUsePlainTextConnection(this.usePlainTextConnection);
ServiceHelper.startService(pubSubClient);
- pubSubClient.subscribe(this, initialReplayPreset, initialReplayId,
initialReplayIdTimeout, true);
+ pubSubClient.subscribe(this, initialReplayPreset, initialReplayId,
fallbackToLatestReplayId);
}
@Override
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index babc4a41f1d..cc9a105d83b 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -276,6 +276,7 @@ public class SalesforceComponent extends DefaultComponent
implements SSLContextP
javaType = "java.lang.String", label = "common")
private String packages;
+ @Deprecated
@Metadata(description = "Timeout in seconds to validate when a custom
pubSubReplayId has been configured, when starting the Camel Salesforce
consumer.",
defaultValue = "30", label = "consumer,advanced")
private int initialReplyIdTimeout = 30;
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
index bd35c02d620..b506da0f3c6 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
@@ -72,6 +72,13 @@ public class SalesforceEndpoint extends DefaultEndpoint {
@UriParam(label = "consumer", description = "The replayId value to use
when subscribing to the Pub/Sub API.")
private String pubSubReplayId;
+ @UriParam(label = "consumer",
+ description = "Whether the pub/sub consumer needs to fallback to
the latest replay id when the provided id is not valid. "
+ + "If set to false, the component will keep
retrying; in order to treat this as an exception you can "
+ + "use BridgeExceptionHandlerToErrorHandler and
handle the exception in the route.",
+ defaultValue = "false")
+ private boolean fallbackToLatestReplayId;
+
public SalesforceEndpoint(String uri, SalesforceComponent
salesforceComponent, SalesforceEndpointConfig configuration,
OperationName operationName, String topicName) {
super(uri, salesforceComponent);
@@ -141,6 +148,14 @@ public class SalesforceEndpoint extends DefaultEndpoint {
this.pubSubReplayId = pubSubReplayId;
}
+ public boolean isFallbackToLatestReplayId() {
+ return fallbackToLatestReplayId;
+ }
+
+ public void setFallbackToLatestReplayId(boolean fallbackToLatestReplayId) {
+ this.fallbackToLatestReplayId = fallbackToLatestReplayId;
+ }
+
@Override
protected void doStart() throws Exception {
try {
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
index 1845a190bef..461e24e3bb9 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
@@ -24,11 +24,8 @@ import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import com.salesforce.eventbus.protobuf.ConsumerEvent;
import com.salesforce.eventbus.protobuf.FetchRequest;
@@ -63,6 +60,7 @@ import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
+import org.apache.camel.component.salesforce.InvalidReplayIdException;
import org.apache.camel.component.salesforce.PubSubApiConsumer;
import org.apache.camel.component.salesforce.SalesforceLoginConfig;
import org.apache.camel.component.salesforce.api.SalesforceException;
@@ -77,7 +75,7 @@ public class PubSubApiClient extends ServiceSupport {
public static final String PUBSUB_ERROR_AUTH_ERROR =
"sfdc.platform.eventbus.grpc.service.auth.error";
private static final String PUBSUB_ERROR_AUTH_REFRESH_INVALID =
"sfdc.platform.eventbus.grpc.service.auth.refresh.invalid";
- private static final String PUBSUB_ERROR_CORRUPTED_REPLAY_ID
+ public static final String PUBSUB_ERROR_CORRUPTED_REPLAY_ID
=
"sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted";
protected PubSubGrpc.PubSubStub asyncStub;
@@ -105,7 +103,7 @@ public class PubSubApiClient extends ServiceSupport {
private ReplayPreset initialReplayPreset;
private String initialReplayId;
- private int initialReplayIdTimeout;
+ private boolean fallbackToLatestReplayId;
public PubSubApiClient(SalesforceSession session, SalesforceLoginConfig
loginConfig, String pubSubHost,
int pubSubPort, long backoffIncrement, long
maxBackoff, boolean allowUseProxyServer) {
@@ -151,12 +149,11 @@ public class PubSubApiClient extends ServiceSupport {
}
public void subscribe(
- PubSubApiConsumer consumer, ReplayPreset replayPreset, String
initialReplayId, int initialReplayIdTimeout,
- boolean initialSubscribe) {
+ PubSubApiConsumer consumer, ReplayPreset replayPreset, String
initialReplayId, boolean fallbackToLatestReplayId) {
LOG.debug("Starting subscribe {}", consumer.getTopic());
this.initialReplayPreset = replayPreset;
this.initialReplayId = initialReplayId;
- this.initialReplayIdTimeout = initialReplayIdTimeout;
+ this.fallbackToLatestReplayId = fallbackToLatestReplayId;
if (replayPreset == ReplayPreset.CUSTOM && initialReplayId == null) {
throw new RuntimeException("initialReplayId is required for
ReplayPreset.CUSTOM");
}
@@ -165,9 +162,6 @@ public class PubSubApiClient extends ServiceSupport {
ByteString replayId = null;
if (initialReplayId != null) {
replayId = base64DecodeToByteString(initialReplayId);
- if (initialSubscribe) {
- checkInitialReplayIdValidity(topic, replayId,
initialReplayIdTimeout);
- }
}
LOG.info("Subscribing to topic: {}.", topic);
final FetchResponseObserver responseObserver = new
FetchResponseObserver(consumer);
@@ -185,57 +179,6 @@ public class PubSubApiClient extends ServiceSupport {
serverStream.onNext(fetchRequestBuilder.build());
}
- public void checkInitialReplayIdValidity(String topic, ByteString
replayId, int initialReplayIdTimeout) {
- LOG.info("Checking initialReplayId: {} for topic: {}",
base64EncodeByteString(replayId), topic);
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final CountDownLatch latch = new CountDownLatch(1);
- final StreamObserver<FetchResponse> responseObserver = new
StreamObserver<>() {
-
- @Override
- public void onNext(FetchResponse value) {
- latch.countDown();
- }
-
- @Override
- public void onError(Throwable t) {
- if (t instanceof StatusRuntimeException e) {
- Metadata trailers = e.getTrailers();
- if (trailers != null && PUBSUB_ERROR_CORRUPTED_REPLAY_ID
- .equals(trailers.get(Metadata.Key.of("error-code",
Metadata.ASCII_STRING_MARSHALLER)))) {
- error.set(t);
- }
- }
- latch.countDown();
- }
-
- @Override
- public void onCompleted() {
- }
- };
- StreamObserver<FetchRequest> serverStream =
asyncStub.subscribe(responseObserver);
- FetchRequest.Builder fetchRequestBuilder = FetchRequest.newBuilder()
- .setReplayPreset(ReplayPreset.CUSTOM)
- .setTopicName(topic)
- .setNumRequested(1)
- .setReplayId(replayId);
- serverStream.onNext(fetchRequestBuilder.build());
-
- try {
- if (!Uninterruptibles.awaitUninterruptibly(latch,
initialReplayIdTimeout, TimeUnit.SECONDS)) {
- throw new RuntimeException("Timeout while checking
initialReplayId.");
- }
- } finally {
- serverStream.onCompleted();
- }
-
- if (error.get() != null) {
- throw new RuntimeException(
- "initialReplayId " + base64EncodeByteString(replayId) + "
is not valid",
- error.get());
- }
- }
-
public TopicInfo getTopicInfo(String name) {
return topicInfoCache.computeIfAbsent(name,
topic ->
blockingStub.getTopic(TopicRequest.newBuilder().setTopicName(topic).build()));
@@ -408,10 +351,22 @@ public class PubSubApiClient extends ServiceSupport {
LOG.debug("logged in {}", consumer.getTopic());
}
case PUBSUB_ERROR_CORRUPTED_REPLAY_ID -> {
- LOG.error("replay id: " + replayId
- + " is corrupt. Trying to recover by
resubscribing with LATEST replay preset");
- replayId = null;
- initialReplayPreset = ReplayPreset.LATEST;
+ String currReplayId = null;
+ if (replayId != null) {
+ currReplayId = replayId;
+ } else if (initialReplayPreset ==
ReplayPreset.CUSTOM) {
+ currReplayId = initialReplayId;
+ }
+ if (fallbackToLatestReplayId) {
+ initialReplayPreset = ReplayPreset.LATEST;
+ LOG.warn("replay id: " + currReplayId
+ + " is corrupt. Trying to recover by
resubscribing with LATEST replay preset");
+ replayId = null;
+ } else {
+
consumer.getExceptionHandler().handleException(new InvalidReplayIdException(
+ "Corrupt replay id: " + currReplayId,
+ currReplayId));
+ }
}
default -> LOG.error("unexpected errorCode: {}",
errorCode);
}
@@ -431,12 +386,13 @@ public class PubSubApiClient extends ServiceSupport {
throw new RuntimeException(e);
}
if (replayId != null) {
- subscribe(consumer, ReplayPreset.CUSTOM, replayId,
initialReplayIdTimeout, false);
+ subscribe(consumer, ReplayPreset.CUSTOM, replayId,
fallbackToLatestReplayId);
} else {
if (initialReplayPreset == ReplayPreset.CUSTOM) {
- subscribe(consumer, initialReplayPreset, initialReplayId,
initialReplayIdTimeout, false);
+ subscribe(consumer, initialReplayPreset, initialReplayId,
+ fallbackToLatestReplayId);
} else {
- subscribe(consumer, initialReplayPreset, null,
initialReplayIdTimeout, false);
+ subscribe(consumer, initialReplayPreset, null,
fallbackToLatestReplayId);
}
}
}
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiManualIT.java
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiManualIT.java
index bf98679d6e7..9cf527e912f 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiManualIT.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiManualIT.java
@@ -18,7 +18,10 @@ package org.apache.camel.component.salesforce;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Base64;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import com.google.protobuf.ByteString;
@@ -350,6 +353,86 @@ public class PubSubApiManualIT extends
AbstractSalesforceTestBase {
mock.assertIsSatisfied();
}
+ @Test
+ public void canHandleInvalidReplayIdExceptions() throws Exception {
+
context.getPropertiesComponent().addOverrideProperty("invalidPubSubReplayId",
getInvalidReplayId());
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ onException(InvalidReplayIdException.class)
+ .handled(true)
+ .to("mock:HandleInvalidReplayIdException");
+
+ from("salesforce:pubSubSubscribe:/event/CamelEventMessage__e" +
+ "?replayPreset=CUSTOM" +
+ "&pubSubReplayId={{invalidPubSubReplayId}}" +
+ "&bridgeErrorHandler=true")
+ .routeId("r.subscriberWithInvalidReplayId")
+ .autoStartup(false)
+ .to("mock:SubscriberWithInvalidReplayId");
+ }
+ });
+
+ MockEndpoint errorHandlerMock =
getMockEndpoint("mock:HandleInvalidReplayIdException");
+ errorHandlerMock.expectedMessageCount(1);
+
+ MockEndpoint subscriberMock =
getMockEndpoint("mock:SubscriberWithInvalidReplayId");
+ subscriberMock.expectedMessageCount(0);
+
+
context.getRouteController().startRoute("r.subscriberWithInvalidReplayId");
+ final GenericRecord record = new GenericRecordBuilder(camelEventSchema)
+ .set("Message__c", "hello world")
+ .set("CreatedDate", System.currentTimeMillis() / 1000)
+ .set("CreatedById", "123")
+ .build();
+ template.requestBody("direct:publishCamelEventMessage",
List.of(record));
+
+ errorHandlerMock.assertIsSatisfied();
+ subscriberMock.assertIsSatisfied();
+ }
+
+ @Test
+ public void fallsBackToLatestReplayId() throws Exception {
+
context.getPropertiesComponent().addOverrideProperty("invalidPubSubReplayId",
getInvalidReplayId());
+
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("salesforce:pubSubSubscribe:/event/CamelEventMessage__e" +
+ "?replayPreset=CUSTOM" +
+ "&pubSubReplayId={{invalidPubSubReplayId}}" +
+ "&fallbackToLatestReplayId=true")
+ .routeId("r.subscriberWithFallbackToLatestReplayId")
+ .to("mock:SubscriberWithFallbackToLatestReplayId");
+ }
+ });
+
+ MockEndpoint mock =
getMockEndpoint("mock:SubscriberWithFallbackToLatestReplayId");
+ mock.expectedMessageCount(1);
+
+ // Need to wait to ensure that the event is published _after_ the
client has subscribed
+ Thread.sleep(3000);
+ final GenericRecord record = new GenericRecordBuilder(camelEventSchema)
+ .set("Message__c", "hello world")
+ .set("CreatedDate", System.currentTimeMillis() / 1000)
+ .set("CreatedById", "123")
+ .build();
+ template.requestBody("direct:publishCamelEventMessage",
List.of(record));
+
+ mock.assertIsSatisfied();
+ }
+
+ private String getInvalidReplayId() {
+ // behind the scenes a replay id is just the index
+ // of an event in a topic, hopefully creating a huge, random replay id
+ // is enough to make it invalid
+ long replayId = 100000000000L + new Random().nextLong(100000000000L);
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.putLong(replayId);
+ return Base64.getEncoder().encodeToString(buffer.array());
+ }
+
@Override
protected RouteBuilder doCreateRouteBuilder() throws Exception {
return new RouteBuilder() {
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
index 0b71c079712..b328ec8a520 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
@@ -18,21 +18,29 @@ package org.apache.camel.component.salesforce;
import java.io.IOException;
import java.net.ServerSocket;
+import java.util.Base64;
+import com.google.protobuf.ByteString;
import com.salesforce.eventbus.protobuf.ReplayPreset;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.apache.camel.component.salesforce.internal.client.PubSubApiClient;
import
org.apache.camel.component.salesforce.internal.pubsub.AuthErrorPubSubServer;
+import
org.apache.camel.component.salesforce.internal.pubsub.SendInvalidReplayIdErrorPubSubServer;
import
org.apache.camel.component.salesforce.internal.pubsub.SendOneMessagePubSubServer;
+import org.apache.camel.spi.ExceptionHandler;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -64,11 +72,11 @@ public class PubSubApiTest {
port, 1000, 10000, true));
client.setUsePlainTextConnection(true);
client.start();
- client.subscribe(consumer, ReplayPreset.LATEST, null, 0, true);
+ client.subscribe(consumer, ReplayPreset.LATEST, null, true);
verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(),
anyLong());
- verify(client, timeout(5000).times(1)).subscribe(consumer,
ReplayPreset.LATEST, null, 0, true);
- verify(client, timeout(5000).times(1)).subscribe(consumer,
ReplayPreset.CUSTOM, "MTIz", 0, false);
+ verify(client, timeout(5000).times(1)).subscribe(consumer,
ReplayPreset.LATEST, null, true);
+ verify(client, timeout(5000).times(1)).subscribe(consumer,
ReplayPreset.CUSTOM, "MTIz", true);
}
@Test
@@ -94,10 +102,10 @@ public class PubSubApiTest {
port, 1000, 10000, true));
client.setUsePlainTextConnection(true);
client.start();
- client.subscribe(consumer, ReplayPreset.CUSTOM, "initial", 0, false);
+ client.subscribe(consumer, ReplayPreset.CUSTOM, "initial", true);
verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(),
anyLong());
- verify(client, timeout(5000).times(2)).subscribe(consumer,
ReplayPreset.CUSTOM, "initial", 0, false);
+ verify(client, timeout(5000).times(2)).subscribe(consumer,
ReplayPreset.CUSTOM, "initial", true);
}
@Test
@@ -123,12 +131,90 @@ public class PubSubApiTest {
port, 1000, 10000, true));
client.setUsePlainTextConnection(true);
client.start();
- client.subscribe(consumer, ReplayPreset.LATEST, null, 0, false);
+ client.subscribe(consumer, ReplayPreset.LATEST, null, true);
Thread.sleep(1000);
verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(),
anyLong());
- verify(client, timeout(5000).times(2)).subscribe(consumer,
ReplayPreset.LATEST, null, 0, false);
+ verify(client, timeout(5000).times(2)).subscribe(consumer,
ReplayPreset.LATEST, null, true);
+ }
+
+ @Test
+ public void testFallbackToLatestReplayIdWhenReplayIdIsCorrupted() throws
Exception {
+ final SalesforceSession session = mock(SalesforceSession.class);
+ when(session.getAccessToken()).thenReturn("faketoken");
+ when(session.getInstanceUrl()).thenReturn("https://myinstance");
+ when(session.getOrgId()).thenReturn("00D123123123");
+
+ final PubSubApiConsumer consumer = mock(PubSubApiConsumer.class);
+ when(consumer.getTopic()).thenReturn("/event/FakeTopic");
+ when(consumer.getBatchSize()).thenReturn(100);
+
+ int port = getPort();
+ LOG.debug("Starting server on port {}", port);
+ final Server grpcServer = ServerBuilder.forPort(port)
+ .addService(new SendInvalidReplayIdErrorPubSubServer(1))
+ .build();
+ grpcServer.start();
+
+ PubSubApiClient client = Mockito.spy(new PubSubApiClient(
+ session, new SalesforceLoginConfig(), "localhost",
+ port, 1000, 10000, true));
+ client.setUsePlainTextConnection(true);
+ client.start();
+ String replayId = encodeReplayId("123");
+ client.subscribe(consumer, ReplayPreset.CUSTOM, replayId, true);
+
+ Thread.sleep(1000);
+
+ InOrder inOrder = Mockito.inOrder(client);
+ inOrder.verify(client, timeout(5000)).subscribe(consumer,
ReplayPreset.CUSTOM, replayId, true);
+ inOrder.verify(client, timeout(5000)).subscribe(consumer,
ReplayPreset.LATEST, null, true);
+ }
+
+ @Test
+ public void
testInvokesExceptionHandlerWhenReplayIdIsCorruptedAndFallbackToLatestReplayIdIsDisabled()
throws Exception {
+ final SalesforceSession session = mock(SalesforceSession.class);
+ when(session.getAccessToken()).thenReturn("faketoken");
+ when(session.getInstanceUrl()).thenReturn("https://myinstance");
+ when(session.getOrgId()).thenReturn("00D123123123");
+
+ final ExceptionHandler exceptionHandler = mock(ExceptionHandler.class);
+ final PubSubApiConsumer consumer = mock(PubSubApiConsumer.class);
+ when(consumer.getTopic()).thenReturn("/event/FakeTopic");
+ when(consumer.getBatchSize()).thenReturn(100);
+ when(consumer.getExceptionHandler()).thenReturn(exceptionHandler);
+
+ int port = getPort();
+ LOG.debug("Starting server on port {}", port);
+ final Server grpcServer = ServerBuilder.forPort(port)
+ .addService(new SendInvalidReplayIdErrorPubSubServer(3))
+ .build();
+ grpcServer.start();
+
+ PubSubApiClient client = Mockito.spy(new PubSubApiClient(
+ session, new SalesforceLoginConfig(), "localhost",
+ port, 1000, 10000, true));
+ client.setUsePlainTextConnection(true);
+ client.start();
+ final String replayId = encodeReplayId("123");
+ client.subscribe(consumer, ReplayPreset.CUSTOM, replayId, false);
+
+ Thread.sleep(1000);
+
+ InOrder inOrder = Mockito.inOrder(client);
+ inOrder.verify(client, timeout(5000).times(3)).subscribe(consumer,
ReplayPreset.CUSTOM, replayId, false);
+ inOrder.verify(client, never()).subscribe(consumer,
ReplayPreset.LATEST, null, false);
+
+ ArgumentCaptor<InvalidReplayIdException> captor =
ArgumentCaptor.forClass(InvalidReplayIdException.class);
+ verify(exceptionHandler,
timeout(5000).times(3)).handleException(captor.capture());
+ for (InvalidReplayIdException exception : captor.getAllValues()) {
+ assertEquals(replayId, exception.getReplayId());
+ }
+ }
+
+ private String encodeReplayId(String replayId) {
+ return
Base64.getEncoder().encodeToString(ByteString.copyFromUtf8(replayId).toByteArray());
}
@Test
@@ -154,7 +240,7 @@ public class PubSubApiTest {
port, 1000, 10000, true);
client.setUsePlainTextConnection(true);
client.start();
- client.subscribe(consumer, ReplayPreset.LATEST, null, 0, true);
+ client.subscribe(consumer, ReplayPreset.LATEST, null, true);
verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(),
anyLong());
}
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/SendInvalidReplayIdErrorPubSubServer.java
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/SendInvalidReplayIdErrorPubSubServer.java
new file mode 100644
index 00000000000..a96c5304249
--- /dev/null
+++
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/SendInvalidReplayIdErrorPubSubServer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.pubsub;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import com.google.protobuf.ByteString;
+import com.salesforce.eventbus.protobuf.FetchRequest;
+import com.salesforce.eventbus.protobuf.FetchResponse;
+import com.salesforce.eventbus.protobuf.PubSubGrpc;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+
+import static
org.apache.camel.component.salesforce.internal.client.PubSubApiClient.PUBSUB_ERROR_CORRUPTED_REPLAY_ID;
+
+public class SendInvalidReplayIdErrorPubSubServer extends
PubSubGrpc.PubSubImplBase {
+
+ private int count = 0;
+ private int numberOfInvalidIdReplies;
+
+ public SendInvalidReplayIdErrorPubSubServer(int numberOfInvalidIdReplies) {
+ this.numberOfInvalidIdReplies = numberOfInvalidIdReplies;
+ }
+
+ @Override
+ public StreamObserver<FetchRequest>
subscribe(StreamObserver<FetchResponse> client) {
+
+ return new StreamObserver<>() {
+ @Override
+ public void onNext(FetchRequest request) {
+ count++;
+ if (count <= numberOfInvalidIdReplies &&
ByteString.copyFromUtf8("123").equals(request.getReplayId())) {
+ TimerTask task = new TimerTask() {
+ public void run() {
+ StatusRuntimeException e = new
StatusRuntimeException(Status.UNAUTHENTICATED, new Metadata());
+ e.getTrailers().put(Metadata.Key.of("error-code",
Metadata.ASCII_STRING_MARSHALLER),
+ PUBSUB_ERROR_CORRUPTED_REPLAY_ID);
+ client.onError(e);
+ }
+ };
+ schedule(task);
+ return;
+ }
+ TimerTask task = new TimerTask() {
+ public void run() {
+ FetchResponse response = FetchResponse.newBuilder()
+
.setLatestReplayId(ByteString.copyFromUtf8("456"))
+ .build();
+ client.onNext(response);
+ }
+ };
+ schedule(task);
+ }
+
+ private void schedule(TimerTask task) {
+ Timer timer = new Timer("Timer");
+ long delay = 1000L;
+ timer.schedule(task, delay);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+
+ }
+
+ @Override
+ public void onCompleted() {
+
+ }
+ };
+ }
+}