Hi Allan,
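The map function itself may return null, but the tuple serializer does not
support null records. That is why the NullPointerException in your stack
trace comes from TupleSerializer.copy. You might want to use POJO or Row
types if you need to deal with null values. Read more here:
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#flinks-typeinformation-class>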
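
Another way to avoid the null entirely is to emit zero or one element per
input, instead of returning null and filtering afterwards. Below is a minimal
plain-Java sketch of that pattern using java.util.stream, not the Flink API.
The class name, method names and the "key=value" input format are hypothetical
and only serve the illustration. In a Flink job the analogous operator would
be flatMap with a Collector, where you simply do not call collect() for
records you want to drop.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SkipInsteadOfNull {

    // Hypothetical stand-in for the map() body in the question: returns an
    // empty stream for unusable input instead of returning null.
    static Stream<Map.Entry<String, String>> toPair(String value) {
        if (value == null || !value.contains("=")) {
            return Stream.empty(); // emit nothing, so no null enters the pipeline
        }
        int idx = value.indexOf('=');
        return Stream.of(
            new SimpleEntry<>(value.substring(0, idx), value.substring(idx + 1)));
    }

    // flatMap replaces the map-then-filter(f -> f != null) combination.
    static List<Map.Entry<String, String>> convert(List<String> input) {
        return input.stream()
            .flatMap(SkipInsteadOfNull::toPair)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(convert(Arrays.asList("a=1", "broken", "b=2")));
    }
}
```

With this shape no downstream null check is needed, because a record that
cannot be converted is never produced in the first place.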

- Abhishek

On Sun, 29 Sep 2019 at 14:01, allan <18612537...@163.com> wrote:

> Hi guys,
>
> When I use code like the following,
>
> .map(new MapFunction<String, Tuple2<String, String>>() {
>     @Override
>     public Tuple2<String, String> map(String value) throws Exception {
>         if (properties != null) {
>             return new Tuple2<>(cv_no, json.toJSONString());
>         }
>         return null;
>     }
> })
>
> followed by
>
> .filter(f -> f != null)
>
>
>
> and submit my job, the job throws the following exception:
>
> java.lang.NullPointerException
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>
>
>
> I found the method below. The record is null there, so the job throws the
> exception. Why can't map return null? Is this a bug?
>
>     protected <X> void pushToOperator(StreamRecord<X> record) {
>         try {
>             // we know that the given outputTag matches our OutputTag so the record
>             // must be of the type that our operator (and Serializer) expects.
>             @SuppressWarnings("unchecked")
>             StreamRecord<T> castRecord = (StreamRecord<T>) record;
>
>             numRecordsIn.inc();
>             StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
>             operator.setKeyContextElement1(copy);
>             operator.processElement(copy);
>         } catch (ClassCastException e) {
>             if (outputTag != null) {
>                 // Enrich error message
>                 ClassCastException replace = new ClassCastException(
>                     String.format(
>                         "%s. Failed to push OutputTag with id '%s' to operator. " +
>                             "This can occur when multiple OutputTags with different types " +
>                             "but identical names are being used.",
>                         e.getMessage(),
>                         outputTag.getId()));
>
>                 throw new ExceptionInChainedOperatorException(replace);
>             } else {
>                 throw new ExceptionInChainedOperatorException(e);
>             }
>         } catch (Exception e) {
>             throw new ExceptionInChainedOperatorException(e);
>         }
>     }
> }
>
