Ciprian Anton created FLINK-38601:
-------------------------------------
Summary: MongoDB CDC silently stops consuming from unbounded
streams when Throwable errors occur and never recovers
Key: FLINK-38601
URL: https://issues.apache.org/jira/browse/FLINK-38601
Project: Flink
Issue Type: Bug
Components: Connectors / MongoDB, Flink CDC
Reporter: Ciprian Anton
The Flink MongoDB CDC connector does not properly handle
{{java.lang.Throwable}} errors (such as {{OutOfMemoryError}},
{{StackOverflowError}}, etc.) in the stream consumption loop. When such an
error occurs, the connector silently stops consuming from unbounded change
streams without propagating the error or triggering any recovery mechanism,
leaving the CDC pipeline permanently stuck. Note that these errors do not
inherit from {{java.lang.Exception}}, so they are not caught by
{{catch (Exception e)}} blocks.
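As a minimal standalone illustration (plain Java, not connector code) of why such errors slip past an {{Exception}} handler:
{code:java}
// OutOfMemoryError extends Error, which is a sibling of Exception under
// Throwable, so a catch (Exception e) clause never sees it.
public class ThrowableHierarchyDemo {
    public static void main(String[] args) {
        try {
            throw new OutOfMemoryError("simulated");
        } catch (Exception e) {
            // Never reached: Error does not extend Exception.
            System.out.println("caught as Exception: " + e);
        } catch (Throwable t) {
            // Reached: Throwable covers both Exception and Error.
            System.out.println("caught as Throwable: " + t);
        }
    }
}
{code}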
I confirmed this hypothesis by adding a {{catch (Throwable t)}} block in
{{MongoDBStreamFetchTask.java}}, which successfully caught the error that
was previously being silently swallowed:
{code:java}
2025-10-30 12:53:17,255 ERROR org.ni.flink.systemscdc.mongosource.CustomMongoDBStreamFetchTask [] - Captured throwable
java.lang.OutOfMemoryError: Java heap space
    at java.base/java.util.Arrays.copyOf(Unknown Source) ~[?:?]
    at java.base/java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source) ~[?:?]
    at java.base/java.lang.AbstractStringBuilder.append(Unknown Source) ~[?:?]
    at java.base/java.lang.StringBuffer.append(Unknown Source) ~[?:?]
    at java.base/java.io.StringWriter.write(Unknown Source) ~[?:?]
    at org.bson.json.StrictCharacterStreamJsonWriter.write(StrictCharacterStreamJsonWriter.java:383) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.json.StrictCharacterStreamJsonWriter.writeStringHelper(StrictCharacterStreamJsonWriter.java:349) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.json.StrictCharacterStreamJsonWriter.writeName(StrictCharacterStreamJsonWriter.java:149) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.json.JsonWriter.doWriteName(JsonWriter.java:82) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:537) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:117) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.internal.LazyCodec.encode(LazyCodec.java:43) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:139) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:118) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:139) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:118) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.bson.BsonDocument.toJson(BsonDocument.java:848) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.stringToSchemaAndValue(BsonValueToSchemaAndValue.java:195) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.toSchemaAndValue(BsonValueToSchemaAndValue.java:91) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.lambda$recordToSchemaAndValue$2(BsonValueToSchemaAndValue.java:275) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue$$Lambda$1273/0x00007f65cf873c58.accept(Unknown Source) ~[?:?]
    at java.base/java.util.ArrayList.forEach(Unknown Source) ~[?:?]
    at java.base/java.util.Collections$UnmodifiableCollection.forEach(Unknown Source) ~[?:?]
    at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.recordToSchemaAndValue(BsonValueToSchemaAndValue.java:271) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.toSchemaAndValue(BsonValueToSchemaAndValue.java:103) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.createSourceRecord(MongoRecordUtils.java:159) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.createSourceRecord(MongoRecordUtils.java:138) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
    at org.ni.flink.systemscdc.mongosource.CustomMongoDBStreamFetchTask.execute(CustomMongoDBStreamFetchTask.java:149) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
{code}
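The diagnostic change was along these lines (a sketch of my custom copy, with the original consumption loop abbreviated behind a placeholder {{consumeChangeStream()}} method):
{code:java}
// Sketch of the catch (Throwable t) wrapper added in my custom copy of
// MongoDBStreamFetchTask; consumeChangeStream() is a placeholder for the
// existing change-stream consumption loop.
@Override
public void execute(Context context) throws Exception {
    try {
        consumeChangeStream(context);
    } catch (Throwable t) {
        // Previously anything that was not an Exception escaped here and
        // silently ended the loop; log it and rethrow so it can propagate.
        LOG.error("Captured throwable", t);
        if (t instanceof Exception) {
            throw (Exception) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        throw new RuntimeException(t); // exotic Throwable subclasses
    }
}
{code}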
I think the root cause may be
{{IncrementalSourceStreamFetcher.submitTask()}}, which likewise does not
handle {{Throwable}} errors, so the error is never propagated further up
the stack to trigger task failure and recovery.
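A hedged sketch of the kind of fix I would expect there; the field and method names ({{executorService}}, {{taskContext}}, {{readException}}) follow the pattern of the class but may not match the source exactly:
{code:java}
// In IncrementalSourceStreamFetcher: widen the catch in the submitted
// runnable from Exception to Throwable so that Errors are also recorded
// and can later be surfaced to fail the task instead of exiting silently.
public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
    executorService.submit(
            () -> {
                try {
                    fetchTask.execute(taskContext);
                } catch (Throwable t) { // was: catch (Exception e)
                    LOG.error("Execute stream fetch task error", t);
                    readException = t; // assumes the field is widened to Throwable
                }
            });
}
{code}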
The logs don't show any error either; they only indicate that the stream
fetcher stopped gracefully:
{code}
2025-10-30 12:53:30,226 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [stream-split]
2025-10-30 12:53:42,275 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [stream-split]
2025-10-30 12:53:42,277 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 21 because it is idle.
2025-10-30 12:53:42,277 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 21
2025-10-30 12:53:42,277 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 21 exited.
{code}
Since Flink is supposed to be fault-tolerant, my expectation is that the task
would be restarted and the error would show up in the logs.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)