Sachin Mittal created FLINK-38550:
-------------------------------------

             Summary: Flink CDC source connector for mongodb repeatedly failing 
in streaming mode with error code 286
                 Key: FLINK-38550
                 URL: https://issues.apache.org/jira/browse/FLINK-38550
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.5.0
            Reporter: Sachin Mittal


When the DataStream application starts and the resume token for a collection 
is no longer present in the oplog, the job fails on startup with the following 
stack trace:

 
{code:java}
com.mongodb.MongoCommandException: Command failed with error 286 
(ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by 
:: Resume of change stream was not possible, as the resume point may no longer 
be in the oplog.' on server 
rh-doat2-service-prod-shard-00-02.bbvw6.mongodb.net:27017. The full response is 
{"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": 
"PlanExecutor error during aggregation :: caused by :: Resume of change stream 
was not possible, as the resume point may no longer be in the oplog.", "code": 
286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": {"clusterTime": 
{"$timestamp": {"t": 1760708556, "i": 4}}, "signature": {"hash": {"$binary": 
{"base64": "TbHFeBl/kiI3w3EG+TjtkkNn5Sk=", "subType": "00"}}, "keyId": 
7511389671713144834}}, "operationTime": {"$timestamp": {"t": 1760708556, "i": 
4}}}
    at 
com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
    at 
com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:454)
    at 
com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:372)
    at 
com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
    at 
com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:765)
    at 
com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:76)
    at 
com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:209)
    at 
com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:115)
    at 
com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:83)
    at 
com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:74)
    at 
com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:299)
    at 
com.mongodb.internal.operation.SyncOperationHelper.createReadCommandAndExecute(SyncOperationHelper.java:273)
    at 
com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$3(SyncOperationHelper.java:191)
    at 
com.mongodb.internal.operation.SyncOperationHelper.lambda$withSourceAndConnection$0(SyncOperationHelper.java:127)
    at 
com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:152)
    at 
com.mongodb.internal.operation.SyncOperationHelper.lambda$withSourceAndConnection$1(SyncOperationHelper.java:126)
    at 
com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:152)
    at 
com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection(SyncOperationHelper.java:125)
    at 
com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$4(SyncOperationHelper.java:189)
    at 
com.mongodb.internal.operation.SyncOperationHelper.lambda$decorateReadWithRetries$12(SyncOperationHelper.java:292)
    at 
com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
    at 
com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:194)
    at 
com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:176)
    at 
com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:193)
    at 
com.mongodb.internal.operation.ChangeStreamOperation.lambda$execute$0(ChangeStreamOperation.java:187)
    at 
com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource(SyncOperationHelper.java:99)
    at 
com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:185)
    at 
com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:54)
    at 
com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:153)
    at 
com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:212)
    at 
com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:55)
    at 
com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:139)
    at 
com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:131)
    at 
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:287)
    at 
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:249)
    at 
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:104)
    at 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:93)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
{code}
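
For illustration only, the following is a minimal sketch of how a caller might 
classify this failure from the error code (286) and the error label 
("NonResumableChangeStreamError") shown in the response above. It is not the 
connector's behavior and not a proposed patch. The class 
ChangeStreamFailurePolicy, the Action enum, and the decide method are 
hypothetical names introduced here. The sketch uses only the JDK and assumes 
the code and labels have already been read from the exception.

```java
import java.util.List;

/** Hypothetical helper; not part of Flink CDC or the MongoDB Java driver. */
final class ChangeStreamFailurePolicy {

    /** Server error code from the stack trace above (ChangeStreamHistoryLost). */
    static final int CHANGE_STREAM_HISTORY_LOST = 286;

    /** Error label from the "errorLabels" field of the response above. */
    static final String NON_RESUMABLE_LABEL = "NonResumableChangeStreamError";

    /** Illustrative recovery choices; these names are assumptions. */
    enum Action { RETRY_WITH_SAME_TOKEN, REOPEN_WITHOUT_TOKEN }

    /**
     * Returns REOPEN_WITHOUT_TOKEN when the server reports that the resume
     * point is gone or the error is labeled non-resumable. In that case
     * retrying with the same token would fail again in the same way.
     */
    static Action decide(int errorCode, List<String> errorLabels) {
        boolean historyLost = errorCode == CHANGE_STREAM_HISTORY_LOST;
        boolean nonResumable = errorLabels.contains(NON_RESUMABLE_LABEL);
        return (historyLost || nonResumable)
                ? Action.REOPEN_WITHOUT_TOKEN
                : Action.RETRY_WITH_SAME_TOKEN;
    }
}
```

With the values from the response above, 
decide(286, List.of("NonResumableChangeStreamError")) returns 
REOPEN_WITHOUT_TOKEN. Reopening a change stream without the token skips any 
events that have already aged out of the oplog, so whether that trade-off is 
acceptable depends on the application.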



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
