This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b6fa7794de Salesforce component: removed initial replay id check (#18396)
0b6fa7794de is described below

commit 0b6fa7794def4bcc7f35214454794fe9cd4e1427
Author: Lorenzo Benvenuti <[email protected]>
AuthorDate: Thu Jul 3 19:07:10 2025 +0200

    Salesforce component: removed initial replay id check (#18396)
    
    * CAMEL-22173 - camel-salesforce: avoid performing a dedicated call to check replay id in pub/sub consumer
    
    * CAMEL-22173 - camel-salesforce: addressed PR comments
    
    ---------
    
    Co-authored-by: Lorenzo Benvenuti <[email protected]>
---
 .../camel/catalog/components/salesforce.json       |  41 +++++----
 .../salesforce/SalesforceEndpointConfigurer.java   |   6 ++
 .../salesforce/SalesforceEndpointUriFactory.java   |   3 +-
 .../camel/component/salesforce/salesforce.json     |  41 +++++----
 .../salesforce/InvalidReplayIdException.java       |  32 +++++++
 .../component/salesforce/PubSubApiConsumer.java    |   6 +-
 .../component/salesforce/SalesforceComponent.java  |   1 +
 .../component/salesforce/SalesforceEndpoint.java   |  15 +++
 .../internal/client/PubSubApiClient.java           |  94 +++++--------------
 .../component/salesforce/PubSubApiManualIT.java    |  83 +++++++++++++++++
 .../camel/component/salesforce/PubSubApiTest.java  | 102 +++++++++++++++++++--
 .../SendInvalidReplayIdErrorPubSubServer.java      |  89 ++++++++++++++++++
 12 files changed, 392 insertions(+), 121 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json
index 3c0c4859bc7..5c338d7e0f1 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json
@@ -202,25 +202,26 @@
     "sObjectSearch": { "index": 45, "kind": "parameter", "displayName": 
"SObject Search", "group": "common", "label": "", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Salesforce SOSL search 
string" },
     "streamQueryResult": { "index": 46, "kind": "parameter", "displayName": 
"Stream query result", "group": "common", "label": "", "required": false, 
"type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "false", 
"configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "If true, streams SOQL 
query result and transparently handles sub [...]
     "updateTopic": { "index": 47, "kind": "parameter", "displayName": "Update 
Topic", "group": "common", "label": "", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Whether to update an 
existing Push Topic when using the Streaming API, defaults to false" },
-    "pubSubBatchSize": { "index": 48, "kind": "parameter", "displayName": "Pub 
Sub Batch Size", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 100, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Max number of events to 
receive in a batch from the Pub\/Sub API." },
-    "pubSubDeserializeType": { "index": 49, "kind": "parameter", 
"displayName": "Pub Sub Deserialize Type", "group": "consumer", "label": 
"consumer", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.salesforce.PubSubDeserializeType", "enum": [ 
"AVRO", "SPECIFIC_RECORD", "GENERIC_RECORD", "POJO", "JSON" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "AVRO", 
"configurationClass": "org.apache.camel.component.salesforce.SalesforceEndp 
[...]
-    "pubSubPojoClass": { "index": 50, "kind": "parameter", "displayName": "Pub 
Sub Pojo Class", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Fully qualified class 
name to deserialize Pub\/Sub API event to." },
-    "pubSubReplayId": { "index": 51, "kind": "parameter", "displayName": "Pub 
Sub Replay Id", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "The replayId value to use 
when subscribing to the Pub\/Sub API." },
-    "replayId": { "index": 52, "kind": "parameter", "displayName": "Replay 
Id", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "description": "The replayId value to use when 
subscribing to the Streaming API." },
-    "replayPreset": { "index": 53, "kind": "parameter", "displayName": "Replay 
Preset", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": "com.salesforce.eventbus.protobuf.ReplayPreset", "enum": 
[ "LATEST", "EARLIEST", "CUSTOM" ], "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "LATEST", "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "descript [...]
-    "bridgeErrorHandler": { "index": 54, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming [...]
-    "exceptionHandler": { "index": 55, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By de [...]
-    "exchangePattern": { "index": 56, "kind": "parameter", "displayName": 
"Exchange Pattern", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the exchange pattern when the consumer creates an exchange." },
-    "allOrNone": { "index": 57, "kind": "parameter", "displayName": "All Or 
None", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite API option to 
indicate to rollback all records if any are not successful." },
-    "apexUrl": { "index": 58, "kind": "parameter", "displayName": "Apex Url", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "APEX method URL" },
-    "compositeMethod": { "index": 59, "kind": "parameter", "displayName": 
"Composite Method", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite (raw) method." 
},
-    "eventName": { "index": 60, "kind": "parameter", "displayName": "Event 
Name", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Name of Platform Event, 
Change Data Capture Event, custom event, etc." },
-    "eventSchemaFormat": { "index": 61, "kind": "parameter", "displayName": 
"Event Schema Format", "group": "producer", "label": "producer", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum", 
"enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "descr [...]
-    "eventSchemaId": { "index": 62, "kind": "parameter", "displayName": "Event 
Schema Id", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The ID of the event 
schema." },
-    "rawHttpHeaders": { "index": 63, "kind": "parameter", "displayName": "Raw 
Http Headers", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as HTTP parameters for Raw operation." },
-    "rawMethod": { "index": 64, "kind": "parameter", "displayName": "Raw 
Method", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "HTTP method to use for 
the Raw operation" },
-    "rawPath": { "index": 65, "kind": "parameter", "displayName": "Raw Path", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The portion of the 
endpoint URL after the domain name. E.g., 
'\/services\/data\/v52.0\/sobjects\/Accou [...]
-    "rawQueryParameters": { "index": 66, "kind": "parameter", "displayName": 
"Raw Query Parameters", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as query parameters for Raw  [...]
-    "lazyStartProducer": { "index": 67, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
+    "fallbackToLatestReplayId": { "index": 48, "kind": "parameter", 
"displayName": "Fallback To Latest Replay Id", "group": "consumer", "label": 
"consumer", "required": false, "type": "boolean", "javaType": "boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "Whether the pub\/sub consumer needs to fallback to the 
latest replay id when the provided id is not valid. If set to false, the 
component will keep retrying; in order to treat t [...]
+    "pubSubBatchSize": { "index": 49, "kind": "parameter", "displayName": "Pub 
Sub Batch Size", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 100, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Max number of events to 
receive in a batch from the Pub\/Sub API." },
+    "pubSubDeserializeType": { "index": 50, "kind": "parameter", 
"displayName": "Pub Sub Deserialize Type", "group": "consumer", "label": 
"consumer", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.salesforce.PubSubDeserializeType", "enum": [ 
"AVRO", "SPECIFIC_RECORD", "GENERIC_RECORD", "POJO", "JSON" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "AVRO", 
"configurationClass": "org.apache.camel.component.salesforce.SalesforceEndp 
[...]
+    "pubSubPojoClass": { "index": 51, "kind": "parameter", "displayName": "Pub 
Sub Pojo Class", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Fully qualified class 
name to deserialize Pub\/Sub API event to." },
+    "pubSubReplayId": { "index": 52, "kind": "parameter", "displayName": "Pub 
Sub Replay Id", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "The replayId value to use 
when subscribing to the Pub\/Sub API." },
+    "replayId": { "index": 53, "kind": "parameter", "displayName": "Replay 
Id", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "description": "The replayId value to use when 
subscribing to the Streaming API." },
+    "replayPreset": { "index": 54, "kind": "parameter", "displayName": "Replay 
Preset", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": "com.salesforce.eventbus.protobuf.ReplayPreset", "enum": 
[ "LATEST", "EARLIEST", "CUSTOM" ], "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "LATEST", "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "descript [...]
+    "bridgeErrorHandler": { "index": 55, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming [...]
+    "exceptionHandler": { "index": 56, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By de [...]
+    "exchangePattern": { "index": 57, "kind": "parameter", "displayName": 
"Exchange Pattern", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the exchange pattern when the consumer creates an exchange." },
+    "allOrNone": { "index": 58, "kind": "parameter", "displayName": "All Or 
None", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite API option to 
indicate to rollback all records if any are not successful." },
+    "apexUrl": { "index": 59, "kind": "parameter", "displayName": "Apex Url", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "APEX method URL" },
+    "compositeMethod": { "index": 60, "kind": "parameter", "displayName": 
"Composite Method", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite (raw) method." 
},
+    "eventName": { "index": 61, "kind": "parameter", "displayName": "Event 
Name", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Name of Platform Event, 
Change Data Capture Event, custom event, etc." },
+    "eventSchemaFormat": { "index": 62, "kind": "parameter", "displayName": 
"Event Schema Format", "group": "producer", "label": "producer", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum", 
"enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "descr [...]
+    "eventSchemaId": { "index": 63, "kind": "parameter", "displayName": "Event 
Schema Id", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The ID of the event 
schema." },
+    "rawHttpHeaders": { "index": 64, "kind": "parameter", "displayName": "Raw 
Http Headers", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as HTTP parameters for Raw operation." },
+    "rawMethod": { "index": 65, "kind": "parameter", "displayName": "Raw 
Method", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "HTTP method to use for 
the Raw operation" },
+    "rawPath": { "index": 66, "kind": "parameter", "displayName": "Raw Path", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The portion of the 
endpoint URL after the domain name. E.g., 
'\/services\/data\/v52.0\/sobjects\/Accou [...]
+    "rawQueryParameters": { "index": 67, "kind": "parameter", "displayName": 
"Raw Query Parameters", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as query parameters for Raw  [...]
+    "lazyStartProducer": { "index": 68, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
   }
 }
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
index 1a636d7886b..ee9dcea1f95 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
@@ -57,6 +57,8 @@ public class SalesforceEndpointConfigurer extends 
PropertyConfigurerSupport impl
         case "exchangePattern": 
target.setExchangePattern(property(camelContext, 
org.apache.camel.ExchangePattern.class, value)); return true;
         case "fallbackreplayid":
         case "fallBackReplayId": 
target.getConfiguration().setFallBackReplayId(property(camelContext, 
java.lang.Long.class, value)); return true;
+        case "fallbacktolatestreplayid":
+        case "fallbackToLatestReplayId": 
target.setFallbackToLatestReplayId(property(camelContext, boolean.class, 
value)); return true;
         case "format": 
target.getConfiguration().setFormat(property(camelContext, 
org.apache.camel.component.salesforce.internal.PayloadFormat.class, value)); 
return true;
         case "httpclient":
         case "httpClient": 
target.getConfiguration().setHttpClient(property(camelContext, 
org.apache.camel.component.salesforce.SalesforceHttpClient.class, value)); 
return true;
@@ -193,6 +195,8 @@ public class SalesforceEndpointConfigurer extends 
PropertyConfigurerSupport impl
         case "exchangePattern": return org.apache.camel.ExchangePattern.class;
         case "fallbackreplayid":
         case "fallBackReplayId": return java.lang.Long.class;
+        case "fallbacktolatestreplayid":
+        case "fallbackToLatestReplayId": return boolean.class;
         case "format": return 
org.apache.camel.component.salesforce.internal.PayloadFormat.class;
         case "httpclient":
         case "httpClient": return 
org.apache.camel.component.salesforce.SalesforceHttpClient.class;
@@ -330,6 +334,8 @@ public class SalesforceEndpointConfigurer extends 
PropertyConfigurerSupport impl
         case "exchangePattern": return target.getExchangePattern();
         case "fallbackreplayid":
         case "fallBackReplayId": return 
target.getConfiguration().getFallBackReplayId();
+        case "fallbacktolatestreplayid":
+        case "fallbackToLatestReplayId": return 
target.isFallbackToLatestReplayId();
         case "format": return target.getConfiguration().getFormat();
         case "httpclient":
         case "httpClient": return target.getConfiguration().getHttpClient();
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
 
b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
index c347be8472e..a9d53e081ae 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
@@ -23,7 +23,7 @@ public class SalesforceEndpointUriFactory extends 
org.apache.camel.support.compo
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(68);
+        Set<String> props = new HashSet<>(69);
         props.add("allOrNone");
         props.add("apexMethod");
         props.add("apexQueryParams");
@@ -41,6 +41,7 @@ public class SalesforceEndpointUriFactory extends 
org.apache.camel.support.compo
         props.add("exceptionHandler");
         props.add("exchangePattern");
         props.add("fallBackReplayId");
+        props.add("fallbackToLatestReplayId");
         props.add("format");
         props.add("httpClient");
         props.add("includeDetails");
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
 
b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
index 3c0c4859bc7..5c338d7e0f1 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
@@ -202,25 +202,26 @@
     "sObjectSearch": { "index": 45, "kind": "parameter", "displayName": 
"SObject Search", "group": "common", "label": "", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Salesforce SOSL search 
string" },
     "streamQueryResult": { "index": 46, "kind": "parameter", "displayName": 
"Stream query result", "group": "common", "label": "", "required": false, 
"type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "false", 
"configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "If true, streams SOQL 
query result and transparently handles sub [...]
     "updateTopic": { "index": 47, "kind": "parameter", "displayName": "Update 
Topic", "group": "common", "label": "", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Whether to update an 
existing Push Topic when using the Streaming API, defaults to false" },
-    "pubSubBatchSize": { "index": 48, "kind": "parameter", "displayName": "Pub 
Sub Batch Size", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 100, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Max number of events to 
receive in a batch from the Pub\/Sub API." },
-    "pubSubDeserializeType": { "index": 49, "kind": "parameter", 
"displayName": "Pub Sub Deserialize Type", "group": "consumer", "label": 
"consumer", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.salesforce.PubSubDeserializeType", "enum": [ 
"AVRO", "SPECIFIC_RECORD", "GENERIC_RECORD", "POJO", "JSON" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "AVRO", 
"configurationClass": "org.apache.camel.component.salesforce.SalesforceEndp 
[...]
-    "pubSubPojoClass": { "index": 50, "kind": "parameter", "displayName": "Pub 
Sub Pojo Class", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Fully qualified class 
name to deserialize Pub\/Sub API event to." },
-    "pubSubReplayId": { "index": 51, "kind": "parameter", "displayName": "Pub 
Sub Replay Id", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "The replayId value to use 
when subscribing to the Pub\/Sub API." },
-    "replayId": { "index": 52, "kind": "parameter", "displayName": "Replay 
Id", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "description": "The replayId value to use when 
subscribing to the Streaming API." },
-    "replayPreset": { "index": 53, "kind": "parameter", "displayName": "Replay 
Preset", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": "com.salesforce.eventbus.protobuf.ReplayPreset", "enum": 
[ "LATEST", "EARLIEST", "CUSTOM" ], "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "LATEST", "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "descript [...]
-    "bridgeErrorHandler": { "index": 54, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming [...]
-    "exceptionHandler": { "index": 55, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By de [...]
-    "exchangePattern": { "index": 56, "kind": "parameter", "displayName": 
"Exchange Pattern", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the exchange pattern when the consumer creates an exchange." },
-    "allOrNone": { "index": 57, "kind": "parameter", "displayName": "All Or 
None", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite API option to 
indicate to rollback all records if any are not successful." },
-    "apexUrl": { "index": 58, "kind": "parameter", "displayName": "Apex Url", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "APEX method URL" },
-    "compositeMethod": { "index": 59, "kind": "parameter", "displayName": 
"Composite Method", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite (raw) method." 
},
-    "eventName": { "index": 60, "kind": "parameter", "displayName": "Event 
Name", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Name of Platform Event, 
Change Data Capture Event, custom event, etc." },
-    "eventSchemaFormat": { "index": 61, "kind": "parameter", "displayName": 
"Event Schema Format", "group": "producer", "label": "producer", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum", 
"enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "descr [...]
-    "eventSchemaId": { "index": 62, "kind": "parameter", "displayName": "Event 
Schema Id", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The ID of the event 
schema." },
-    "rawHttpHeaders": { "index": 63, "kind": "parameter", "displayName": "Raw 
Http Headers", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as HTTP parameters for Raw operation." },
-    "rawMethod": { "index": 64, "kind": "parameter", "displayName": "Raw 
Method", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "HTTP method to use for 
the Raw operation" },
-    "rawPath": { "index": 65, "kind": "parameter", "displayName": "Raw Path", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The portion of the 
endpoint URL after the domain name. E.g., 
'\/services\/data\/v52.0\/sobjects\/Accou [...]
-    "rawQueryParameters": { "index": 66, "kind": "parameter", "displayName": 
"Raw Query Parameters", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as query parameters for Raw  [...]
-    "lazyStartProducer": { "index": 67, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
+    "fallbackToLatestReplayId": { "index": 48, "kind": "parameter", 
"displayName": "Fallback To Latest Replay Id", "group": "consumer", "label": 
"consumer", "required": false, "type": "boolean", "javaType": "boolean", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
false, "description": "Whether the pub\/sub consumer needs to fallback to the 
latest replay id when the provided id is not valid. If set to false, the 
component will keep retrying; in order to treat t [...]
+    "pubSubBatchSize": { "index": 49, "kind": "parameter", "displayName": "Pub 
Sub Batch Size", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 100, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Max number of events to 
receive in a batch from the Pub\/Sub API." },
+    "pubSubDeserializeType": { "index": 50, "kind": "parameter", 
"displayName": "Pub Sub Deserialize Type", "group": "consumer", "label": 
"consumer", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.salesforce.PubSubDeserializeType", "enum": [ 
"AVRO", "SPECIFIC_RECORD", "GENERIC_RECORD", "POJO", "JSON" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "AVRO", 
"configurationClass": "org.apache.camel.component.salesforce.SalesforceEndp 
[...]
+    "pubSubPojoClass": { "index": 51, "kind": "parameter", "displayName": "Pub 
Sub Pojo Class", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Fully qualified class 
name to deserialize Pub\/Sub API event to." },
+    "pubSubReplayId": { "index": 52, "kind": "parameter", "displayName": "Pub 
Sub Replay Id", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "The replayId value to use 
when subscribing to the Pub\/Sub API." },
+    "replayId": { "index": 53, "kind": "parameter", "displayName": "Replay 
Id", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "description": "The replayId value to use when 
subscribing to the Streaming API." },
+    "replayPreset": { "index": 54, "kind": "parameter", "displayName": "Replay 
Preset", "group": "consumer", "label": "consumer", "required": false, "type": 
"object", "javaType": "com.salesforce.eventbus.protobuf.ReplayPreset", "enum": 
[ "LATEST", "EARLIEST", "CUSTOM" ], "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "LATEST", "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "descript [...]
+    "bridgeErrorHandler": { "index": 55, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming [...]
+    "exceptionHandler": { "index": 56, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By de [...]
+    "exchangePattern": { "index": 57, "kind": "parameter", "displayName": 
"Exchange Pattern", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the exchange pattern when the consumer creates an exchange." },
+    "allOrNone": { "index": 58, "kind": "parameter", "displayName": "All Or 
None", "group": "producer", "label": "producer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite API option to 
indicate to rollback all records if any are not successful." },
+    "apexUrl": { "index": 59, "kind": "parameter", "displayName": "Apex Url", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "APEX method URL" },
+    "compositeMethod": { "index": 60, "kind": "parameter", "displayName": 
"Composite Method", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Composite (raw) method." 
},
+    "eventName": { "index": 61, "kind": "parameter", "displayName": "Event 
Name", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Name of Platform Event, 
Change Data Capture Event, custom event, etc." },
+    "eventSchemaFormat": { "index": 62, "kind": "parameter", "displayName": 
"Event Schema Format", "group": "producer", "label": "producer", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum", 
"enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "descr [...]
+    "eventSchemaId": { "index": 63, "kind": "parameter", "displayName": "Event 
Schema Id", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The ID of the event 
schema." },
+    "rawHttpHeaders": { "index": 64, "kind": "parameter", "displayName": "Raw 
Http Headers", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as HTTP parameters for Raw operation." },
+    "rawMethod": { "index": 65, "kind": "parameter", "displayName": "Raw 
Method", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "HTTP method to use for 
the Raw operation" },
+    "rawPath": { "index": 66, "kind": "parameter", "displayName": "Raw Path", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "The portion of the 
endpoint URL after the domain name. E.g., 
'\/services\/data\/v52.0\/sobjects\/Accou [...]
+    "rawQueryParameters": { "index": 67, "kind": "parameter", "displayName": 
"Raw Query Parameters", "group": "producer", "label": "producer", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.salesforce.SalesforceEndpointConfig", 
"configurationField": "configuration", "description": "Comma separated list of 
message headers to include as query parameters for Raw  [...]
+    "lazyStartProducer": { "index": 68, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
   }
 }
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/InvalidReplayIdException.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/InvalidReplayIdException.java
new file mode 100644
index 00000000000..a41f41d9a64
--- /dev/null
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/InvalidReplayIdException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+public class InvalidReplayIdException extends RuntimeException { // unchecked exception describing a Pub/Sub replay id that was reported as invalid/corrupt
+
+    private final String replayId; // the offending replay id; not validated here, so it may be null
+
+    public InvalidReplayIdException(String message, String replayId) { // message: human-readable detail; replayId: the rejected id, stored as-is
+        super(message);
+        this.replayId = replayId;
+    }
+
+    public String getReplayId() { // returns the replay id supplied at construction, unchanged
+        return replayId;
+    }
+
+}
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
index c6a4b1c639d..5b34620aaed 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
@@ -37,7 +37,7 @@ public class PubSubApiConsumer extends DefaultConsumer {
     private final String topic;
     private final ReplayPreset initialReplayPreset;
     private String initialReplayId;
-    private int initialReplayIdTimeout;
+    private boolean fallbackToLatestReplayId;
     private final SalesforceEndpoint endpoint;
 
     private final int batchSize;
@@ -54,7 +54,7 @@ public class PubSubApiConsumer extends DefaultConsumer {
         this.topic = endpoint.getTopicName();
         this.initialReplayPreset = 
endpoint.getConfiguration().getReplayPreset();
         this.initialReplayId = endpoint.getPubSubReplayId();
-        this.initialReplayIdTimeout = 
endpoint.getComponent().getInitialReplyIdTimeout();
+        this.fallbackToLatestReplayId = endpoint.isFallbackToLatestReplayId();
         if (initialReplayPreset == ReplayPreset.CUSTOM && initialReplayId == 
null) {
             throw new IllegalArgumentException("pubSubReplayId option is 
required if ReplayPreset is CUSTOM.");
         }
@@ -94,7 +94,7 @@ public class PubSubApiConsumer extends DefaultConsumer {
         
this.pubSubClient.setUsePlainTextConnection(this.usePlainTextConnection);
 
         ServiceHelper.startService(pubSubClient);
-        pubSubClient.subscribe(this, initialReplayPreset, initialReplayId, 
initialReplayIdTimeout, true);
+        pubSubClient.subscribe(this, initialReplayPreset, initialReplayId, 
fallbackToLatestReplayId);
     }
 
     @Override
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index babc4a41f1d..cc9a105d83b 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -276,6 +276,7 @@ public class SalesforceComponent extends DefaultComponent 
implements SSLContextP
               javaType = "java.lang.String", label = "common")
     private String packages;
 
+    @Deprecated
     @Metadata(description = "Timeout in seconds to validate when a custom 
pubSubReplayId has been configured, when starting the Camel Salesforce 
consumer.",
               defaultValue = "30", label = "consumer,advanced")
     private int initialReplyIdTimeout = 30;
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
index bd35c02d620..b506da0f3c6 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
@@ -72,6 +72,13 @@ public class SalesforceEndpoint extends DefaultEndpoint {
     @UriParam(label = "consumer", description = "The replayId value to use 
when subscribing to the Pub/Sub API.")
     private String pubSubReplayId;
 
+    @UriParam(label = "consumer",
+              description = "Whether the pub/sub consumer needs to fallback to 
the latest replay id when the provided id is not valid. "
+                            + "If set to false, the component will keep 
retrying; in order to treat this as an exception you can "
+                            + "use BridgeExceptionHandlerToErrorHandler and 
handle the exception in the route.",
+              defaultValue = "false")
+    private boolean fallbackToLatestReplayId;
+
     public SalesforceEndpoint(String uri, SalesforceComponent 
salesforceComponent, SalesforceEndpointConfig configuration,
                               OperationName operationName, String topicName) {
         super(uri, salesforceComponent);
@@ -141,6 +148,14 @@ public class SalesforceEndpoint extends DefaultEndpoint {
         this.pubSubReplayId = pubSubReplayId;
     }
 
+    public boolean isFallbackToLatestReplayId() { // true if the pub/sub consumer should fall back to the latest replay id when the provided id is not valid
+        return fallbackToLatestReplayId;
+    }
+
+    public void setFallbackToLatestReplayId(boolean fallbackToLatestReplayId) { // backs the fallbackToLatestReplayId consumer URI option (declared default: false)
+        this.fallbackToLatestReplayId = fallbackToLatestReplayId;
+    }
+
     @Override
     protected void doStart() throws Exception {
         try {
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
index 1845a190bef..461e24e3bb9 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
@@ -24,11 +24,8 @@ import java.util.Base64;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import com.salesforce.eventbus.protobuf.ConsumerEvent;
 import com.salesforce.eventbus.protobuf.FetchRequest;
@@ -63,6 +60,7 @@ import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
+import org.apache.camel.component.salesforce.InvalidReplayIdException;
 import org.apache.camel.component.salesforce.PubSubApiConsumer;
 import org.apache.camel.component.salesforce.SalesforceLoginConfig;
 import org.apache.camel.component.salesforce.api.SalesforceException;
@@ -77,7 +75,7 @@ public class PubSubApiClient extends ServiceSupport {
 
     public static final String PUBSUB_ERROR_AUTH_ERROR = 
"sfdc.platform.eventbus.grpc.service.auth.error";
     private static final String PUBSUB_ERROR_AUTH_REFRESH_INVALID = 
"sfdc.platform.eventbus.grpc.service.auth.refresh.invalid";
-    private static final String PUBSUB_ERROR_CORRUPTED_REPLAY_ID
+    public static final String PUBSUB_ERROR_CORRUPTED_REPLAY_ID
             = 
"sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted";
 
     protected PubSubGrpc.PubSubStub asyncStub;
@@ -105,7 +103,7 @@ public class PubSubApiClient extends ServiceSupport {
 
     private ReplayPreset initialReplayPreset;
     private String initialReplayId;
-    private int initialReplayIdTimeout;
+    private boolean fallbackToLatestReplayId;
 
     public PubSubApiClient(SalesforceSession session, SalesforceLoginConfig 
loginConfig, String pubSubHost,
                            int pubSubPort, long backoffIncrement, long 
maxBackoff, boolean allowUseProxyServer) {
@@ -151,12 +149,11 @@ public class PubSubApiClient extends ServiceSupport {
     }
 
     public void subscribe(
-            PubSubApiConsumer consumer, ReplayPreset replayPreset, String 
initialReplayId, int initialReplayIdTimeout,
-            boolean initialSubscribe) {
+            PubSubApiConsumer consumer, ReplayPreset replayPreset, String 
initialReplayId, boolean fallbackToLatestReplayId) {
         LOG.debug("Starting subscribe {}", consumer.getTopic());
         this.initialReplayPreset = replayPreset;
         this.initialReplayId = initialReplayId;
-        this.initialReplayIdTimeout = initialReplayIdTimeout;
+        this.fallbackToLatestReplayId = fallbackToLatestReplayId;
         if (replayPreset == ReplayPreset.CUSTOM && initialReplayId == null) {
             throw new RuntimeException("initialReplayId is required for 
ReplayPreset.CUSTOM");
         }
@@ -165,9 +162,6 @@ public class PubSubApiClient extends ServiceSupport {
         ByteString replayId = null;
         if (initialReplayId != null) {
             replayId = base64DecodeToByteString(initialReplayId);
-            if (initialSubscribe) {
-                checkInitialReplayIdValidity(topic, replayId, 
initialReplayIdTimeout);
-            }
         }
         LOG.info("Subscribing to topic: {}.", topic);
         final FetchResponseObserver responseObserver = new 
FetchResponseObserver(consumer);
@@ -185,57 +179,6 @@ public class PubSubApiClient extends ServiceSupport {
         serverStream.onNext(fetchRequestBuilder.build());
     }
 
-    public void checkInitialReplayIdValidity(String topic, ByteString 
replayId, int initialReplayIdTimeout) {
-        LOG.info("Checking initialReplayId: {} for topic: {}", 
base64EncodeByteString(replayId), topic);
-
-        final AtomicReference<Throwable> error = new AtomicReference<>();
-        final CountDownLatch latch = new CountDownLatch(1);
-        final StreamObserver<FetchResponse> responseObserver = new 
StreamObserver<>() {
-
-            @Override
-            public void onNext(FetchResponse value) {
-                latch.countDown();
-            }
-
-            @Override
-            public void onError(Throwable t) {
-                if (t instanceof StatusRuntimeException e) {
-                    Metadata trailers = e.getTrailers();
-                    if (trailers != null && PUBSUB_ERROR_CORRUPTED_REPLAY_ID
-                            .equals(trailers.get(Metadata.Key.of("error-code", 
Metadata.ASCII_STRING_MARSHALLER)))) {
-                        error.set(t);
-                    }
-                }
-                latch.countDown();
-            }
-
-            @Override
-            public void onCompleted() {
-            }
-        };
-        StreamObserver<FetchRequest> serverStream = 
asyncStub.subscribe(responseObserver);
-        FetchRequest.Builder fetchRequestBuilder = FetchRequest.newBuilder()
-                .setReplayPreset(ReplayPreset.CUSTOM)
-                .setTopicName(topic)
-                .setNumRequested(1)
-                .setReplayId(replayId);
-        serverStream.onNext(fetchRequestBuilder.build());
-
-        try {
-            if (!Uninterruptibles.awaitUninterruptibly(latch, 
initialReplayIdTimeout, TimeUnit.SECONDS)) {
-                throw new RuntimeException("Timeout while checking 
initialReplayId.");
-            }
-        } finally {
-            serverStream.onCompleted();
-        }
-
-        if (error.get() != null) {
-            throw new RuntimeException(
-                    "initialReplayId " + base64EncodeByteString(replayId) + " 
is not valid",
-                    error.get());
-        }
-    }
-
     public TopicInfo getTopicInfo(String name) {
         return topicInfoCache.computeIfAbsent(name,
                 topic -> 
blockingStub.getTopic(TopicRequest.newBuilder().setTopicName(topic).build()));
@@ -408,10 +351,22 @@ public class PubSubApiClient extends ServiceSupport {
                             LOG.debug("logged in {}", consumer.getTopic());
                         }
                         case PUBSUB_ERROR_CORRUPTED_REPLAY_ID -> {
-                            LOG.error("replay id: " + replayId
-                                      + " is corrupt. Trying to recover by 
resubscribing with LATEST replay preset");
-                            replayId = null;
-                            initialReplayPreset = ReplayPreset.LATEST;
+                            String currReplayId = null;
+                            if (replayId != null) {
+                                currReplayId = replayId;
+                            } else if (initialReplayPreset == 
ReplayPreset.CUSTOM) {
+                                currReplayId = initialReplayId;
+                            }
+                            if (fallbackToLatestReplayId) {
+                                initialReplayPreset = ReplayPreset.LATEST;
+                                LOG.warn("replay id: " + currReplayId
+                                         + " is corrupt. Trying to recover by 
resubscribing with LATEST replay preset");
+                                replayId = null;
+                            } else {
+                                
consumer.getExceptionHandler().handleException(new InvalidReplayIdException(
+                                        "Corrupt replay id: " + currReplayId,
+                                        currReplayId));
+                            }
                         }
                         default -> LOG.error("unexpected errorCode: {}", 
errorCode);
                     }
@@ -431,12 +386,13 @@ public class PubSubApiClient extends ServiceSupport {
                 throw new RuntimeException(e);
             }
             if (replayId != null) {
-                subscribe(consumer, ReplayPreset.CUSTOM, replayId, 
initialReplayIdTimeout, false);
+                subscribe(consumer, ReplayPreset.CUSTOM, replayId, 
fallbackToLatestReplayId);
             } else {
                 if (initialReplayPreset == ReplayPreset.CUSTOM) {
-                    subscribe(consumer, initialReplayPreset, initialReplayId, 
initialReplayIdTimeout, false);
+                    subscribe(consumer, initialReplayPreset, initialReplayId,
+                            fallbackToLatestReplayId);
                 } else {
-                    subscribe(consumer, initialReplayPreset, null, 
initialReplayIdTimeout, false);
+                    subscribe(consumer, initialReplayPreset, null, 
fallbackToLatestReplayId);
                 }
             }
         }
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiManualIT.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiManualIT.java
index bf98679d6e7..9cf527e912f 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiManualIT.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiManualIT.java
@@ -18,7 +18,10 @@ package org.apache.camel.component.salesforce;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Base64;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.protobuf.ByteString;
@@ -350,6 +353,86 @@ public class PubSubApiManualIT extends 
AbstractSalesforceTestBase {
         mock.assertIsSatisfied();
     }
 
+    @Test
+    public void canHandleInvalidReplayIdExceptions() throws Exception {
+        
context.getPropertiesComponent().addOverrideProperty("invalidPubSubReplayId", 
getInvalidReplayId());
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(InvalidReplayIdException.class)
+                        .handled(true)
+                        .to("mock:HandleInvalidReplayIdException");
+
+                from("salesforce:pubSubSubscribe:/event/CamelEventMessage__e" +
+                     "?replayPreset=CUSTOM" +
+                     "&pubSubReplayId={{invalidPubSubReplayId}}" +
+                     "&bridgeErrorHandler=true")
+                        .routeId("r.subscriberWithInvalidReplayId")
+                        .autoStartup(false)
+                        .to("mock:SubscriberWithInvalidReplayId");
+            }
+        });
+
+        MockEndpoint errorHandlerMock = 
getMockEndpoint("mock:HandleInvalidReplayIdException");
+        errorHandlerMock.expectedMessageCount(1);
+
+        MockEndpoint subscriberMock = 
getMockEndpoint("mock:SubscriberWithInvalidReplayId");
+        subscriberMock.expectedMessageCount(0);
+
+        
context.getRouteController().startRoute("r.subscriberWithInvalidReplayId");
+        final GenericRecord record = new GenericRecordBuilder(camelEventSchema)
+                .set("Message__c", "hello world")
+                .set("CreatedDate", System.currentTimeMillis() / 1000)
+                .set("CreatedById", "123")
+                .build();
+        template.requestBody("direct:publishCamelEventMessage", 
List.of(record));
+
+        errorHandlerMock.assertIsSatisfied();
+        subscriberMock.assertIsSatisfied();
+    }
+
+    @Test
+    public void fallsBackToLatestReplayId() throws Exception {
+        
context.getPropertiesComponent().addOverrideProperty("invalidPubSubReplayId", 
getInvalidReplayId());
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("salesforce:pubSubSubscribe:/event/CamelEventMessage__e" +
+                     "?replayPreset=CUSTOM" +
+                     "&pubSubReplayId={{invalidPubSubReplayId}}" +
+                     "&fallbackToLatestReplayId=true")
+                        .routeId("r.subscriberWithFallbackToLatestReplayId")
+                        .to("mock:SubscriberWithFallbackToLatestReplayId");
+            }
+        });
+
+        MockEndpoint mock = 
getMockEndpoint("mock:SubscriberWithFallbackToLatestReplayId");
+        mock.expectedMessageCount(1);
+
+        // Need to wait to ensure that the event is published _after_ the client has subscribed
+        Thread.sleep(3000);
+        final GenericRecord record = new GenericRecordBuilder(camelEventSchema)
+                .set("Message__c", "hello world")
+                .set("CreatedDate", System.currentTimeMillis() / 1000)
+                .set("CreatedById", "123")
+                .build();
+        template.requestBody("direct:publishCamelEventMessage", 
List.of(record));
+
+        mock.assertIsSatisfied();
+    }
+
+    private String getInvalidReplayId() {
+        // behind the scenes a replay id is just the index
+        // of an event in a topic, hopefully creating a huge, random replay id
+        // is enough to make it invalid
+        long replayId = 100000000000L + new Random().nextLong(100000000000L);
+        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.putLong(replayId);
+        return Base64.getEncoder().encodeToString(buffer.array());
+    }
+
     @Override
     protected RouteBuilder doCreateRouteBuilder() throws Exception {
         return new RouteBuilder() {
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
index 0b71c079712..b328ec8a520 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
@@ -18,21 +18,29 @@ package org.apache.camel.component.salesforce;
 
 import java.io.IOException;
 import java.net.ServerSocket;
+import java.util.Base64;
 
+import com.google.protobuf.ByteString;
 import com.salesforce.eventbus.protobuf.ReplayPreset;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.component.salesforce.internal.client.PubSubApiClient;
 import 
org.apache.camel.component.salesforce.internal.pubsub.AuthErrorPubSubServer;
+import 
org.apache.camel.component.salesforce.internal.pubsub.SendInvalidReplayIdErrorPubSubServer;
 import 
org.apache.camel.component.salesforce.internal.pubsub.SendOneMessagePubSubServer;
+import org.apache.camel.spi.ExceptionHandler;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -64,11 +72,11 @@ public class PubSubApiTest {
                 port, 1000, 10000, true));
         client.setUsePlainTextConnection(true);
         client.start();
-        client.subscribe(consumer, ReplayPreset.LATEST, null, 0, true);
+        client.subscribe(consumer, ReplayPreset.LATEST, null, true);
 
         verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), 
anyLong());
-        verify(client, timeout(5000).times(1)).subscribe(consumer, 
ReplayPreset.LATEST, null, 0, true);
-        verify(client, timeout(5000).times(1)).subscribe(consumer, 
ReplayPreset.CUSTOM, "MTIz", 0, false);
+        verify(client, timeout(5000).times(1)).subscribe(consumer, 
ReplayPreset.LATEST, null, true);
+        verify(client, timeout(5000).times(1)).subscribe(consumer, 
ReplayPreset.CUSTOM, "MTIz", true);
     }
 
     @Test
@@ -94,10 +102,10 @@ public class PubSubApiTest {
                 port, 1000, 10000, true));
         client.setUsePlainTextConnection(true);
         client.start();
-        client.subscribe(consumer, ReplayPreset.CUSTOM, "initial", 0, false);
+        client.subscribe(consumer, ReplayPreset.CUSTOM, "initial", true);
 
         verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), 
anyLong());
-        verify(client, timeout(5000).times(2)).subscribe(consumer, 
ReplayPreset.CUSTOM, "initial", 0, false);
+        verify(client, timeout(5000).times(2)).subscribe(consumer, 
ReplayPreset.CUSTOM, "initial", true);
     }
 
     @Test
@@ -123,12 +131,90 @@ public class PubSubApiTest {
                 port, 1000, 10000, true));
         client.setUsePlainTextConnection(true);
         client.start();
-        client.subscribe(consumer, ReplayPreset.LATEST, null, 0, false);
+        client.subscribe(consumer, ReplayPreset.LATEST, null, true);
 
         Thread.sleep(1000);
 
         verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), 
anyLong());
-        verify(client, timeout(5000).times(2)).subscribe(consumer, 
ReplayPreset.LATEST, null, 0, false);
+        verify(client, timeout(5000).times(2)).subscribe(consumer, 
ReplayPreset.LATEST, null, true);
+    }
+
+    @Test
+    public void testFallbackToLatestReplayIdWhenReplayIdIsCorrupted() throws Exception {
+        final SalesforceSession session = mock(SalesforceSession.class);
+        when(session.getAccessToken()).thenReturn("faketoken");
+        when(session.getInstanceUrl()).thenReturn("https://myinstance");
+        when(session.getOrgId()).thenReturn("00D123123123");
+
+        final PubSubApiConsumer consumer = mock(PubSubApiConsumer.class);
+        when(consumer.getTopic()).thenReturn("/event/FakeTopic");
+        when(consumer.getBatchSize()).thenReturn(100);
+
+        int port = getPort();
+        LOG.debug("Starting server on port {}", port);
+        final Server grpcServer = ServerBuilder.forPort(port)
+                .addService(new SendInvalidReplayIdErrorPubSubServer(1))
+                .build();
+        grpcServer.start();
+
+        PubSubApiClient client = Mockito.spy(new PubSubApiClient(
+                session, new SalesforceLoginConfig(), "localhost",
+                port, 1000, 10000, true));
+        client.setUsePlainTextConnection(true);
+        client.start();
+        String replayId = encodeReplayId("123");
+        client.subscribe(consumer, ReplayPreset.CUSTOM, replayId, true);
+
+        Thread.sleep(1000);
+
+        InOrder inOrder = Mockito.inOrder(client);
+        inOrder.verify(client, timeout(5000)).subscribe(consumer, 
ReplayPreset.CUSTOM, replayId, true);
+        inOrder.verify(client, timeout(5000)).subscribe(consumer, 
ReplayPreset.LATEST, null, true);
+    }
+
+    @Test
+    public void testInvokesExceptionHandlerWhenReplayIdIsCorruptedAndFallbackToLatestReplayIdIsDisabled() throws Exception {
+        final SalesforceSession session = mock(SalesforceSession.class);
+        when(session.getAccessToken()).thenReturn("faketoken");
+        when(session.getInstanceUrl()).thenReturn("https://myinstance");
+        when(session.getOrgId()).thenReturn("00D123123123");
+
+        final ExceptionHandler exceptionHandler = mock(ExceptionHandler.class);
+        final PubSubApiConsumer consumer = mock(PubSubApiConsumer.class);
+        when(consumer.getTopic()).thenReturn("/event/FakeTopic");
+        when(consumer.getBatchSize()).thenReturn(100);
+        when(consumer.getExceptionHandler()).thenReturn(exceptionHandler);
+
+        int port = getPort();
+        LOG.debug("Starting server on port {}", port);
+        final Server grpcServer = ServerBuilder.forPort(port)
+                .addService(new SendInvalidReplayIdErrorPubSubServer(3))
+                .build();
+        grpcServer.start();
+
+        PubSubApiClient client = Mockito.spy(new PubSubApiClient(
+                session, new SalesforceLoginConfig(), "localhost",
+                port, 1000, 10000, true));
+        client.setUsePlainTextConnection(true);
+        client.start();
+        final String replayId = encodeReplayId("123");
+        client.subscribe(consumer, ReplayPreset.CUSTOM, replayId, false);
+
+        Thread.sleep(1000);
+
+        InOrder inOrder = Mockito.inOrder(client);
+        inOrder.verify(client, timeout(5000).times(3)).subscribe(consumer, 
ReplayPreset.CUSTOM, replayId, false);
+        inOrder.verify(client, never()).subscribe(consumer, 
ReplayPreset.LATEST, null, false);
+
+        ArgumentCaptor<InvalidReplayIdException> captor = 
ArgumentCaptor.forClass(InvalidReplayIdException.class);
+        verify(exceptionHandler, 
timeout(5000).times(3)).handleException(captor.capture());
+        for (InvalidReplayIdException exception : captor.getAllValues()) {
+            assertEquals(replayId, exception.getReplayId());
+        }
+    }
+
+    private String encodeReplayId(String replayId) {
+        return 
Base64.getEncoder().encodeToString(ByteString.copyFromUtf8(replayId).toByteArray());
     }
 
     @Test
@@ -154,7 +240,7 @@ public class PubSubApiTest {
                 port, 1000, 10000, true);
         client.setUsePlainTextConnection(true);
         client.start();
-        client.subscribe(consumer, ReplayPreset.LATEST, null, 0, true);
+        client.subscribe(consumer, ReplayPreset.LATEST, null, true);
 
         verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), 
anyLong());
     }
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/SendInvalidReplayIdErrorPubSubServer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/SendInvalidReplayIdErrorPubSubServer.java
new file mode 100644
index 00000000000..a96c5304249
--- /dev/null
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/SendInvalidReplayIdErrorPubSubServer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.pubsub;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import com.google.protobuf.ByteString;
+import com.salesforce.eventbus.protobuf.FetchRequest;
+import com.salesforce.eventbus.protobuf.FetchResponse;
+import com.salesforce.eventbus.protobuf.PubSubGrpc;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+
+import static org.apache.camel.component.salesforce.internal.client.PubSubApiClient.PUBSUB_ERROR_CORRUPTED_REPLAY_ID;
+
+public class SendInvalidReplayIdErrorPubSubServer extends PubSubGrpc.PubSubImplBase {
+
+    private int count = 0;
+    private int numberOfInvalidIdReplies;
+
+    public SendInvalidReplayIdErrorPubSubServer(int numberOfInvalidIdReplies) {
+        this.numberOfInvalidIdReplies = numberOfInvalidIdReplies;
+    }
+
+    @Override
+    public StreamObserver<FetchRequest> subscribe(StreamObserver<FetchResponse> client) {
+
+        return new StreamObserver<>() {
+            @Override
+            public void onNext(FetchRequest request) {
+                count++;
+                if (count <= numberOfInvalidIdReplies && ByteString.copyFromUtf8("123").equals(request.getReplayId())) {
+                    TimerTask task = new TimerTask() {
+                        public void run() {
+                            StatusRuntimeException e = new StatusRuntimeException(Status.UNAUTHENTICATED, new Metadata());
+                            e.getTrailers().put(Metadata.Key.of("error-code", Metadata.ASCII_STRING_MARSHALLER),
+                                    PUBSUB_ERROR_CORRUPTED_REPLAY_ID);
+                            client.onError(e);
+                        }
+                    };
+                    schedule(task);
+                    return;
+                }
+                TimerTask task = new TimerTask() {
+                    public void run() {
+                        FetchResponse response = FetchResponse.newBuilder()
+                                
.setLatestReplayId(ByteString.copyFromUtf8("456"))
+                                .build();
+                        client.onNext(response);
+                    }
+                };
+                schedule(task);
+            }
+
+            private void schedule(TimerTask task) {
+                Timer timer = new Timer("Timer");
+                long delay = 1000L;
+                timer.schedule(task, delay);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+
+            }
+
+            @Override
+            public void onCompleted() {
+
+            }
+        };
+    }
+}

Reply via email to