Yes, what I meant was to have a single bit mask that is written before all the fields are written. Then, for example, 1011 (reading from the least-significant bit) would mean that fields 1, 2, and 4 are non-null while field 3 is null.
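[Editor's note: as an illustration of this proposal, here is a minimal sketch. It is not the actual TupleSerializer change; the NullMaskSketch name, the Int-only fields, and the 32-field cap are assumptions made for brevity. The mask is built from the least-significant bit upwards and written once, before the non-null values:

  import java.io.{DataInputStream, DataOutputStream}

  object NullMaskSketch {
    // Bit i (counting from the least-significant bit) is 1 when field i is non-null.
    def writeWithNullMask(out: DataOutputStream, fields: Array[Integer]): Unit = {
      require(fields.length <= 32, "a single Int mask covers at most 32 fields")
      var mask = 0
      for (i <- fields.indices if fields(i) != null)
        mask |= 1 << i
      out.writeInt(mask)                   // one Int instead of one boolean per field
      for (i <- fields.indices if fields(i) != null)
        out.writeInt(fields(i).intValue()) // only non-null fields are written
    }

    def readWithNullMask(in: DataInputStream, arity: Int): Array[Integer] = {
      val mask = in.readInt()
      Array.tabulate(arity) { i =>
        if ((mask & (1 << i)) != 0) Integer.valueOf(in.readInt()) else null
      }
    }
  }

Deserialization then costs a single readInt for the mask followed by exactly one read per set bit; tuples with more than 32 fields would need one mask per 32-field chunk.]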
On Tue, 16 Jun 2015 at 10:24 Shiti Saxena <ssaxena....@gmail.com> wrote:

> Can we use 0 (false) and 1 (true)?
>
> On Tue, Jun 16, 2015 at 1:32 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> One more thing, it would be good if the TupleSerializer didn't write a boolean for every field. A single integer could be used, where one bit specifies whether a given field is null or not. (Maybe we should also add this to the RowSerializer in the future.)
>>
>> On Tue, 16 Jun 2015 at 07:30 Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> I think you can work on it. By the way, there are actually two serializers. For Scala, CaseClassSerializer is responsible for tuples as well. In Java, TupleSerializer is responsible for, well, Tuples.
>>>
>>> On Tue, 16 Jun 2015 at 06:25 Shiti Saxena <ssaxena....@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Can I work on the issue with TupleSerializer, or is someone already working on it?
>>>>
>>>> On Mon, Jun 15, 2015 at 11:20 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>> the reason why this doesn't work is that the TupleSerializer cannot deal with null values:
>>>>>
>>>>> @Test
>>>>> def testAggregationWithNull(): Unit = {
>>>>>   val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>   val table = env.fromElements[(Integer, String)](
>>>>>     (123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable
>>>>>
>>>>>   val total = table.select('_1.sum).collect().head.productElement(0)
>>>>>   assertEquals(total, 702)
>>>>> }
>>>>>
>>>>> It would have to be modified in a similar way to the PojoSerializer and RowSerializer. You could either leave the tests as they are now in your pull request or also modify the TupleSerializer. Both seem fine to me.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Sun, 14 Jun 2015 at 20:28 Shiti Saxena <ssaxena....@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Re-writing the test in the following manner works, but I am not sure if this is the correct way:
>>>>>>
>>>>>> def testAggregationWithNull(): Unit = {
>>>>>>   val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>>   val dataSet = env.fromElements[(Integer, String)](
>>>>>>     (123, "a"), (234, "b"), (345, "c"), (0, "d"))
>>>>>>
>>>>>>   implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo(
>>>>>>     Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
>>>>>>     Seq("id", "name"))
>>>>>>
>>>>>>   val rowDataSet = dataSet.map { entry =>
>>>>>>     val row = new Row(2)
>>>>>>     val amount = if (entry._1 < 100) null else entry._1
>>>>>>     row.setField(0, amount)
>>>>>>     row.setField(1, entry._2)
>>>>>>     row
>>>>>>   }
>>>>>>
>>>>>>   val total = rowDataSet.toTable.select('id.sum).collect().head.productElement(0)
>>>>>>   assertEquals(total, 702)
>>>>>> }
>>>>>>
>>>>>> On Sun, Jun 14, 2015 at 11:42 PM, Shiti Saxena <ssaxena....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> For
>>>>>>>
>>>>>>> val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable
>>>>>>>
>>>>>>> I get the following error:
>>>>>>>
>>>>>>> Error translating node 'Data Source "at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
>>>>>>> org.apache.flink.optimizer.CompilerException: Error translating node 'Data Source "at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
>>>>>>> at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
>>>>>>> at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
>>>>>>> at org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:87)
>>>>>>> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>>>>> at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>>>>>>> at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
>>>>>>> at org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:52)
>>>>>>> at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>>>>>>> at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>>>>>>> at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>>>>>>> at org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>>>>>>> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>>>>> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>>>>>>> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>>>>> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>>>>> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>>>>>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>>>>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>>>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>>>>>>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>>>>>>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>>>>>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>>>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>>>>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>>>>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>>>>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>>>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>>>>>> at org.junit.runners.Suite.runChild(Suite.java:127)
>>>>>>> at org.junit.runners.Suite.runChild(Suite.java:26)
>>>>>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>>>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>>>>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>>>>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>>>>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>>>>>> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>>>>> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>>>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>>>>>> at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>>>>>>> at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
>>>>>>> at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
>>>>>>> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>>>>> Caused by: java.lang.NullPointerException
>>>>>>> at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
>>>>>>> at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
>>>>>>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
>>>>>>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
>>>>>>> at org.apache.flink.api.java.io.CollectionInputFormat.writeObject(CollectionInputFormat.java:88)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>>>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
>>>>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
>>>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
>>>>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
>>>>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
>>>>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
>>>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
>>>>>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>>> at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
>>>>>>> at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
>>>>>>> at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>>>>>>> at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:853)
>>>>>>> at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:260)
>>>>>>> ... 55 more
>>>>>>>
>>>>>>> Does this mean that the collect method is being called before doing the aggregation? Is this because the base serializers do not handle null values like the PojoSerializer does? And is that why fromCollection does not support collections with null values?
>>>>>>>
>>>>>>> Or I could write the test using a file load, if that's alright.
>>>>>>>
>>>>>>> On Sun, Jun 14, 2015 at 11:11 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> sorry, my mail client sent before I was done.
>>>>>>>>
>>>>>>>> I think the problem is that the Scala compiler derives a wrong type for this statement:
>>>>>>>>
>>>>>>>> val table = env.fromElements((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable
>>>>>>>>
>>>>>>>> Because of the null value it derives (Any, String) as the type. If you do it like this, I think it should work:
>>>>>>>>
>>>>>>>> val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable
>>>>>>>>
>>>>>>>> I used Integer instead of Int because Scala will complain that null is not a valid value for Int otherwise.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
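[Editor's note: a short aside illustrating that last point, outside any Flink API. scala.Int is a primitive-backed value type, so null cannot be assigned to it, while java.lang.Integer is a reference type:

  // val x: Int = null                      // does not compile: Null is not a subtype of scala.Int
  val boxed: Integer = null                 // fine: java.lang.Integer is a reference type
  val pair: (Integer, String) = (null, "d") // so a tuple field declared as Integer may be null]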
>>>>>>>> On Sun, 14 Jun 2015 at 19:34 Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I think the problem is that the Scala compiler derives a wrong type for this statement:
>>>>>>>>>
>>>>>>>>> On Sun, 14 Jun 2015 at 18:28 Shiti Saxena <ssaxena....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>
>>>>>>>>>> I created the issue FLINK-2210 <https://issues.apache.org/jira/browse/FLINK-2210> for aggregate on null. I made changes to ExpressionAggregateFunction to ignore null values, but I am unable to create a Table with null values in tests.
>>>>>>>>>>
>>>>>>>>>> The code I used is:
>>>>>>>>>>
>>>>>>>>>> def testAggregationWithNull(): Unit = {
>>>>>>>>>>   val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>   val table = env.fromElements((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable
>>>>>>>>>>
>>>>>>>>>>   val total = table.select('_1.sum).collect().head.productElement(0)
>>>>>>>>>>   assertEquals(total, 702)
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> and the error I get is:
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.api.table.ExpressionException: Invalid expression "('_1).sum": Unsupported type GenericType<java.lang.Object> for aggregation ('_1).sum. Only numeric data types supported.
>>>>>>>>>> at org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:50)
>>>>>>>>>> at org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:31)
>>>>>>>>>> at org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:34)
>>>>>>>>>> at org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:31)
>>>>>>>>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>>>>>>>>> at org.apache.flink.api.table.trees.Analyzer.analyze(Analyzer.scala:31)
>>>>>>>>>> at org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
>>>>>>>>>> at org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
>>>>>>>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>>>>> at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>>>>>>>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>>>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>>>>>>>>> at org.apache.flink.api.table.Table.select(Table.scala:59)
>>>>>>>>>> at org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>>>>>>>>>> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>>>>>>>> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>>>>>>>>>> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>>>>>>>> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>>>>>>>> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>>>>>>>>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>>>>>>>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>>>>>>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>>>>>>>>>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>>>>>>>>>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>>>>>>>>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>>>>>>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>>>>>>>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>>>>>>>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>>>>>>>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>>>>>>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>>>>>>>>> at org.junit.runners.Suite.runChild(Suite.java:127)
>>>>>>>>>> at org.junit.runners.Suite.runChild(Suite.java:26)
>>>>>>>>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>>>>>>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>>>>>>>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>>>>>>>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>>>>>>>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>>>>>>>>> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>>>>>>>> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>>>>>>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>>>>>>>>> at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>>>>>>>>>> at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
>>>>>>>>>> at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
>>>>>>>>>> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>>>>>>>>
>>>>>>>>>> The ExecutionEnvironment.fromCollection method also throws an error when the collection contains a null.
>>>>>>>>>>
>>>>>>>>>> Could you please point out what I am doing wrong? How do we create a Table with null values?
>>>>>>>>>>
>>>>>>>>>> In our application, we load a file and transform each line into a Row, resulting in a DataSet[Row]. This DataSet[Row] is then converted into a Table. Should I use the same approach for the test case?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Shiti
>>>>>>>>>>
>>>>>>>>>> On Sun, Jun 14, 2015 at 4:10 PM, Shiti Saxena <ssaxena....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'll do the fix.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jun 14, 2015 at 12:42 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I merged your PR for the RowSerializer. Teaching the aggregators to deal with null values should be a very simple fix in ExpressionAggregateFunction.scala. There it is simply always aggregating the values without checking whether they are null. If you want, you can also fix that, or I can quickly fix it.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 11 Jun 2015 at 10:40 Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Cool, good to hear.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The PojoSerializer already handles null fields. The RowSerializer can be modified in pretty much the same way. So you should start by looking at the copy()/serialize()/deserialize() methods of PojoSerializer and then modify RowSerializer in a similar way.
>>>>>>>>>>>>>
>>>>>>>>>>>>> You can also send me a private mail if you want more in-depth explanations.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, 11 Jun 2015 at 09:33 Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Shiti,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> here is the issue [1].
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2203
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jun 11, 2015 at 8:42 AM Shiti Saxena <ssaxena....@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Could you please point me to the JIRA tickets? If you could provide some guidance on how to resolve these, I will work on them and raise a pull request.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Shiti
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jun 11, 2015 at 11:31 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> yes, I think the problem is that the RowSerializer does not support null values. I think we can add support for this; I will open a Jira issue.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Another problem I then see is that the aggregations cannot properly deal with null values. This would need separate support.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, 11 Jun 2015 at 06:41 Shiti Saxena <ssaxena....@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In our project, we are using the Flink Table API and are facing the following issues.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We load data from a CSV file and create a DataSet[Row]. The CSV file can also have invalid entries in some of the fields, which we replace with null when building the DataSet[Row].
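[Editor's note: to make Aljoscha's PojoSerializer advice above concrete, here is a hedged sketch of the per-field null-flag scheme. It is not the real serializer code; NullFlagRowSketch and the Int-only fields are illustrative assumptions, and the real serializers delegate to one field serializer per column type:

  import java.io.{DataInput, DataOutput}

  object NullFlagRowSketch {
    def serialize(fields: Array[Integer], out: DataOutput): Unit = {
      for (field <- fields) {
        val isNull = field == null
        out.writeBoolean(isNull)            // flag first, so the reader knows whether a value follows
        if (!isNull) out.writeInt(field.intValue())
      }
    }

    def deserialize(arity: Int, in: DataInput): Array[Integer] = {
      Array.tabulate(arity) { _ =>
        if (in.readBoolean()) null else Integer.valueOf(in.readInt())
      }
    }
  }

This is exactly the boolean-per-field layout that the bit-mask idea at the top of the thread would compress into a single Int.]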
>>>>>>>>>>>>>>>>> This DataSet[Row] is later on transformed to a Table whenever required, and specific operations such as select, aggregate, etc. are performed.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When a null value is encountered, we get a null pointer exception and the whole job fails. (We can see this by calling collect on the resulting DataSet.)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The error message is similar to:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Job execution failed.
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:315)
>>>>>>>>>>>>>>>>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>>>>>>>>>>>>>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>>>>>>>>>>>>>> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>>>>>>>>>>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>>>>>>>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
>>>>>>>>>>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>>>>>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>>>>>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>>>>>>>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>>>>>>>>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>>>>>>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>>>>>>>>> Caused by: java.lang.NullPointerException
>>>>>>>>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
>>>>>>>>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
>>>>>>>>>>>>>>>>> at org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:80)
>>>>>>>>>>>>>>>>> at org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:28)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
>>>>>>>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:724)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Could this be because the RowSerializer does not support null values? (Similar to FLINK-629 <https://issues.apache.org/jira/browse/FLINK-629>.)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Currently, to overcome this issue, we are ignoring all the rows which may have null values. For example, we have a method cleanData defined as:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> def cleanData(table: Table, relevantColumns: Seq[String]): Table = {
>>>>>>>>>>>>>>>>>   val whereClause: String = relevantColumns.map { cName =>
>>>>>>>>>>>>>>>>>     s"$cName.isNotNull"
>>>>>>>>>>>>>>>>>   }.mkString(" && ")
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>   val result: Table =
>>>>>>>>>>>>>>>>>     table.select(relevantColumns.mkString(",")).where(whereClause)
>>>>>>>>>>>>>>>>>   result
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Before operating on any Table, we use this method and then continue with the task.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is this the right way to handle this? If not, please let me know how to go about it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Shiti
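[Editor's note: for completeness, the other half of the fix the thread converges on is making the aggregates skip nulls rather than filtering rows up front. A hedged sketch, not the actual ExpressionAggregateFunction.scala; the class name and the SQL-style "all inputs null yields null" choice are assumptions:

  // A sum aggregate that ignores null inputs, so a null field neither
  // crashes the job nor contributes to the result.
  class NullIgnoringSumSketch {
    private var sum: Long = 0L
    private var sawValue = false

    def aggregate(value: Integer): Unit = {
      if (value != null) { // the missing check identified in the thread
        sum += value.intValue()
        sawValue = true
      }
    }

    // SQL-style semantics: if every input was null, the result is null.
    def result(): java.lang.Long =
      if (sawValue) java.lang.Long.valueOf(sum) else null
  }

With the RowSerializer fix plus a check like this, the cleanData pre-filtering above should become unnecessary for aggregations.]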