So I have an anonymous class implementing MongoDeserializationSchema:

new MongoDeserializationSchema<Data>() {
  @Override
  public Data deserialize(BsonDocument document) {
    String json = document.toJson();
    Data data = null;
    try {
      data = gson.fromJson(json, Data.class);
    } catch (JsonSyntaxException e) {
      logger.error("Error decoding Data {}", json, e);
    }
    // May return null; the default deserialize(document, out) drops null records.
    return data;
  }

  @Override
  public TypeInformation<Data> getProducedType() {
    return Types.POJO(Data.class);
  }
}
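
So only the single-argument deserialize(BsonDocument) is overridden; the two-argument
deserialize(BsonDocument, Collector) is left at its default. Just as a sketch (reusing the
same gson and logger fields as above, not what I currently run), overriding the two-argument
variant as well would skip and log unparsable documents explicitly instead of relying on the
default null check:

  @Override
  public void deserialize(BsonDocument document, Collector<Data> out) throws IOException {
    // Reuse the single-argument method above; it returns null on parse failure.
    Data data = deserialize(document);
    if (data != null) {
      out.collect(data); // emit only non-null records
    } else {
      logger.warn("Skipping document that failed to deserialize: {}", document.toJson());
    }
  }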

I don't see any errors logged from these methods.
The pipeline also runs fine for a while and then stops working with the error I posted.
The error mentions the key rather than the record itself. Maybe the stream is partitioned
on some field of the collection, and that value is null for a particular record?
I am using PartitionStrategy.SAMPLE, and the watermark strategy is
WatermarkStrategy.noWatermarks().
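
For example, if some downstream operator keys the stream on a field that can be null, the
key-group partitioner would fail with exactly this message. A minimal sketch of a guard
before the keyBy (the field getPartitionKey() and MyProcessFunction are hypothetical
placeholders, not my actual code):

events
  .filter(d -> {
    if (d.getPartitionKey() == null) {
      logger.warn("Dropping record with null key: {}", d); // surface the bad records
      return false;
    }
    return true;
  })
  .keyBy(Data::getPartitionKey) // a null key here raises "Assigned key must not be null!"
  .process(new MyProcessFunction());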

Thanks
Sachin


On Mon, Aug 5, 2024 at 5:47 PM Xiqian YU <kono....@outlook.com> wrote:

> Hi Sachin,
>
>
>
> It seems KeyGroupStreamPartitioner is complaining about receiving a null
> StreamRecord, which is abnormal, since MongoDeserializationSchema ensures
> non-nullability before putting records into the stream:
>
>
>
> ```
>
> default void deserialize(BsonDocument document, Collector<T> out) throws
> IOException {
>     T deserialize = deserialize(document);
>     if (deserialize != null) {
>         out.collect(deserialize); // No null value will be emitted
>     }
> }
>
> ```
>
>
>
> Could you please clarify which methods your MongoDeserializationSchema<Data>
> class overrides: just the `deserialize(BsonDocument)` method, or
> `deserialize(BsonDocument, Collector)` as well?
>
>
>
> Regards,
>
> Xiqian
>
>
>
> *From:* Sachin Mittal <sjmit...@gmail.com>
> *Date:* Monday, 5 August 2024 at 19:59
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* How can I debug Assigned key must not be null error when
> reading from Mongodb source
>
> Hi,
>
> I am using the MongoDB connector provided here:
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/
>
>
>
> I am instantiating it pretty much in the recommended way:
>
>
>
> MongoSource<Data> source =
>     MongoSource.<Data>builder()
>         .setUri("...")
>         .setDatabase("...")
>         .setCollection("...")
>         .setFetchSize(2048)
>         .setNoCursorTimeout(true)
>         .setPartitionStrategy(PartitionStrategy.SAMPLE)
>         .setPartitionSize(MemorySize.ofMebiBytes(64))
>         .setSamplesPerPartition(10)
>         .setDeserializationSchema(
>             new MongoDeserializationSchema<Data>() {
>                 ...
>             })
>         .build();
>
> final DataStream<Data> events =
>     env.fromSource(source, WatermarkStrategy.noWatermarks(), "Src");
>
>
>
> It ran fine for a while, but then the job crashed with the following
> exception:
>
>
>
> 2024-08-05 16:21:42,580 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Src (1/1)#0 (394ae5d24e01c10336875aec33ad43c2_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED with failure cause:
>
> java.lang.NullPointerException: Assigned key must not be null!
>   at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist-1.18.1.jar:1.18.1]
>   at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter$SourceOutputWrapper.collect(MongoRecordEmitter.java:62) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
>   at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:60) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
>   at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
>   at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) ~[blob_p-a646f47ef42137569e51317c00fb62c1b42f5535-a9f5c41761610ad6715a8aa23323aee6:?]
>   ...
>
>
>
> Any idea where I should look and how I can fix this?
>
>
>
> Thanks
>
> Sachin
>
>
>
