Yes, this fixed the issue. Thanks,
Sachin
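For anyone who finds this thread later: "ensure the keyBy field is not null" in practice means dropping such records before the keyBy. A minimal sketch against the `events` stream defined further down in the thread; Data#getUserId() is a hypothetical stand-in, since the actual key field is never named here:

```java
// Sketch only: getUserId() stands in for whatever field the job keys on.
DataStream<Data> keyable = events
        // A record whose key field is null makes KeyGroupStreamPartitioner
        // fail with "Assigned key must not be null!" at the keyBy shuffle.
        .filter(d -> d.getUserId() != null);

KeyedStream<Data, String> keyed = keyable.keyBy(d -> d.getUserId());
```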
On Mon, Aug 5, 2024 at 6:38 PM Jiabao Sun <jiabao...@apache.org> wrote:

> Hi Sachin,
>
> Could you please check whether you have used the keyBy operator, and ensure
> that the keyBy field is not null?
>
> Best,
> Jiabao
>
> On 2024/08/05 12:33:27 Sachin Mittal wrote:
> > So I have an anonymous class implementing MongoDeserializationSchema:
> >
> > new MongoDeserializationSchema<Data>() {
> >     @Override
> >     public Data deserialize(BsonDocument document) {
> >         String json = document.toJson();
> >         Data data = null;
> >         try {
> >             data = gson.fromJson(json, Data.class);
> >         } catch (JsonSyntaxException e) {
> >             logger.error("Error decoding Data {}", json, e);
> >         }
> >         return data;
> >     }
> >
> >     @Override
> >     public TypeInformation<Data> getProducedType() {
> >         return Types.POJO(Data.class);
> >     }
> > }
> >
> > I don't see any errors logged from these methods.
> > Also, the pipeline works fine for a while and then stops working with the
> > error posted. The error mentions the key, not the record itself. Maybe the
> > connector partitions records based on some field of the collection, and
> > that value is null for that record?
> > I am using PartitionStrategy.SAMPLE, and the watermark strategy is
> > WatermarkStrategy.noWatermarks.
> >
> > Thanks,
> > Sachin
> >
> > On Mon, Aug 5, 2024 at 5:47 PM Xiqian YU <kono....@outlook.com> wrote:
> > >
> > > Hi Sachin,
> > >
> > > It seems KeyGroupStreamPartitioner is complaining about receiving a null
> > > StreamRecord, which is abnormal, since MongoDeserializationSchema ensures
> > > non-nullability before putting records into the stream:
> > >
> > > ```
> > > default void deserialize(BsonDocument document, Collector<T> out) throws IOException {
> > >     T deserialize = deserialize(document);
> > >     if (deserialize != null) {
> > >         out.collect(deserialize); // No null value will be emitted
> > >     }
> > > }
> > > ```
> > >
> > > Could you please clarify which methods your MongoDeserializationSchema<Data>
> > > class overrides: only the `deserialize(BsonDocument)` method, or
> > > `deserialize(BsonDocument, Collector)` as well?
> > >
> > > Regards,
> > > Xiqian
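For completeness, a sketch of the variant Xiqian is asking about: overriding `deserialize(BsonDocument, Collector)` directly replaces the default null check quoted above, so the override must re-apply it, and the same spot can then also drop documents whose key field is null. This is not code from the thread; Data#getUserId() is a hypothetical stand-in for the actual keyBy field:

```java
// Sketch only: getUserId() stands in for whatever field the job keys on.
new MongoDeserializationSchema<Data>() {
    @Override
    public Data deserialize(BsonDocument document) {
        String json = document.toJson();
        try {
            return gson.fromJson(json, Data.class);
        } catch (JsonSyntaxException e) {
            logger.error("Error decoding Data {}", json, e);
            return null;
        }
    }

    @Override
    public void deserialize(BsonDocument document, Collector<Data> out) {
        Data data = deserialize(document);
        // Overriding the default method removes its built-in null filter,
        // so re-check here; also skip records whose key field is null.
        if (data != null && data.getUserId() != null) {
            out.collect(data);
        }
    }

    @Override
    public TypeInformation<Data> getProducedType() {
        return Types.POJO(Data.class);
    }
}
```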
> > > From: Sachin Mittal <sjmit...@gmail.com>
> > > Date: Monday, 5 August 2024 at 19:59
> > > To: user@flink.apache.org <user@flink.apache.org>
> > > Subject: How can I debug Assigned key must not be null error when reading from Mongodb source
> > >
> > > Hi,
> > >
> > > I am using the MongoDB connector provided here:
> > > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/
> > >
> > > I am instantiating it pretty much in the recommended way:
> > >
> > > MongoSource<Data> source =
> > >     MongoSource.<Data>builder()
> > >         .setUri("...")
> > >         .setDatabase("...")
> > >         .setCollection("...")
> > >         .setFetchSize(2048)
> > >         .setNoCursorTimeout(true)
> > >         .setPartitionStrategy(PartitionStrategy.SAMPLE)
> > >         .setPartitionSize(MemorySize.ofMebiBytes(64))
> > >         .setSamplesPerPartition(10)
> > >         .setDeserializationSchema(
> > >             new MongoDeserializationSchema<Data>() {
> > >                 ...
> > >             })
> > >         .build();
> > >
> > > final DataStream<Data> events =
> > >     env.fromSource(source, WatermarkStrategy.noWatermarks(), "Src");
> > >
> > > It runs fine, but after a while the job crashes with the following exception:
> > >
> > > 2024-08-05 16:21:42,580 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Src (1/1)#0 (394ae5d24e01c10336875aec33ad43c2_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED with failure cause:
> > > java.lang.NullPointerException: Assigned key must not be null!
> > >     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter$SourceOutputWrapper.collect(MongoRecordEmitter.java:62) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> > >     at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:60) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> > >     at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> > >     at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> > >     ...
> > >
> > > Any idea where I should look and how I can fix this?
> > >
> > > Thanks,
> > > Sachin
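A footnote on where the message originates: `Preconditions.checkNotNull` in `KeyGroupRangeAssignment.assignKeyToParallelOperator` rejects any null key produced by a downstream keyBy, regardless of the source. A minimal reproduction sketch, independent of MongoDB (all names below are invented for illustration):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NullKeyRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "missing-key")
            // A key selector that returns null reproduces
            // "java.lang.NullPointerException: Assigned key must not be null!"
            // thrown from KeyGroupRangeAssignment.assignKeyToParallelOperator.
            .keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return value.equals("missing-key") ? null : value;
                }
            })
            .print();

        env.execute("null-key-repro");
    }
}
```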