Hi,
I am using the MongoDB connector documented here:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/

I am instantiating it pretty much in the recommended way:

MongoSource<Data> source =
        MongoSource.<Data>builder()
                .setUri("...")
                .setDatabase("...")
                .setCollection("...")
                .setFetchSize(2048)
                .setNoCursorTimeout(true)
                .setPartitionStrategy(PartitionStrategy.SAMPLE)
                .setPartitionSize(MemorySize.ofMebiBytes(64))
                .setSamplesPerPartition(10)
                .setDeserializationSchema(
                        new MongoDeserializationSchema<Data>() {
                            ...
                        })
                .build();

final DataStream<Data> events =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Src");
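
For reference, the deserialization schema I pass in is shaped roughly like this (a minimal sketch; the Data constructor and the "_id" mapping are placeholders for my actual POJO and fields):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.bson.BsonDocument;

public class DataDeserializationSchema implements MongoDeserializationSchema<Data> {

    @Override
    public Data deserialize(BsonDocument document) {
        // Map the BSON document onto the POJO; the real mapping is elided.
        // Placeholder: assumes Data has a single-String constructor.
        return new Data(document.getObjectId("_id").getValue().toHexString());
    }

    @Override
    public TypeInformation<Data> getProducedType() {
        return TypeInformation.of(Data.class);
    }
}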


It was running fine, but after a while the job crashed with the following
exception:

2024-08-05 16:21:42,580 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Src (1/1)#0 (394ae5d24e01c10336875aec33ad43c2_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED with failure cause:
java.lang.NullPointerException: Assigned key must not be null!
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter$SourceOutputWrapper.collect(MongoRecordEmitter.java:62) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:60) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    ...
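
Looking at the trace, the NPE comes from the KeyGroupStreamPartitioner while the source output is pushed into a keyed exchange, so I suspect the keyBy directly after the source received a null key for some record. For illustration (the key selector below is hypothetical, not my exact code), a step like this would fail with exactly this message if getId() ever returned null, e.g. for a document missing that field:

DataStream<Data> keyed =
        events
                .keyBy(Data::getId) // a null key here triggers
                                    // "Assigned key must not be null!"
                .map(d -> d);       // placeholder downstream operator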

Any idea where I should look and how I can fix this?

Thanks
Sachin
