hi,
   Ok, thanks, I'll read it. I have another problem, though: I caught the 
exception, but it was still thrown.





At 2019-09-29 17:05:20, "Biao Liu" <mmyy1...@gmail.com> wrote:

Hi allan,


It's not a bug. Flink does not support null values; see the discussion in [1].


In your example, you have to return something from the MapFunction even when 
there is nothing to return. Maybe you could use flatMap instead of map to 
handle this null-value scenario. A FlatMapFunction is allowed to collect 
nothing (it simply skips collecting when there is no data to return); see the 
sketch below. Does that satisfy your requirement?
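

Something like this rough sketch shows the idea (it reuses the properties / 
cv_no / json names from your snippet, so it won't compile on its own; 
FlatMapFunction is in org.apache.flink.api.common.functions and Collector in 
org.apache.flink.util):

.flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
        // collect only when there is something to emit; emitting nothing is fine,
        // so no null record ever reaches the TupleSerializer
        if (properties != null) {
            out.collect(new Tuple2<>(cv_no, json.toJSONString()));
        }
    }
})

With this you can also drop the .filter(f -> f != null) step.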


1. 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Connectors-and-NULL-handling-td29695.html#a29942



Thanks,
Biao /'bɪ.aʊ/






On Sun, 29 Sep 2019 at 16:48, Abhishek Jain <abhijai...@gmail.com> wrote:

Hi Allan,
Map does support null, but the tuple serializer does not. You might want to use 
POJO or Row types if you need to deal with null values. Read more here. 
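
For example, a rough sketch of my own (reusing the names from your snippet): 
the Row serializer tolerates null fields, so you leave the field null instead 
of returning a null record. Row is org.apache.flink.types.Row and Types is 
org.apache.flink.api.common.typeinfo.Types:

.map(new MapFunction<String, Row>() {
    @Override
    public Row map(String value) throws Exception {
        Row row = new Row(2);
        row.setField(0, cv_no);
        // a null field is fine in Row; a null record still is not
        row.setField(1, properties != null ? json.toJSONString() : null);
        return row;
    }
})
.returns(Types.ROW(Types.STRING, Types.STRING)) // Row needs explicit type information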


- Abhishek


On Sun, 29 Sep 2019 at 14:01, allan <18612537...@163.com> wrote:


Hi guys,

When I use code like the following,

.map(new MapFunction<String, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> map(String value) throws Exception {
        if (properties != null) {
            return new Tuple2<>(cv_no, json.toJSONString());
        }
        return null;
    }
})

next,

.filter(f -> f != null)

 

I submit my job, and then it throws the following exception.

java.lang.NullPointerException
       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
       at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
       at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
       at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
       at java.lang.Thread.run(Thread.java:748)


I found this method; the record is null, so the job threw an exception. Why 
can't map return null? Is this a bug?
   protected <X> void pushToOperator(StreamRecord<X> record) {
      try {
         // we know that the given outputTag matches our OutputTag so the record
         // must be of the type that our operator (and Serializer) expects.
         @SuppressWarnings("unchecked")
         StreamRecord<T> castRecord = (StreamRecord<T>) record;

         numRecordsIn.inc();
         // castRecord.getValue() is null here, so TupleSerializer.copy(null) throws the NPE above
         StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
         operator.setKeyContextElement1(copy);
         operator.processElement(copy);
      } catch (ClassCastException e) {
         if (outputTag != null) {
            // Enrich error message
            ClassCastException replace = new ClassCastException(
               String.format(
                  "%s. Failed to push OutputTag with id '%s' to operator. " +
                     "This can occur when multiple OutputTags with different types " +
                     "but identical names are being used.",
                  e.getMessage(),
                  outputTag.getId()));

            throw new ExceptionInChainedOperatorException(replace);
         } else {
            throw new ExceptionInChainedOperatorException(e);
         }
      } catch (Exception e) {
         throw new ExceptionInChainedOperatorException(e);
      }
   }
 
 

 

