Hi Sachin,

It seems MongoDB CDC is trying to resume from a previous change stream position 
which has been removed from the oplog collection. The resume token provided 
(8267DD842E000000A22B0229296E04) was created on Friday, March 21, 2025, 3:22:22 
PM (4 days ago), which may have exceeded the retention window (TTL) of the 
MongoDB server's oplog.
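
For reference, the token's creation time can be read directly out of it. The 
resume token layout is not a documented public API, but in the commonly 
observed format the "_data" hex string starts with a "82" marker followed by 
8 hex digits of cluster-time epoch seconds; a minimal Java sketch under that 
assumption:

    import java.time.Instant;

    public class ResumeTokenTime {
        public static void main(String[] args) {
            // Resume token from the stack trace below. The layout is an
            // undocumented internal format, so treat this as a heuristic:
            // "82" marker, 8 hex digits of epoch seconds, then the rest
            // of the keystring.
            String data = "8267DD842E000000A22B0229296E04";
            long epochSeconds = Long.parseLong(data.substring(2, 10), 16);
            // Prints 2025-03-21T15:22:22Z, the 4-day-old position above.
            System.out.println(Instant.ofEpochSecond(epochSeconds));
        }
    }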

On the CDC client side, there's a "heartbeat.interval.ms" [1] option that 
sends heartbeat requests to the MongoDB server regularly and refreshes the 
resume token position. It is suggested to set it to a reasonable interval if 
the captured collection doesn't produce many change events.
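
Since you're on the DataStream API, here's a minimal sketch of setting this 
via the MongoDBSource builder (assuming the builder exposes the option as 
heartbeatIntervalMillis, per the connector docs page [1]; the hosts, 
credentials, database/collection names, and the 30-second interval below are 
placeholders to adapt):

    import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
    import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

    MongoDBSource<String> source =
            MongoDBSource.<String>builder()
                    .hosts("xxxxx.yyyy.mongodb.net:27017") // placeholder host
                    .databaseList("mydb")                  // placeholder database
                    .collectionList("mydb.mycoll")         // placeholder collection
                    .username("user")                      // placeholder credentials
                    .password("password")
                    // Send a heartbeat every 30 s so the resume token keeps
                    // advancing even when the collection produces no changes.
                    .heartbeatIntervalMillis(30000)
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .build();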

[1] 
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/docs/connectors/flink-sources/mongodb-cdc/#connector-options

Best Regards,
Xiqian

On March 25, 2025, at 15:42, Sachin Mittal <sjmit...@gmail.com> wrote:

Hi,
We are using Flink CDC as our datastream application deployed on AWS KDA.
Our MongoDB is deployed on Mongo Atlas.

The versions are:
Flink: 1.20.0
MongoDB CDC (flink-connector-mongodb-cdc): 3.1.1

After the application has been running for a few days, I get the following error:

java.lang.RuntimeException: One or more fetchers have encountered exception
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:444)
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:951)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: java.io.IOException: 
org.apache.flink.util.FlinkRuntimeException: Read split 
StreamSplit{splitId='stream-split', offset={resumeToken={"_data": 
"8267DD842E000000A22B0229296E04"}, timestamp=7484283488862994594}, 
endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false} 
error due to Open change stream failed.
at 
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Read split 
StreamSplit{splitId='stream-split', offset={resumeToken={"_data": 
"8267DD842E000000A22B0229296E04"}, timestamp=7484283488862994594}, 
endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=false} 
error due to Open change stream failed.
at 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.checkReadException(IncrementalSourceStreamFetcher.java:137)
at 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.pollSplitRecords(IncrementalSourceStreamFetcher.java:115)
at 
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:192)
at 
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
... 8 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Open change stream 
failed
at 
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:317)
at 
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:248)
at 
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:104)
at 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
... 5 more
Caused by: com.mongodb.MongoCommandException: Command failed with error 286 
(ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by 
:: Resume of change stream was not possible, as the resume point may no longer 
be in the oplog.' on server 
xxxxx.yyyy.mongodb.net:27017. The full response 
is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": 
"PlanExecutor error during aggregation :: caused by :: Resume of change stream 
was not possible, as the resume point may no longer be in the oplog.", "code": 
286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": {"clusterTime": 
{"$timestamp": {"t": 1742888042, "i": 1}}, "signature": {"hash": {"$binary": 
{"base64": "AAjSl5Tuyjc+8qWOD10ThSBQOM4=", "subType": "00"}}, "keyId": 
7448674344508588123}}, "operationTime": {"$timestamp": {"t": 1742888042, "i": 
1}}}
at 
com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
at 
com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:443)
at 
com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:365)
at 
com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
at 
com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
at 
com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:73)

I think the error is mainly on the MongoDB oplog side. Any idea how I can 
approach the team handling MongoDB to get them to resolve it at their end, so 
that such errors are not propagated to the Flink side?
Is there anything I can fix on the Flink or Flink CDC side to stop the 
application from continuously restarting due to these errors?

Thanks
Sachin

