Hi,

Can I work on the issue with the TupleSerializer, or is someone already working on it?
On Mon, Jun 15, 2015 at 11:20 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
the reason why this doesn't work is that the TupleSerializer cannot deal with null values:

  @Test
  def testAggregationWithNull(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val table = env.fromElements[(Integer, String)](
      (123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

    val total = table.select('_1.sum).collect().head.productElement(0)
    assertEquals(total, 702)
  }

It would have to be modified in a similar way to the PojoSerializer and RowSerializer. You could either leave the tests as they are now in your pull request or also modify the TupleSerializer. Both seem fine to me.

Cheers,

Aljoscha
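For reference, "modified in a similar way" means adding the null-marker scheme that the PojoSerializer and the RowSerializer now use: write a flag before each field so that deserialization knows whether a serialized value follows. Below is a minimal sketch of that scheme with hypothetical helper names (serializeFields/deserializeFields); it is not the actual Flink serializer code, which has more methods and state to take care of.

  import org.apache.flink.api.common.typeutils.TypeSerializer
  import org.apache.flink.core.memory.{DataInputView, DataOutputView}

  // Hypothetical helpers showing the null-marker scheme that makes a
  // field-wise serializer (Pojo/Row/Tuple-style) null-safe.
  // `fieldSerializers` stands for the per-field serializers such a
  // serializer already holds.
  object NullMarkerSketch {

    def serializeFields(fields: Array[AnyRef],
                        fieldSerializers: Array[TypeSerializer[AnyRef]],
                        target: DataOutputView): Unit = {
      var i = 0
      while (i < fields.length) {
        if (fields(i) == null) {
          target.writeBoolean(true)  // marker: field is null, no value follows
        } else {
          target.writeBoolean(false) // marker: a serialized value follows
          fieldSerializers(i).serialize(fields(i), target)
        }
        i += 1
      }
    }

    def deserializeFields(fieldSerializers: Array[TypeSerializer[AnyRef]],
                          source: DataInputView): Array[AnyRef] = {
      val fields = new Array[AnyRef](fieldSerializers.length)
      var i = 0
      while (i < fields.length) {
        fields(i) =
          if (source.readBoolean()) null // marker says the field was null
          else fieldSerializers(i).deserialize(source)
        i += 1
      }
      fields
    }
  }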
On Sun, 14 Jun 2015 at 20:28 Shiti Saxena <ssaxena....@gmail.com> wrote:

Hi,

Re-writing the test in the following manner works, but I am not sure if this is the correct way:

  def testAggregationWithNull(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements[(Integer, String)](
      (123, "a"), (234, "b"), (345, "c"), (0, "d"))

    implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo(
      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
      Seq("id", "name"))

    val rowDataSet = dataSet.map {
      entry =>
        val row = new Row(2)
        val amount = if (entry._1 < 100) null else entry._1
        row.setField(0, amount)
        row.setField(1, entry._2)
        row
    }

    val total = rowDataSet.toTable.select('id.sum).collect().head.productElement(0)
    assertEquals(total, 702)
  }

On Sun, Jun 14, 2015 at 11:42 PM, Shiti Saxena <ssaxena....@gmail.com> wrote:

Hi,

For

  val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"),
    (345, "c"), (null, "d")).toTable

I get the following error:

  Error translating node 'Data Source "at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
  org.apache.flink.optimizer.CompilerException: Error translating node 'Data Source "at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
      at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
      at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
      at org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:87)
      at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
      at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
      at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
      at org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:52)
      at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
      at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
      at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
      at org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
      at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
      at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
      at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
      at org.junit.rules.RunRules.evaluate(RunRules.java:20)
      at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
      at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
      at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
      at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
      at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
      at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
      at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
      at org.junit.runners.Suite.runChild(Suite.java:127)
      at org.junit.runners.Suite.runChild(Suite.java:26)
      at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
      at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
      at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
      at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
      at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
      at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
      at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
      at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
      at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
      at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
      at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
  Caused by: java.lang.NullPointerException
      at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
      at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
      at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
      at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
      at org.apache.flink.api.java.io.CollectionInputFormat.writeObject(CollectionInputFormat.java:88)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
      at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
      at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
      at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
      at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:853)
      at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:260)
      ... 55 more

Does this mean that the collect method is being called before doing the aggregation? Is this because the base serializers do not handle null values the way the PojoSerializer does? And is that why fromCollection does not support collections with null values?

Or I could write the test using a file load, if that's alright.

On Sun, Jun 14, 2015 at 11:11 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
sorry, my mail client sent before I was done.

I think the problem is that the Scala compiler derives a wrong type for this statement:

  val table = env.fromElements((123, "a"), (234, "b"), (345, "c"),
    (null, "d")).toTable

Because of the null value it derives (Any, String) as the type. If you do it like this, I think it should work:

  val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"),
    (345, "c"), (null, "d")).toTable

I used Integer instead of Int because Scala will complain that null is not a valid value for Int otherwise.

Cheers,
Aljoscha
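On the Integer-vs-Int point: scala.Int maps to a JVM primitive, which has no null representation, whereas java.lang.Integer is a boxed reference type. A two-line illustration (plain Scala, no Flink involved):

  // val a: Int = null              // does not compile: Int is a primitive value type
  val b: java.lang.Integer = null   // compiles: Integer is a reference type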
On Sun, 14 Jun 2015 at 19:34 Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
I think the problem is that the Scala compiler derives a wrong type for this statement:

On Sun, 14 Jun 2015 at 18:28 Shiti Saxena <ssaxena....@gmail.com> wrote:

Hi Aljoscha,

I created the issue FLINK-2210 <https://issues.apache.org/jira/browse/FLINK-2210> for aggregate on null. I made changes to ExpressionAggregateFunction to ignore null values, but I am unable to create a Table with null values in tests.

The code I used is:

  def testAggregationWithNull(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val table = env.fromElements((123, "a"), (234, "b"), (345, "c"),
      (null, "d")).toTable

    val total = table.select('_1.sum).collect().head.productElement(0)
    assertEquals(total, 702)
  }

and the error I get is:

  org.apache.flink.api.table.ExpressionException: Invalid expression "('_1).sum": Unsupported type GenericType<java.lang.Object> for aggregation ('_1).sum. Only numeric data types supported.
      at org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:50)
      at org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:31)
      at org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:34)
      at org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:31)
      at scala.collection.immutable.List.foreach(List.scala:318)
      at org.apache.flink.api.table.trees.Analyzer.analyze(Analyzer.scala:31)
      at org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
      at org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
      at scala.collection.AbstractTraversable.map(Traversable.scala:105)
      at org.apache.flink.api.table.Table.select(Table.scala:59)
      at org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
      at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
      at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
      at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
      at org.junit.rules.RunRules.evaluate(RunRules.java:20)
      at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
      at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
      at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
      at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
      at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
      at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
      at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
      at org.junit.runners.Suite.runChild(Suite.java:127)
      at org.junit.runners.Suite.runChild(Suite.java:26)
      at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
      at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
      at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
      at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
      at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
      at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
      at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
      at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
      at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
      at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
      at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

The ExecutionEnvironment.fromCollection method also throws an error when the collection contains a null.

Could you please point out what I am doing wrong? How do we create a Table with null values?

In our application, we load a file and transform each line into a Row, resulting in a DataSet[Row]. This DataSet[Row] is then converted into a Table. Should I use the same approach for the test case?

Thanks,
Shiti

On Sun, Jun 14, 2015 at 4:10 PM, Shiti Saxena <ssaxena....@gmail.com> wrote:

I'll do the fix.

On Sun, Jun 14, 2015 at 12:42 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

I merged your PR for the RowSerializer. Teaching the aggregators to deal with null values should be a very simple fix in ExpressionAggregateFunction.scala: there it is simply always aggregating the values without checking whether they are null. If you want, you can also fix that, or I can quickly fix it.
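The fix being described amounts to skipping null values during accumulation instead of dereferencing them. A rough sketch of a null-ignoring sum aggregate (illustrative only; the real ExpressionAggregateFunction interface may differ):

  // Illustrative null-skipping sum aggregate, not the actual
  // ExpressionAggregateFunction from the Table API.
  class NullIgnoringSumAggregate {
    private var sum: Long = 0L

    def aggregate(value: Any): Unit = value match {
      case null       => // null values are simply ignored
      case i: Integer => sum += i.intValue() // accumulate non-null values
      case other      =>
        throw new IllegalArgumentException(s"Unsupported type: $other")
    }

    def getAggregate: Long = sum
  }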
On Thu, 11 Jun 2015 at 10:40 Aljoscha Krettek <aljos...@apache.org> wrote:

Cool, good to hear.

The PojoSerializer already handles null fields. The RowSerializer can be modified in pretty much the same way. So you should start by looking at the copy()/serialize()/deserialize() methods of PojoSerializer and then modify RowSerializer in a similar way.

You can also send me a private mail if you want more in-depth explanations.
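The same guard is needed on the copy() path, since a field serializer's copy() cannot be called on a null field either. A sketch of the idea for a Row-like type (illustrative, not the merged RowSerializer implementation; it assumes Row's productElement/setField accessors as used elsewhere in this thread):

  import org.apache.flink.api.common.typeutils.TypeSerializer
  import org.apache.flink.api.table.Row

  object RowCopySketch {
    // Null-aware copy: deep-copies only non-null fields and
    // propagates nulls as-is into the reuse object.
    def copyRow(from: Row, reuse: Row,
                fieldSerializers: Array[TypeSerializer[Any]]): Row = {
      var i = 0
      while (i < fieldSerializers.length) {
        val field = from.productElement(i)
        if (field == null) {
          reuse.setField(i, null) // propagate the null instead of copying
        } else {
          reuse.setField(i, fieldSerializers(i).copy(field))
        }
        i += 1
      }
      reuse
    }
  }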
On Thu, 11 Jun 2015 at 09:33 Till Rohrmann <trohrm...@apache.org> wrote:

Hi Shiti,

here is the issue [1].

Cheers,
Till

[1] https://issues.apache.org/jira/browse/FLINK-2203

On Thu, Jun 11, 2015 at 8:42 AM Shiti Saxena <ssaxena....@gmail.com> wrote:

Hi Aljoscha,

Could you please point me to the JIRA tickets? If you could provide some guidance on how to resolve these, I will work on them and raise a pull request.

Thanks,
Shiti

On Thu, Jun 11, 2015 at 11:31 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
yes, I think the problem is that the RowSerializer does not support null values. I think we can add support for this; I will open a Jira issue.

Another problem I then see is that the aggregations cannot properly deal with null values. This would need separate support.

Regards,
Aljoscha

On Thu, 11 Jun 2015 at 06:41 Shiti Saxena <ssaxena....@gmail.com> wrote:

Hi,

In our project, we are using the Flink Table API and are facing the following issues.

We load data from a CSV file and create a DataSet[Row]. The CSV file can also have invalid entries in some of the fields, which we replace with null when building the DataSet[Row].

This DataSet[Row] is later on transformed to a Table whenever required, and specific operations such as select or aggregate are performed.

When a null value is encountered, we get a NullPointerException and the whole job fails. (We can see this by calling collect on the resulting DataSet.)

The error message is similar to:

  Job execution failed.
  org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:315)
      at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
      at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
      at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
      at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
      at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
      at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
      at akka.dispatch.Mailbox.run(Mailbox.scala:221)
      at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
      at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  Caused by: java.lang.NullPointerException
      at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
      at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
      at org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:80)
      at org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:28)
      at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
      at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
      at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
      at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
      at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
      at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      at java.lang.Thread.run(Thread.java:724)

Could this be because the RowSerializer does not support null values? (Similar to FLINK-629 <https://issues.apache.org/jira/browse/FLINK-629>.)

Currently, to overcome this issue, we are ignoring all the rows which may have null values. For example, we have a method cleanData defined as:

  def cleanData(table: Table, relevantColumns: Seq[String]): Table = {
    val whereClause: String = relevantColumns.map {
      cName => s"$cName.isNotNull"
    }.mkString(" && ")

    val result: Table = table.select(relevantColumns.mkString(",")).where(whereClause)
    result
  }
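For illustration, with hypothetical column names, cleanData is applied before any select or aggregate, along these lines:

  // Hypothetical columns; keeps only the rows where both fields are non-null.
  val cleaned: Table = cleanData(table, Seq("id", "name"))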
Before operating on any Table, we use this method and then continue with the task.

Is this the right way to handle this? If not, please let me know how to go about it.

Thanks,
Shiti