Hi Stephan,

In the end I decided to substitute a default value (e.g. an empty string) whenever a field is null.
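For anyone who finds this thread later, here is a minimal sketch of both workarounds: the default-value approach and the Option approach Stephan suggested. The `Event` and `EventOpt` types, their field names, and the `NullHandling` helper are hypothetical, made up for illustration. The sketch is plain Scala with no Flink dependency, so it only shows how records could be normalized before they are emitted to the next operator. The `Seq` field is an assumption based on the `TraversableSerializer` frame in the trace, which suggests that a null collection field was involved.

```scala
// Hypothetical record type; the field names are illustrative only.
case class Event(id: String, tags: Seq[String])

// Hypothetical variant that models nullable fields as Option.
case class EventOpt(id: Option[String], tags: Option[Seq[String]])

object NullHandling {
  // Workaround 1: replace null with a default value before emitting the record.
  def withDefaults(id: String, tags: Seq[String]): Event =
    Event(
      if (id == null) "" else id,
      if (tags == null) Seq.empty[String] else tags
    )

  // Workaround 2: wrap nullable values in Option.
  // Option(x) yields None when x is null and Some(x) otherwise.
  def toOptional(id: String, tags: Seq[String]): EventOpt =
    EventOpt(Option(id), Option(tags))
}
```

The default-value version keeps the record type unchanged, but downstream code can no longer tell "missing" apart from "empty". The Option version keeps that distinction at the cost of changing the field types.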
On Mon, Aug 29, 2016 at 11:25 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Null is indeed not supported for some basic data types (tuples / case
> classes).
>
> Can you use Option for nullable fields?
>
> Stephan
>
>
> On Mon, Aug 29, 2016 at 8:04 PM, Jack Huang <jackhu...@mz.com> wrote:
>
>> Hi all,
>>
>> It seems like flink does not allow passing case class objects with
>> null-valued fields to the next operators. I am getting the following error
>> message:
>>
>> *Caused by: java.lang.RuntimeException: Could not forward element to next operator*
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:399)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:381)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
>>     at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:340)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
>>     at java.lang.Thread.run(Thread.java:745)
>> *Caused by: java.lang.NullPointerException*
>>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:67)
>>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:32)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:371)
>>     ... 9 more
>>
>>
>> This error goes away when I force all objects to not have fields with
>> null values. However, null is a valid value in my use case. Is there a way
>> to make it work? I am using flink-1.1.1.
>>
>>
>> Thanks,
>> Jack
>>
>
>