Ciprian Anton created FLINK-38601:
-------------------------------------

             Summary: MongoDB CDC silently stops consuming from unbounded 
streams when Throwable errors occur and never recovers
                 Key: FLINK-38601
                 URL: https://issues.apache.org/jira/browse/FLINK-38601
             Project: Flink
          Issue Type: Bug
          Components: Connectors / MongoDB, Flink CDC
            Reporter: Ciprian Anton


The Flink MongoDB CDC connector does not properly handle 
{{java.lang.Throwable}} errors (such as {{OutOfMemoryError}}, 
{{StackOverflowError}}, etc.) in the stream consumption loop. When such an 
error occurs, the connector silently stops consuming from unbounded change 
streams without propagating the error or triggering any recovery mechanism, 
leaving the CDC pipeline permanently stuck. Note that these errors do not 
extend {{java.lang.Exception}}, so they are never caught by 
{{catch (Exception e)}} blocks.
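
For reference, here is a minimal, self-contained illustration of why such errors slip past the existing handler (the class and message below are purely illustrative):

{code:java}
public class ThrowableVsException {
    public static void main(String[] args) {
        try {
            // OutOfMemoryError and StackOverflowError extend java.lang.Error,
            // which sits next to Exception under Throwable in the hierarchy.
            throw new StackOverflowError("simulated");
        } catch (Exception e) {
            // Never reached: Error is not a subclass of Exception.
            System.out.println("caught as Exception: " + e);
        } catch (Throwable t) {
            // Only a Throwable (or Error) handler observes it.
            System.out.println("caught as Throwable: " + t);
        }
    }
}
{code}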

 

I confirmed this hypothesis by adding a {{catch (Throwable t)}} block in 
{{MongoDBStreamFetchTask.java}}, which successfully caught the error that 
was previously being silently swallowed:

{code}
2025-10-30 12:53:17,255 ERROR org.ni.flink.systemscdc.mongosource.CustomMongoDBStreamFetchTask [] - Captured throwable
java.lang.OutOfMemoryError: Java heap space
    at java.base/java.util.Arrays.copyOf(Unknown Source) ~[?:?]
    at java.base/java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source) ~[?:?]
    at java.base/java.lang.AbstractStringBuilder.append(Unknown Source) ~[?:?]
    at java.base/java.lang.StringBuffer.append(Unknown Source) ~[?:?]
    at java.base/java.io.StringWriter.write(Unknown Source) ~[?:?]
    at org.bson.json.StrictCharacterStreamJsonWriter.write(StrictCharacterStreamJsonWriter.java:383) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.json.StrictCharacterStreamJsonWriter.writeStringHelper(StrictCharacterStreamJsonWriter.java:349) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.json.StrictCharacterStreamJsonWriter.writeName(StrictCharacterStreamJsonWriter.java:149) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.json.JsonWriter.doWriteName(JsonWriter.java:82) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:537) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:117) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.internal.LazyCodec.encode(LazyCodec.java:43) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:139) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:118) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:139) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:118) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.BsonDocument.toJson(BsonDocument.java:848) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.stringToSchemaAndValue(BsonValueToSchemaAndValue.java:195) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.toSchemaAndValue(BsonValueToSchemaAndValue.java:91) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.lambda$recordToSchemaAndValue$2(BsonValueToSchemaAndValue.java:275) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue$$Lambda$1273/0x00007f65cf873c58.accept(Unknown Source) ~[?:?]
    at java.base/java.util.ArrayList.forEach(Unknown Source) ~[?:?]
    at java.base/java.util.Collections$UnmodifiableCollection.forEach(Unknown Source) ~[?:?]
    at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.recordToSchemaAndValue(BsonValueToSchemaAndValue.java:271) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.toSchemaAndValue(BsonValueToSchemaAndValue.java:103) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.createSourceRecord(MongoRecordUtils.java:159) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.createSourceRecord(MongoRecordUtils.java:138) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.ni.flink.systemscdc.mongosource.CustomMongoDBStreamFetchTask.execute(CustomMongoDBStreamFetchTask.java:149) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
{code}
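
The guard I added is roughly the following sketch (the loop body, {{LOG}}, and {{isRunning}} are simplified placeholders for what the surrounding class actually uses; only the catch clause is the point):

{code:java}
// Sketch of the Throwable guard added around the change stream consumption
// loop in my copy of the fetch task; surrounding details are simplified.
public void execute() {
    try {
        while (isRunning) {
            // ... fetch the next change stream document and emit it ...
        }
    } catch (Throwable t) {
        // Error subtypes (OutOfMemoryError, StackOverflowError, ...) bypass
        // any `catch (Exception e)` handler; only a Throwable catch sees them.
        LOG.error("Captured throwable", t);
        // Re-throw (wrapped) so the task fails and Flink's recovery can run
        // instead of the fetcher dying silently.
        throw new RuntimeException(t);
    }
}
{code}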

 

I think the root cause is in 
{{IncrementalSourceStreamFetcher.submitTask()}}, which also does not handle 
{{Throwable}} errors, so the error is never propagated further up the stack 
to trigger task failure and recovery.
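
To illustrate the kind of change I have in mind there, here is a hedged sketch written from memory; the field and variable names ({{executorService}}, {{streamFetchTask}}, {{taskContext}}, {{readException}}, {{currentStreamSplit}}) are approximations of the actual Flink CDC sources, not verbatim code:

{code:java}
// Today the task submitted to the executor effectively only catches Exception,
// so an Error thrown by the fetch task escapes the handler, the worker thread
// dies quietly, and no failure is ever recorded. Widening the catch to
// Throwable would surface it:
executorService.execute(
        () -> {
            try {
                streamFetchTask.execute(taskContext);
            } catch (Throwable t) {
                LOG.error("Execute stream fetch task for split {} failed", currentStreamSplit, t);
                // Record the failure so the reader's polling path can rethrow
                // it and fail the task, mirroring how Exception is handled today.
                readException = t;
            }
        });
{code}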

The logs also don't show any error; they just indicate that the stream fetcher 
stopped gracefully:

{code}
2025-10-30 12:53:30,226 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [stream-split]
2025-10-30 12:53:42,275 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [stream-split]
2025-10-30 12:53:42,277 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 21 because it is idle.
2025-10-30 12:53:42,277 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 21
2025-10-30 12:53:42,277 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 21 exited.
{code}

 
Since Flink is supposed to be fault tolerant, my expectation is that the task 
would fail and be restarted, with the error showing up in the logs, instead of 
the stream silently stopping.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
