Hi,
We are using Flink CDC in a DataStream application deployed on AWS KDA.
Our MongoDB cluster is hosted on MongoDB Atlas.

The versions are:
Flink: 1.20.0
MongoDB CDC (flink-connector-mongodb-cdc): 3.1.1

After the application has been running for a few days, I get the following error:

java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager
.checkErrors(SplitFetcherManager.java:333)
at org.apache.flink.connector.base.source.reader.SourceReaderBase
.getNextFetch(SourceReaderBase.java:228)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(
SourceReaderBase.java:190)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(
SourceOperator.java:444)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(
StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(
Task.java:972)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:951)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received
unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher
.runOnce(SplitFetcher.java:168)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(
SplitFetcher.java:117)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors
.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:628)
... 1 more
Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException:
Read split StreamSplit{splitId='stream-split', offset={resumeToken={"_data":
"8267DD842E000000A22B0229296E04"}, timestamp=7484283488862994594},
endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=
false} error due to Open change stream failed.
at org.apache.flink.cdc.connectors.base.source.reader.
IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(
FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher
.runOnce(SplitFetcher.java:165)
... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Read split
StreamSplit{splitId='stream-split', offset={resumeToken={"_data":
"8267DD842E000000A22B0229296E04"}, timestamp=7484283488862994594},
endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=
false} error due to Open change stream failed.
at org.apache.flink.cdc.connectors.base.source.reader.external.
IncrementalSourceStreamFetcher.checkReadException(
IncrementalSourceStreamFetcher.java:137)
at org.apache.flink.cdc.connectors.base.source.reader.external.
IncrementalSourceStreamFetcher.pollSplitRecords(
IncrementalSourceStreamFetcher.java:115)
at org.apache.flink.cdc.connectors.base.source.reader.
IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader
.java:192)
at org.apache.flink.cdc.connectors.base.source.reader.
IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
... 8 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Open change stream
failed
at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.
MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:
317)
at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.
MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:
248)
at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.
MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:104)
at org.apache.flink.cdc.connectors.base.source.reader.external.
IncrementalSourceStreamFetcher.lambda$submitTask$0(
IncrementalSourceStreamFetcher.java:89)
... 5 more
Caused by: com.mongodb.MongoCommandException: Command failed with error 286
(ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused
by :: Resume of change stream was not possible, as the resume point may no
longer be in the oplog.' on server xxxxx.yyyy.mongodb.net:27017. The full
response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0,
"errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of
change stream was not possible, as the resume point may no longer be in the
oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime":
{"clusterTime": {"$timestamp": {"t": 1742888042, "i": 1}}, "signature": {
"hash": {"$binary": {"base64": "AAjSl5Tuyjc+8qWOD10ThSBQOM4=", "subType":
"00"}}, "keyId": 7448674344508588123}}, "operationTime": {"$timestamp": {"t":
1742888042, "i": 1}}}
at com.mongodb.internal.connection.ProtocolHelper
.getCommandFailureException(ProtocolHelper.java:205)
at com.mongodb.internal.connection.InternalStreamConnection
.receiveCommandMessageResponse(InternalStreamConnection.java:443)
at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(
InternalStreamConnection.java:365)
at com.mongodb.internal.connection.UsageTrackingInternalConnection
.sendAndReceive(UsageTrackingInternalConnection.java:114)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection
.sendAndReceive(DefaultConnectionPool.java:643)
at com.mongodb.internal.connection.CommandProtocolImpl.execute(
CommandProtocolImpl.java:73)
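
From what I understand, error 286 (ChangeStreamHistoryLost) means the oplog has
rolled over past the checkpointed resume token, so the change stream can no
longer be resumed. As a first diagnostic to bring to the MongoDB team (a
sketch; I'm assuming a dedicated Atlas cluster where these admin commands are
permitted), the oplog time window could be compared with how far behind the
connector had fallen:

```javascript
// In mongosh, connected to the Atlas replica set.
// Shows the configured oplog size and, more importantly, the time span
// ("log length start to end") currently covered by the oplog. If the
// connector falls behind by more than this window (e.g. during a long
// outage or restart loop), resume becomes impossible.
rs.printReplicationInfo()

// The oldest entry still in the oplog; any resume token older than this
// timestamp will fail with ChangeStreamHistoryLost.
db.getSiblingDB("local").oplog.rs
  .find({}, { ts: 1 })
  .sort({ $natural: 1 })
  .limit(1)
```

If the window turns out to be only a few hours, my first request to the
MongoDB team would be to raise the retention (on dedicated Atlas clusters
there is a "Minimum Oplog Window" / oplog size setting under the cluster's
additional settings, as far as I know).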

I think the root cause is on the MongoDB oplog side: the resume token stored
in the Flink checkpoint is no longer present in the oplog. Any idea how I can
approach the team handling MongoDB so they can resolve this at their end and
such errors are not propagated to the Flink side?
Is there anything I can fix on the Flink or Flink CDC side to stop the
application from continuously restarting due to these errors?
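
One Flink-CDC-side mitigation I have seen suggested for mostly-idle
collections is enabling a heartbeat, so the checkpointed resume token keeps
advancing even when no matching change events arrive. A minimal sketch (host,
database, collection, and credential values are placeholders; I'm assuming
the 3.1.1 DataStream builder API):

```java
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MongoCdcSourceSketch {
    public static MongoDBSource<String> build() {
        return MongoDBSource.<String>builder()
                .hosts("xxxxx.yyyy.mongodb.net:27017") // placeholder Atlas host
                .databaseList("mydb")                  // placeholder database
                .collectionList("mydb.mycoll")         // placeholder collection
                .username("user")
                .password("pass")
                // Emit periodic heartbeat events so the resume token stored in
                // checkpoints keeps moving forward even while the collection is
                // idle, making it less likely to age out of the oplog window.
                .heartbeatIntervalMillis(30_000)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }
}
```

My understanding is that this only reduces the chance of hitting the error;
once the token has actually aged out of the oplog, the history is gone on the
server side and the job would have to restart from a fresh snapshot.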

Thanks
Sachin
