[ 
https://issues.apache.org/jira/browse/FLINK-10380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10380:
-----------------------------------
    Labels: pull-request-available  (was: )

> Check if key is not nul before assign to group in KeyedStream
> -------------------------------------------------------------
>
>                 Key: FLINK-10380
>                 URL: https://issues.apache.org/jira/browse/FLINK-10380
>             Project: Flink
>          Issue Type: Task
>    Affects Versions: 1.6.0
>            Reporter: Sayat Satybaldiyev
>            Priority: Minor
>              Labels: pull-request-available
>
> If a user creates a KeyedStream and partition by key which might be null, 
> Flink job throws NullPointerExceptoin at runtime. However, NPE that Flink 
> throws hard to debug and understand as it doesn't refer to place in Flink job.
> *Suggestion:*
> Add precondition that checks if the key is not null and throw a descriptive 
> error if it's a null.
>  
> *Job Example*:
>  
> {code:java}
> DataStream<String> stream = env.fromCollection(Arrays.asList("aaa", "bbb"))
>  .map(x -> (String)null)
>  .keyBy(x -> x);{code}
>  
>  
> An error that is thrown:
>  
> {code:java}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.RuntimeException
>  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
>  at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>  at org.myorg.quickstart.BuggyKeyedStream.main(BuggyKeyedStream.java:61)
> Caused by: java.lang.RuntimeException
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)16:26:43,110
>  INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC 
> service.
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
>  at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
>  at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>  at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> {code}
> ... 10 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to