Yes, this fixed the issue.

Thanks
Sachin


On Mon, Aug 5, 2024 at 6:38 PM Jiabao Sun <jiabao...@apache.org> wrote:

> Hi Sachin,
>
> Could you please check if you have used the keyBy operator and ensure that
> the keyBy field is not null?
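>
> For example, a minimal null-safe pattern would be to filter out records
> whose key field is null before keying the stream (a sketch only; getId()
> here stands in for whatever field you key on):
>
> ```
> events
>     .filter(d -> d.getId() != null) // drop records that would yield a null key
>     .keyBy(d -> d.getId());
> ```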
>
> Best,
> Jiabao
>
> On 2024/08/05 12:33:27 Sachin Mittal wrote:
> > So I have an anonymous class implementing MongoDeserializationSchema
> >
> > new MongoDeserializationSchema<Data>() {
> >   @Override
> >   public Data deserialize(BsonDocument document) {
> >     String json = document.toJson();
> >     Data data = null;
> >     try {
> >       data = gson.fromJson(json, Data.class);
> >     } catch (JsonSyntaxException e) {
> >       logger.error("Error decoding Data {}", json, e);
> >     }
> >     return data;
> >   }
> >
> >   @Override
> >   public TypeInformation<Data> getProducedType() {
> >     return Types.POJO(Data.class);
> >   }
> > }
> >
> > I don't see any errors logged from these methods.
> > Also, the pipeline works fine for a while and then stops working due to
> > the error posted.
> > The error says something about the key, not the record itself. Maybe the
> > records are partitioned based on some field of the collection, and that
> > field is null for the failing record?
> > I am using PartitionStrategy.SAMPLE. Also, the watermark strategy is
> > WatermarkStrategy.noWatermarks().
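> >
> > To check that theory, one thing I could try is logging any record whose
> > key field is null just before the keyBy (a rough sketch; getId() below is
> > a placeholder for whatever field the stream is actually keyed on):
> >
> > ```
> > events
> >     .map(d -> {
> >           // log records that would produce a null key in keyBy
> >           if (d.getId() == null) {
> >             logger.warn("Record with null key field: {}", d);
> >           }
> >           return d;
> >         })
> >     .keyBy(d -> d.getId());
> > ```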
> >
> > Thanks
> > Sachin
> >
> >
> > On Mon, Aug 5, 2024 at 5:47 PM Xiqian YU <kono....@outlook.com> wrote:
> >
> > > Hi Sachin,
> > >
> > >
> > > It seems KeyGroupStreamPartitioner is complaining about receiving a null
> > > StreamRecord, which would be abnormal, since MongoDeserializationSchema
> > > ensures non-nullability before putting records into the stream:
> > >
> > > ```
> > > default void deserialize(BsonDocument document, Collector<T> out)
> > >         throws IOException {
> > >     T deserialize = deserialize(document);
> > >     if (deserialize != null) {
> > >         out.collect(deserialize); // No null value will be emitted
> > >     }
> > > }
> > > ```
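> > >
> > > Note also that, if I read the stack trace correctly, the null check in
> > > KeyGroupRangeAssignment fires on the key extracted by a downstream
> > > keyBy's KeySelector, not on the record itself, so a non-null record
> > > whose key field is null would raise the same "Assigned key must not be
> > > null!" error.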
> > >
> > > Could you please clarify which methods your
> > > MongoDeserializationSchema<Data> class overrides: just the
> > > `deserialize(BsonDocument)` method, or also
> > > `deserialize(BsonDocument, Collector)`?
> > >
> > > Regards,
> > >
> > > Xiqian
> > >
> > > From: Sachin Mittal <sjmit...@gmail.com>
> > > Date: Monday, August 5, 2024 at 19:59
> > > To: user@flink.apache.org <user@flink.apache.org>
> > > Subject: How can I debug "Assigned key must not be null" error when
> > > reading from Mongodb source
> > >
> > > Hi,
> > >
> > > I am using the mongodb connector provided here:
> > > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/
> > >
> > > I am instantiating it pretty much in the recommended way:
> > >
> > > MongoSource<Data> source =
> > >     MongoSource.<Data>builder()
> > >         .setUri("...")
> > >         .setDatabase("...")
> > >         .setCollection("...")
> > >         .setFetchSize(2048)
> > >         .setNoCursorTimeout(true)
> > >         .setPartitionStrategy(PartitionStrategy.SAMPLE)
> > >         .setPartitionSize(MemorySize.ofMebiBytes(64))
> > >         .setSamplesPerPartition(10)
> > >         .setDeserializationSchema(
> > >             new MongoDeserializationSchema<Data>() {
> > >               ...
> > >             })
> > >         .build();
> > >
> > > final DataStream<Data> events =
> > >     env.fromSource(source, WatermarkStrategy.noWatermarks(), "Src");
> > >
> > > It was running fine, but after a while the job crashed with the
> > > following exception:
> > >
> > > 2024-08-05 16:21:42,580 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Src (1/1)#0 (394ae5d24e01c10336875aec33ad43c2_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED with failure cause:
> > >
> > > java.lang.NullPointerException: Assigned key must not be null!
> > >     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist-1.18.1.jar:1.18.1]
> > >     at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter$SourceOutputWrapper.collect(MongoRecordEmitter.java:62) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> > >     at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:60) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> > >     at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> > >     at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
> > >     ...
> > >
> > > Any idea where I should look and how I can fix this?
> > >
> > > Thanks
> > >
> > > Sachin
> > >
> >
>
