Hi,

We are using Flink CDC in a DataStream application deployed on AWS KDA (Amazon Kinesis Data Analytics). Our MongoDB cluster is hosted on MongoDB Atlas.
The versions are:
  Flink: 1.20.0
  MongoDB CDC (flink-connector-mongodb-cdc): 3.1.1

After the application has been running for a few days, I get the following error:

java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:444)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:951)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={resumeToken={"_data": "8267DD842E000000A22B0229296E04"}, timestamp=7484283488862994594}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false} error due to Open change stream failed.
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    ... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={resumeToken={"_data": "8267DD842E000000A22B0229296E04"}, timestamp=7484283488862994594}, endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false} error due to Open change stream failed.
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.checkReadException(IncrementalSourceStreamFetcher.java:137)
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.pollSplitRecords(IncrementalSourceStreamFetcher.java:115)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:192)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
    ... 8 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Open change stream failed
    at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:317)
    at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:248)
    at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:104)
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
    ... 5 more
Caused by: com.mongodb.MongoCommandException: Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.' on server xxxxx.yyyy.mongodb.net:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1742888042, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAjSl5Tuyjc+8qWOD10ThSBQOM4=", "subType": "00"}}, "keyId": 7448674344508588123}}, "operationTime": {"$timestamp": {"t": 1742888042, "i": 1}}}
    at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:443)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:365)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
    at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:73)

I think the root cause is on the MongoDB oplog side: the resume token stored in the checkpoint has already rolled off the oplog, so the change stream can no longer be resumed (error 286, ChangeStreamHistoryLost, with the NonResumableChangeStreamError label).

Two questions:
1. How should I approach the team handling MongoDB so they can resolve this on their end (for example, by increasing the oplog size or configuring a minimum oplog window in Atlas), so that such errors are not propagated to the Flink side?
2. Is there anything I can change on the Flink or Flink CDC side to stop the application from continuously restarting due to these errors? Two ideas I am considering are sketched below.
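On idea 2, one thing I found in the connector documentation is the heartbeat interval: the MongoDB CDC source can emit heartbeat events so that the checkpointed resume token keeps advancing even when the watched collections see little traffic, which should make it less likely that the token falls behind the oplog window. Below is a minimal sketch of how I would wire it into our DataStream source; the host, credentials, and database/collection names are placeholders, and the 30s interval is just a guess:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
    import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MongoCdcJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            MongoDBSource<String> source =
                    MongoDBSource.<String>builder()
                            .scheme("mongodb+srv")             // Atlas connection scheme
                            .hosts("xxxxx.yyyy.mongodb.net")   // placeholder Atlas host
                            .username("cdc_user")              // placeholder
                            .password("***")                   // placeholder
                            .databaseList("mydb")              // placeholder
                            .collectionList("mydb.orders")     // placeholder
                            // Emit heartbeats so the resume token in our checkpoints keeps
                            // moving forward even when the collections are idle; a stale
                            // token is what falls off the oplog and triggers error 286.
                            .heartbeatIntervalMillis(30_000)
                            .deserializer(new JsonDebeziumDeserializationSchema())
                            .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source")
               .print();
            env.execute("mongodb-cdc-job");
        }
    }

And to at least slow down the hot restart loop while the underlying oplog issue is being fixed, I was thinking of an exponential-delay restart strategy (I realize KDA manages some of this configuration itself, so this may need to go in application code rather than flink-conf):

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;

    // Back off between restarts instead of restarting immediately; the values are
    // illustrative. Note this only throttles the restarts -- a non-resumable token
    // will still fail again until the oplog window is fixed or the state is reset.
    env.setRestartStrategy(
            RestartStrategies.exponentialDelayRestart(
                    Time.seconds(10),   // initial backoff
                    Time.minutes(5),    // max backoff
                    2.0,                // backoff multiplier
                    Time.hours(1),      // reset threshold after a stable run
                    0.1));              // jitter factor

Does this look like a reasonable direction, or is there a better way to handle ChangeStreamHistoryLost on the connector side?

Thanks
Sachin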