Hi Sachin,

It seems KeyGroupStreamPartitioner is complaining about receiving a null StreamRecord, which is abnormal, since MongoDeserializationSchema ensures non-nullability before collecting anything into the stream:
```
default void deserialize(BsonDocument document, Collector<T> out) throws IOException {
    T deserialize = deserialize(document);
    if (deserialize != null) {
        out.collect(deserialize); // No null value will be emitted
    }
}
```

Could you please clarify which methods your MongoDeserializationSchema<Data> implementation overrides: just the `deserialize(BsonDocument)` method, or `deserialize(BsonDocument, Collector)` as well?
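For reference, here is a minimal sketch of what I mean, overriding only `deserialize(BsonDocument)` (plus `getProducedType()`), so that the default null check above still applies. The `Data` POJO and the `_id`-based mapping are just assumptions for illustration:

```
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.bson.BsonDocument;

// Hypothetical schema: `Data` stands in for your own POJO.
public class DataDeserializationSchema implements MongoDeserializationSchema<Data> {

    @Override
    public Data deserialize(BsonDocument document) throws IOException {
        // Returning null here is safe: the default deserialize(BsonDocument, Collector)
        // shown above drops null values instead of emitting them.
        if (document == null || !document.containsKey("_id")) {
            return null;
        }
        return new Data(document.getObjectId("_id").getValue().toHexString());
    }

    @Override
    public TypeInformation<Data> getProducedType() {
        return TypeInformation.of(Data.class);
    }
}
```

If you override `deserialize(BsonDocument, Collector)` directly instead, the default null check is bypassed, so a null element could be collected into the stream.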
Regards,
Xiqian

From: Sachin Mittal <sjmit...@gmail.com>
Date: Monday, 5 August 2024 at 19:59
To: user@flink.apache.org <user@flink.apache.org>
Subject: How can I debug Assigned key must not be null error when reading from Mongodb source

Hi,
I am using the mongodb connector provided here:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/

I am instantiating it pretty much in the recommended way:

```
MongoSource<Data> source =
        MongoSource.<Data>builder()
                .setUri("...")
                .setDatabase("...")
                .setCollection("...")
                .setFetchSize(2048)
                .setNoCursorTimeout(true)
                .setPartitionStrategy(PartitionStrategy.SAMPLE)
                .setPartitionSize(MemorySize.ofMebiBytes(64))
                .setSamplesPerPartition(10)
                .setDeserializationSchema(
                        new MongoDeserializationSchema<Data>() {
                            ...
                        })
                .build();

final DataStream<Data> events =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Src");
```

It was running fine, but after a while the job crashed with the following exception:

2024-08-05 16:21:42,580 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: Src (1/1)#0 (394ae5d24e01c10336875aec33ad43c2_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED with failure cause:
java.lang.NullPointerException: Assigned key must not be null!
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter$SourceOutputWrapper.collect(MongoRecordEmitter.java:62) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:60) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    ...

Any idea where I should look and how I can fix this?

Thanks
Sachin