So I have an anonymous class implementing MongoDeserializationSchema:

new MongoDeserializationSchema<Data>() {
    @Override
    public Data deserialize(BsonDocument document) {
        String json = document.toJson();
        Data data = null;
        try {
            data = gson.fromJson(json, Data.class);
        } catch (JsonSyntaxException e) {
            logger.error("Error decoding Data {}", json, e);
        }
        return data;
    }
    @Override
    public TypeInformation<Data> getProducedType() {
        return Types.POJO(Data.class);
    }
}

I don't see any errors logged from these methods. Also, the pipeline works fine for a while and then stops with the error posted. The error complains about the key, not the record itself. Maybe it partitions records based on some field of the collection, and that value is null for that particular record? (A defensive keyBy sketch to check this is at the bottom of this mail.) I am using PartitionStrategy.SAMPLE, and the watermark strategy is WatermarkStrategy.noWatermarks().

Thanks
Sachin

On Mon, Aug 5, 2024 at 5:47 PM Xiqian YU <kono....@outlook.com> wrote:

> Hi Sachin,
>
> Seems KeyGroupStreamPartitioner is complaining about receiving a null
> StreamRecord, which is abnormal, since MongoDeserializationSchema ensures
> non-nullability before putting it into the stream:
>
> ```
> default void deserialize(BsonDocument document, Collector<T> out) throws IOException {
>     T deserialize = deserialize(document);
>     if (deserialize != null) {
>         out.collect(deserialize); // No null value will be emitted
>     }
> }
> ```
>
> Could you please clarify which methods the MongoDeserializationSchema<Data>
> class overrides: just the `deserialize(BsonDocument)` method, or
> `deserialize(BsonDocument, Collector)` too?
>
> Regards,
> Xiqian
>
> From: Sachin Mittal <sjmit...@gmail.com>
> Date: Monday, 5 August 2024 at 19:59
> To: user@flink.apache.org <user@flink.apache.org>
> Subject: How can I debug Assigned key must not be null error when reading from Mongodb source
>
> Hi,
>
> I am using the mongodb connector provided here:
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/
>
> I am instantiating it pretty much in the recommended way:
>
> MongoSource<Data> source =
>     MongoSource.<Data>builder()
>         .setUri("...")
>         .setDatabase("...")
>         .setCollection("...")
>         .setFetchSize(2048)
>         .setNoCursorTimeout(true)
>         .setPartitionStrategy(PartitionStrategy.SAMPLE)
>         .setPartitionSize(MemorySize.ofMebiBytes(64))
>         .setSamplesPerPartition(10)
>         .setDeserializationSchema(
>             new MongoDeserializationSchema<Data>() {
>                 ...
>             })
>         .build();
>
> final DataStream<Data> events =
>     env.fromSource(source, WatermarkStrategy.noWatermarks(), "Src");
>
> It is running fine, but after a while the job crashed with the following exception:
>
> 2024-08-05 16:21:42,580 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Src (1/1)#0 (394ae5d24e01c10336875aec33ad43c2_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED with failure cause:
> java.lang.NullPointerException: Assigned key must not be null!
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist-1.18.1.jar:1.18.1]
> at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter$SourceOutputWrapper.collect(MongoRecordEmitter.java:62) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:60) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> ...
>
> Any idea where I should look and how I can fix this?
>
> Thanks
> Sachin
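
PS: To narrow this down on my side, I am thinking of keying the stream defensively so that any record whose key field is null gets logged before it ever reaches the KeyGroupStreamPartitioner. A rough sketch, where getPartitionKey() is just a stand-in for whatever field of Data the job actually keys on:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NullKeyDebug {

    private static final Logger LOG = LoggerFactory.getLogger(NullKeyDebug.class);

    // Keys the stream defensively: records whose (assumed) key field is null are
    // logged and dropped instead of reaching the partitioner, where they would
    // fail the job with "Assigned key must not be null!".
    static KeyedStream<Data, String> keyByWithNullCheck(DataStream<Data> events) {
        return events
            .filter(
                d -> {
                    if (d.getPartitionKey() == null) { // getPartitionKey() is a placeholder accessor
                        LOG.error("Record with null key field, dropping: {}", d);
                        return false; // or route these to a side output for inspection
                    }
                    return true;
                })
            .keyBy(
                new KeySelector<Data, String>() {
                    @Override
                    public String getKey(Data d) {
                        return d.getPartitionKey();
                    }
                });
    }
}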