Hi Allan, Map does support null but tuple serializer does not. You might want to use pojo or row types if you need to deal with null values. Read more here <https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#flinks-typeinformation-class> .
- Abhishek On Sun, 29 Sep 2019 at 14:01, allan <18612537...@163.com> wrote: > Hi guys, > > When I use like the code, > > .map(*new *MapFunction<String, Tuple2<String, String>>() { > @Override > *public *Tuple2<String, String> map(String value) *throws *Exception { > *if *(properties != *null*) { > > *return new *Tuple2<>(cv_no, json.toJSONString()); > > } > > > *return null*; > > } > }) > > next, > > .filter(f->f!=*null*) > > > > I submit my job , then the job throws an exception as follows. > > java.lang.NullPointerException > > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104) > > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667) > > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667) > > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > > at java.lang.Thread.run(Thread.java:748) > > . > > > > I found this method , record is null so the job threw an exception.why map > can’t return null ? is this a bug? > > *protected *<X> *void *pushToOperator(StreamRecord<X> record) { > *try *{ > > > *// we know that the given outputTag matches our OutputTag so the record > // must be of the type that our operator (and Serializer) expects. > *@SuppressWarnings(*"unchecked"*) > StreamRecord<T> castRecord = (StreamRecord<T>) record; > > *numRecordsIn*.inc(); > StreamRecord<T> copy = > castRecord.copy(*serializer*.copy(castRecord.getValue())); > *operator*.setKeyContextElement1(copy); > *operator*.processElement(copy); > } *catch *(ClassCastException e) { > *if *(*outputTag *!= *null*) { > > *// Enrich error message *ClassCastException replace = *new > *ClassCastException( > String.*format*( > *"%s. Failed to push OutputTag with id '%s' to operator. " > *+ > *"This can occur when multiple OutputTags with different > types " *+ > *"but identical names are being used."*, > e.getMessage(), > *outputTag*.getId())); > > *throw new *ExceptionInChainedOperatorException(replace); > } *else *{ > *throw new *ExceptionInChainedOperatorException(e); > } > } *catch *(Exception e) { > *throw new *ExceptionInChainedOperatorException(e); > } > > } > } > > > > > > >