Hi,

I am using the MongoDB connector documented here: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/

I am instantiating it pretty much in the recommended way:

    MongoSource<Data> source =
            MongoSource.<Data>builder()
                    .setUri("...")
                    .setDatabase("...")
                    .setCollection("...")
                    .setFetchSize(2048)
                    .setNoCursorTimeout(true)
                    .setPartitionStrategy(PartitionStrategy.SAMPLE)
                    .setPartitionSize(MemorySize.ofMebiBytes(64))
                    .setSamplesPerPartition(10)
                    .setDeserializationSchema(
                            new MongoDeserializationSchema<Data>() {
                                ...
                            })
                    .build();

    final DataStream<Data> events =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Src");

The job runs fine at first, but after a while it crashes with the following exception:

    2024-08-05 16:21:42,580 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Src (1/1)#0 (394ae5d24e01c10336875aec33ad43c2_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED with failure cause:
    java.lang.NullPointerException: Assigned key must not be null!
        at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter$SourceOutputWrapper.collect(MongoRecordEmitter.java:62) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
        at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:60) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
        at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
        at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
        ...

Any idea where I should look and how I can fix this?

Thanks
Sachin