[ https://issues.apache.org/jira/browse/CAMEL-21740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17927639#comment-17927639 ]
jubar commented on CAMEL-21740:
-------------------------------

[~davsclaus] No problem. I just have one question regarding my PR. If someone is starting the route with the CUSTOM ReplayIdPreset and supplies an invalid replay id, shouldn't the consumer error out instead of resubscribing with LATEST?

> camel-salesforce - pub/sub API corrupt replay id causes infinite resubscribe loop
> ---------------------------------------------------------------------------------
>
>                 Key: CAMEL-21740
>                 URL: https://issues.apache.org/jira/browse/CAMEL-21740
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-salesforce
>    Affects Versions: 4.4.3
>            Reporter: jubar
>            Assignee: Claus Ibsen
>            Priority: Minor
>             Fix For: 4.8.4, 4.10.1, 4.11.0
>
>
> If there are no new Change Data Capture events for more than 3 days the cached replayId in the PubSubApiClient becomes invalid. ([Salesforce Docs|https://developer.salesforce.com/docs/platform/pub-sub-api/references/methods/subscribe-rpc.html?q=replay#replaying-an-event-stream])
> It seems like the next event/reconnect then causes the client to try to resubscribe indefinitely with this invalid replay id.
> I use v4.4.3 but the PubSubApiClient hasn't been changed since then.
> Log snippet:
> {code:java}
> 2025-01-20T15:03:48.814+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : GRPC Exception
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The Replay ID validation failed. Ensure that the Replay ID is valid. rpcId: ****
>     at io.grpc.Status.asRuntimeException(Status.java:533) ~[grpc-api-1.61.1.jar:1.61.1]
>     at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481) ~[grpc-stub-1.61.1.jar:1.61.1]
>     at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489) ~[grpc-core-1.61.1.jar:1.61.1]
>     at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453) ~[grpc-core-1.61.1.jar:1.61.1]
>     at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486) ~[grpc-core-1.61.1.jar:1.61.1]
>     at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574) ~[grpc-core-1.61.1.jar:1.61.1]
>     at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72) ~[grpc-core-1.61.1.jar:1.61.1]
>     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742) ~[grpc-core-1.61.1.jar:1.61.1]
>     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723) ~[grpc-core-1.61.1.jar:1.61.1]
>     at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.61.1.jar:1.61.1]
>     at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[grpc-core-1.61.1.jar:1.61.1]
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
>     at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
> 2025-01-20T15:03:48.820+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailers:
> 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: date, Value: Mon, 20 Jan 2025 14:03:30 GMT
> 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: content-type, Value: application/grpc
> 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: rpc-id, Value: ****
> 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: error-code, Value: sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
> 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: type, Value: Subscribe
> 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : unexpected errorCode: sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
> 2025-01-20T15:03:48.823+01:00 INFO 9360 --- [ault-executor-2] o.a.c.c.s.i.client.PubSubApiClient : Subscribing to topic: /data/AccountChangeEvent.
> 2025-01-20T15:03:48.823+01:00 INFO 9360 --- [ault-executor-2] o.a.c.c.s.i.client.PubSubApiClient : Subscribe successful.
> 2025-01-20T15:03:48.916+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : GRPC Exception
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The Replay ID validation failed. Ensure that the Replay ID is valid. rpcId: ****
>     at io.grpc.Status.asRuntimeException(Status.java:533) ~[grpc-api-1.61.1.jar:1.61.1]
>     at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481) ~[grpc-stub-1.61.1.jar:1.61.1]
>     at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574) ~[grpc-core-1.61.1.jar:1.61.1]
>     at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72) ~[grpc-core-1.61.1.jar:1.61.1]
>     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742) ~[grpc-core-1.61.1.jar:1.61.1]
>     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723) ~[grpc-core-1.61.1.jar:1.61.1]
>     at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.61.1.jar:1.61.1]
>     at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[grpc-core-1.61.1.jar:1.61.1]
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
>     at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailers:
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: date, Value: Mon, 20 Jan 2025 14:03:30 GMT
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: content-type, Value: application/grpc
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: rpc-id, Value: ****
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: error-code, Value: sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: type, Value: Subscribe
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : unexpected errorCode: sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
> 2025-01-20T15:03:48.917+01:00 INFO 9360 --- [ault-executor-2] o.a.c.c.s.i.client.PubSubApiClient : Subscribing to topic: /data/AccountChangeEvent.
> 2025-01-20T15:03:48.918+01:00 INFO 9360 --- [ault-executor-2] o.a.c.c.s.i.client.PubSubApiClient : Subscribe successful.
> {code}
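
To make the question in the comment concrete, here is a minimal sketch of the decision it describes. It is not the code from the PR or from camel-salesforce: the class `ReplayFallbackSketch`, the enums `Preset` and `Action`, the method `decide`, and the `anyEventReceived` flag are all hypothetical names made up for illustration. Only the error-code string is taken from the log in the issue. The assumption is that a corrupted replay id supplied by the user at startup (CUSTOM preset, no event received yet) should fail the consumer, while a cached id that expired later could fall back to LATEST.

```java
/**
 * Hypothetical sketch only: none of these types exist in camel-salesforce.
 * It models what a subscriber might do after a failed Subscribe call.
 */
final class ReplayFallbackSketch {

    /** Value of the "error-code" trailer shown in the log of the issue. */
    static final String CORRUPTED_REPLAY_ID =
            "sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted";

    /** Illustrative stand-in for the configured replay preset. */
    enum Preset { LATEST, EARLIEST, CUSTOM }

    /** Possible reactions to a subscribe error (assumed, not from the PR). */
    enum Action { RESUBSCRIBE_UNCHANGED, RESUBSCRIBE_FROM_LATEST, FAIL }

    /**
     * @param errorCode        value of the error-code trailer
     * @param configuredPreset preset the route was started with
     * @param anyEventReceived true once at least one event has arrived, i.e.
     *                         the replay id in use is a cached one rather
     *                         than the one the user supplied
     */
    static Action decide(String errorCode, Preset configuredPreset, boolean anyEventReceived) {
        if (!CORRUPTED_REPLAY_ID.equals(errorCode)) {
            // Some other error: this sketch leaves the existing retry path alone.
            return Action.RESUBSCRIBE_UNCHANGED;
        }
        if (configuredPreset == Preset.CUSTOM && !anyEventReceived) {
            // The user-supplied replay id itself was rejected: surface the error.
            return Action.FAIL;
        }
        // A cached replay id went stale: break the loop by starting from LATEST.
        return Action.RESUBSCRIBE_FROM_LATEST;
    }

    public static void main(String[] args) {
        System.out.println(decide(CORRUPTED_REPLAY_ID, Preset.CUSTOM, false));
        System.out.println(decide(CORRUPTED_REPLAY_ID, Preset.LATEST, true));
    }
}
```

Whether a CUSTOM route that has already received events should also fail, rather than fall back to LATEST, is a separate design choice that this sketch does not settle.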