Hello Sachin, The same issue had been reported in the past and JIRA was closed without resolution.
https://issues.apache.org/jira/browse/FLINK-4823 I do see this is as a data quality issue. You need to understand what you would like to do with the null value. Either way, better to filter out the null data earlier so that you may not necessary manage the null or you may also try using POJO as POJO might support null. Sincerely, -A On Tue, Apr 2, 2024 at 12:21 PM Sachin Mittal <sjmit...@gmail.com> wrote: > Hello folks, > I am keying my stream using a Tuple: > > example: > > public class MyKeySelector implements KeySelector<Data, Tuple2<Long, Long>> { > > @Override > public Tuple2<Long, Long> getKey(Data data) { > return Tuple2.of(data.id, data.id1); > } > > } > > Now id1 can have null values. In this case how should I handle this? > > Right now I am getting this error: > > java.lang.RuntimeException: Exception occurred while setting the current key > context. > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > [flink-dist-1.17.1.jar:1.17.1] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > [flink-dist-1.17.1.jar:1.17.1] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > [flink-dist-1.17.1.jar:1.17.1] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292] > Caused by: org.apache.flink.types.NullFieldException: Field 1 is null, but > expected to hold a value. > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:135) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.serializeKeyGroupAndKey(SerializedCompositeKeyBuilder.java:192) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.setKeyAndKeyGroup(SerializedCompositeKeyBuilder.java:95) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:431) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371) > ~[flink-dist-1.17.1.jar:1.17.1] > ... 18 more > Caused by: java.lang.NullPointerException > at > org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:67) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:30) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:133) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:31) > ~[flink-dist-1.17.1.jar:1.17.1] > > > Thanks > Sachin > >