Hi Sachin,

It seems that KeyGroupStreamPartitioner is complaining about receiving a null 
StreamRecord, which is abnormal, since MongoDeserializationSchema ensures 
non-nullability before putting records into the stream:

```
default void deserialize(BsonDocument document, Collector<T> out) throws IOException {
    T deserialize = deserialize(document);
    if (deserialize != null) {
        out.collect(deserialize); // No null value will be emitted
    }
}
```
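
For context, that exact message ("Assigned key must not be null!") is raised by 
KeyGroupRangeAssignment when the key extracted for a record is null, so a 
KeySelector that returns null for some records fails the same way even when every 
record itself is non-null. A minimal sketch that reproduces it (a hypothetical 
pipeline, not your job):

```
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NullKeyRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The records themselves are non-null; only the extracted key is null,
        // which is enough to trigger "Assigned key must not be null!".
        env.fromElements("a", "b", "c")
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return null; // hypothetical bug: key selector returns null
                    }
                })
                .print();

        env.execute("null-key-repro");
    }
}
```

So if your pipeline keys the stream right after the source, it may also be worth 
checking whether the key field can be absent in some documents.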

Could you please clarify which methods your MongoDeserializationSchema<Data> 
implementation overrides: only the `deserialize(BsonDocument)` method, or 
`deserialize(BsonDocument, Collector)` as well?
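
The distinction matters because overriding the Collector variant replaces the 
null filter shown above. A minimal sketch of such an override, assuming a 
hypothetical Data POJO (the field mapping and constructor are placeholders):

```
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.util.Collector;
import org.bson.BsonDocument;

// "Data" stands in for the POJO from your job; the mapping below is illustrative.
public class DataSchema implements MongoDeserializationSchema<Data> {

    @Override
    public Data deserialize(BsonDocument document) {
        // May legitimately return null for documents that should be skipped.
        return document.containsKey("payload")
                ? new Data(document.getObjectId("_id").getValue().toHexString())
                : null;
    }

    // Overriding the Collector variant bypasses the default null filter;
    // a null collected here travels on to the partitioner.
    @Override
    public void deserialize(BsonDocument document, Collector<Data> out) throws IOException {
        out.collect(deserialize(document));
    }

    @Override
    public TypeInformation<Data> getProducedType() {
        return TypeInformation.of(Data.class);
    }
}
```

If only `deserialize(BsonDocument)` is overridden, the default implementation 
quoted earlier guarantees that no null record enters the stream.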

Regards,
Xiqian

From: Sachin Mittal <sjmit...@gmail.com>
Date: Monday, 5 August 2024 at 19:59
To: user@flink.apache.org <user@flink.apache.org>
Subject: How can I debug Assigned key must not be null error when reading from 
Mongodb source
Hi,
I am using the MongoDB connector provided here:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/

I am instantiating it pretty much in the recommended way:

```
MongoSource<Data> source =
        MongoSource.<Data>builder()
                .setUri("...")
                .setDatabase("...")
                .setCollection("...")
                .setFetchSize(2048)
                .setNoCursorTimeout(true)
                .setPartitionStrategy(PartitionStrategy.SAMPLE)
                .setPartitionSize(MemorySize.ofMebiBytes(64))
                .setSamplesPerPartition(10)
                .setDeserializationSchema(
                        new MongoDeserializationSchema<Data>() {
                            ...
                        })
                .build();

final DataStream<Data> events =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Src");
```


It was running fine, but after a while the job crashed with the following exception:

```
2024-08-05 16:21:42,580 WARN org.apache.flink.runtime.taskmanager.Task [] -
Source: Src (1/1)#0 (394ae5d24e01c10336875aec33ad43c2_bc764cd8ddf7a0cff126f51c16239658_0_0)
switched from RUNNING to FAILED with failure cause:
java.lang.NullPointerException: Assigned key must not be null!
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter$SourceOutputWrapper.collect(MongoRecordEmitter.java:62) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:60) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
    ...
```

Any idea where I should look and how I can fix this?

Thanks
Sachin
