[ 
https://issues.apache.org/jira/browse/FLINK-38550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanquan Lv resolved FLINK-38550.
--------------------------------
    Fix Version/s: cdc-3.6.0
       Resolution: Fixed

Resolved via 4698b7a6397fafbf3e54d81ce7c29ad1f62d7d16.

> Flink CDC source connector for mongodb repeatedly failing in streaming mode 
> with error code 286
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38550
>                 URL: https://issues.apache.org/jira/browse/FLINK-38550
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: Sachin Mittal
>            Assignee: Sachin Mittal
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cdc-3.6.0
>
>
> When the DataStream application starts and there is no resume token for a 
> collection in the oplog, it fails on startup with the following stack trace:
>  
> {code:java}
> com.mongodb.MongoCommandException: Command failed with error 286 
> (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused 
> by :: Resume of change stream was not possible, as the resume point may no 
> longer be in the oplog.' on server 
> rh-doat2-service-prod-shard-00-02.bbvw6.mongodb.net:27017. The full response 
> is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": 
> "PlanExecutor error during aggregation :: caused by :: Resume of change 
> stream was not possible, as the resume point may no longer be in the oplog.", 
> "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": 
> {"clusterTime": {"$timestamp": {"t": 1760708556, "i": 4}}, "signature": 
> {"hash": {"$binary": {"base64": "TbHFeBl/kiI3w3EG+TjtkkNn5Sk=", "subType": 
> "00"}}, "keyId": 7511389671713144834}}, "operationTime": {"$timestamp": {"t": 
> 1760708556, "i": 4}}}
>     at 
> com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
>     at 
> com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:454)
>     at 
> com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:372)
>     at 
> com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
>     at 
> com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:765)
>     at 
> com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:76)
>     at 
> com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:209)
>     at 
> com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:115)
>     at 
> com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:83)
>     at 
> com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:74)
>     at 
> com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:299)
>     at 
> com.mongodb.internal.operation.SyncOperationHelper.createReadCommandAndExecute(SyncOperationHelper.java:273)
>     at 
> com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$3(SyncOperationHelper.java:191)
>     at 
> com.mongodb.internal.operation.SyncOperationHelper.lambda$withSourceAndConnection$0(SyncOperationHelper.java:127)
>     at 
> com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:152)
>     at 
> com.mongodb.internal.operation.SyncOperationHelper.lambda$withSourceAndConnection$1(SyncOperationHelper.java:126)
>     at 
> com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:152)
>     at 
> com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection(SyncOperationHelper.java:125)
>     at 
> com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$4(SyncOperationHelper.java:189)
>     at 
> com.mongodb.internal.operation.SyncOperationHelper.lambda$decorateReadWithRetries$12(SyncOperationHelper.java:292)
>     at 
> com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
>     at 
> com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:194)
>     at 
> com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:176)
>     at 
> com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:193)
>     at 
> com.mongodb.internal.operation.ChangeStreamOperation.lambda$execute$0(ChangeStreamOperation.java:187)
>     at 
> com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource(SyncOperationHelper.java:99)
>     at 
> com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:185)
>     at 
> com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:54)
>     at 
> com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:153)
>     at 
> com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:212)
>     at 
> com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:55)
>     at 
> com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:139)
>     at 
> com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:131)
>     at 
> org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:287)
>     at 
> org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:249)
>     at 
> org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:104)
>     at 
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:93)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to