[
https://issues.apache.org/jira/browse/FLINK-38550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yanquan Lv resolved FLINK-38550.
--------------------------------
Fix Version/s: cdc-3.6.0
Resolution: Fixed
Resolved via 4698b7a6397fafbf3e54d81ce7c29ad1f62d7d16.
> Flink CDC source connector for mongodb repeatedly failing in streaming mode
> with error code 286
> -----------------------------------------------------------------------------------------------
>
> Key: FLINK-38550
> URL: https://issues.apache.org/jira/browse/FLINK-38550
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: Sachin Mittal
> Assignee: Sachin Mittal
> Priority: Major
> Labels: pull-request-available
> Fix For: cdc-3.6.0
>
>
> When the datastream application starts and there is no resume token for a
> collection in the oplog, it fails with the following stack trace on startup:
>
> {code:java}
> com.mongodb.MongoCommandException: Command failed with error 286
> (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused
> by :: Resume of change stream was not possible, as the resume point may no
> longer be in the oplog.' on server
> rh-doat2-service-prod-shard-00-02.bbvw6.mongodb.net:27017. The full response
> is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg":
> "PlanExecutor error during aggregation :: caused by :: Resume of change
> stream was not possible, as the resume point may no longer be in the oplog.",
> "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime":
> {"clusterTime": {"$timestamp": {"t": 1760708556, "i": 4}}, "signature":
> {"hash": {"$binary": {"base64": "TbHFeBl/kiI3w3EG+TjtkkNn5Sk=", "subType":
> "00"}}, "keyId": 7511389671713144834}}, "operationTime": {"$timestamp": {"t":
> 1760708556, "i": 4}}} at
> com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
> at
> com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:454)
> at
> com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:372)
> at
> com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
> at
> com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:765)
> at
> com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:76)
> at
> com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:209)
> at
> com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:115)
> at
> com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:83)
> at
> com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:74)
> at
> com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:299)
> at
> com.mongodb.internal.operation.SyncOperationHelper.createReadCommandAndExecute(SyncOperationHelper.java:273)
> at
> com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$3(SyncOperationHelper.java:191)
> at
> com.mongodb.internal.operation.SyncOperationHelper.lambda$withSourceAndConnection$0(SyncOperationHelper.java:127)
> at
> com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:152)
> at
> com.mongodb.internal.operation.SyncOperationHelper.lambda$withSourceAndConnection$1(SyncOperationHelper.java:126)
> at
> com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:152)
> at
> com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection(SyncOperationHelper.java:125)
> at
> com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$4(SyncOperationHelper.java:189)
> at
> com.mongodb.internal.operation.SyncOperationHelper.lambda$decorateReadWithRetries$12(SyncOperationHelper.java:292)
> at
> com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
> at
> com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:194)
> at
> com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:176)
> at
> com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:193)
> at
> com.mongodb.internal.operation.ChangeStreamOperation.lambda$execute$0(ChangeStreamOperation.java:187)
> at
> com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource(SyncOperationHelper.java:99)
> at
> com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:185)
> at
> com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:54)
> at
> com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:153)
> at
> com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:212)
> at
> com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:55)
> at
> com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:139)
> at
> com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:131)
> at
> org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:287)
> at
> org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:249)
> at
> org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:104)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:93)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829) {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)