Hi! Null is indeed not supported for some basic data types (tuples / case classes).
Can you use Option for nullable fields? Stephan On Mon, Aug 29, 2016 at 8:04 PM, Jack Huang <jackhu...@mz.com> wrote: > Hi all, > > It seems like flink does not allow passing case class objects with > null-valued fields to the next operators. I am getting the following error > message: > > *Caused by: java.lang.RuntimeException: Could not forward element to next > operator* > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:399) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:381) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329) > at > org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:340) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239) > at java.lang.Thread.run(Thread.java:745)*Caused by: > java.lang.NullPointerException* > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:67) > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:371) > ... 9 more > > > This error goes away when I force all objects to not have fields with null > values. However, null is a valid value in my use case. Is there a way to > make it work? I am using flink-1.1.1. > > > Thanks, > Jack >