Hi Sachin,

It seems MongoDB CDC is trying to resume from a previous change stream position that has already been removed from the oplog collection. The resume token in the error (8267DD842E000000A22B0229296E04) was created on Friday, March 21, 2025 at 3:22:22 PM (4 days ago), which may be older than the MongoDB server's oplog retention window.
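For reference, that creation time can be read out of the token itself. As far as I understand the (internal, unstable) encoding, a resume token whose hex `_data` starts with the marker byte 0x82 carries the operation's BSON timestamp in the next eight bytes: four bytes of epoch seconds followed by a four-byte increment. A minimal sketch, with that layout as an assumption:

    import java.time.Instant;

    public class ResumeTokenTime {
        public static void main(String[] args) {
            // Hex "_data" of the resume token from the error message below.
            String data = "8267DD842E000000A22B0229296E04";
            // Assumed layout: 1 marker byte (0x82), then 4 bytes of epoch
            // seconds and 4 bytes of increment, big-endian. This mirrors how
            // third-party resume token decoders read it; the format is
            // internal to MongoDB and may change between versions.
            long seconds = Long.parseLong(data.substring(2, 10), 16);   // 0x67DD842E
            long increment = Long.parseLong(data.substring(10, 18), 16); // 0xA2
            System.out.println(Instant.ofEpochSecond(seconds) + ", increment " + increment);
            // Prints 2025-03-21T15:22:22Z, i.e. Friday, March 21, 2025 3:22:22 PM UTC.
            // Cross-check: (seconds << 32) + increment == 7484283488862994594L,
            // the 'timestamp' shown in the failing split's offset below.
        }
    }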
On the CDC client side, there is a "heartbeat.interval.ms" [1] option that sends heartbeat requests to the MongoDB server at a regular interval and keeps the resume token position fresh. It is suggested to set this to a reasonable interval if the captured collection does not produce many change events; a sketch of setting it is below.
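For example, with the DataStream API source builder, something like the following might work. This is a minimal, untested sketch: the host, database, collection, and credential values are placeholders, and heartbeatIntervalMillis is my assumption for how the documented heartbeat.interval.ms option [1] is exposed on the 3.1.1 builder, so please verify it against your connector version.

    import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
    import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

    // Sketch only: connection values are placeholders, and the builder
    // method heartbeatIntervalMillis is assumed to correspond to the
    // documented 'heartbeat.interval.ms' connector option [1].
    MongoDBSource<String> source =
            MongoDBSource.<String>builder()
                    .hosts("xxxxx.yyyy.mongodb.net:27017")
                    .databaseList("mydb")
                    .collectionList("mydb.orders")
                    .username("<user>")
                    .password("<password>")
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    // Emit a heartbeat every 30 seconds so the resume token
                    // keeps advancing even when the collection is idle.
                    .heartbeatIntervalMillis(30_000)
                    .build();

Note that heartbeats only keep the token fresh going forward; a token that has already fallen out of the oplog cannot be resumed, so the job will need to be restarted without the stale offset state.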
[1] https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/docs/connectors/flink-sources/mongodb-cdc/#connector-options

Best Regards,
Xiqian

On March 25, 2025 at 15:42, Sachin Mittal <sjmit...@gmail.com> wrote:

Hi,

We are using Flink CDC in our datastream application deployed on AWS KDA. Our MongoDB is deployed on MongoDB Atlas. The versions are:

Flink: 1.20.0
MongoDB CDC (flink-connector-mongodb-cdc): 3.1.1

After the application has been running for a few days, I get the following error:

java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:444)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:951)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={resumeToken={"_data": "8267DD842E000000A22B0229296E04"}, timestamp=7484283488862994594}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false} error due to Open change stream failed.
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    ... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={resumeToken={"_data": "8267DD842E000000A22B0229296E04"}, timestamp=7484283488862994594}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false} error due to Open change stream failed.
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.checkReadException(IncrementalSourceStreamFetcher.java:137)
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.pollSplitRecords(IncrementalSourceStreamFetcher.java:115)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:192)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
    ... 8 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Open change stream failed
    at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:317)
    at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:248)
    at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:104)
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
    ... 5 more
Caused by: com.mongodb.MongoCommandException: Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.' on server xxxxx.yyyy.mongodb.net:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1742888042, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAjSl5Tuyjc+8qWOD10ThSBQOM4=", "subType": "00"}}, "keyId": 7448674344508588123}}, "operationTime": {"$timestamp": {"t": 1742888042, "i": 1}}}
    at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:443)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:365)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
    at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:73)

I think the error is mainly on the MongoDB oplog side.
Any idea how I can approach the team handling MongoDB to get them to resolve this at their end, so that such errors are not propagated to the Flink side? Is there anything I can fix on the Flink or Flink CDC side to stop the application from continuously restarting due to these errors?

Thanks
Sachin