Re: Help with Flink experimental Table API

2015-06-16 Thread Aljoscha Krettek
Yes, what I meant was to have a single bit mask that is written before all the fields are written. Then, for example, 1011 would mean that fields 1, 2, and 4 are non-null while field 3 is null. On Tue, 16 Jun 2015 at 10:24 Shiti Saxena wrote: > Can we use 0 (false) and 1 (true)? > > On Tue, Jun 16,
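
[Editor's note] A minimal sketch of the single-bit-mask idea described above (hypothetical helper code, not the actual TupleSerializer): one int is written before the fields, with bit i set when field i+1 is non-null, so a single int covers at most 32 fields.

    import java.io.{DataInput, DataOutput}

    object NullMaskSketch {

      // Write one int before the fields instead of one boolean per field.
      // With the least significant bit standing for field 1, a mask of
      // 0b1011 means fields 1, 2 and 4 are non-null and field 3 is null.
      def writeMask(fields: Array[AnyRef], out: DataOutput): Unit = {
        var mask = 0
        for (i <- fields.indices if fields(i) != null) {
          mask |= 1 << i
        }
        out.writeInt(mask)
      }

      // Read the mask back; entry i tells whether a value for field i follows.
      def readMask(arity: Int, in: DataInput): Array[Boolean] = {
        val mask = in.readInt()
        Array.tabulate(arity)(i => (mask & (1 << i)) != 0)
      }
    }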

Re: Help with Flink experimental Table API

2015-06-16 Thread Shiti Saxena
Can we use 0 (false) and 1 (true)? On Tue, Jun 16, 2015 at 1:32 PM, Aljoscha Krettek wrote: > One more thing, it would be good if the TupleSerializer didn't write a > boolean for every field. A single integer could be used where one bit > specifies if a given field is null or not. (Maybe we should

Re: Help with Flink experimental Table API

2015-06-16 Thread Aljoscha Krettek
One more thing, it would be good if the TupleSerializer didn't write a boolean for every field. A single integer could be used where one bit specifies if a given field is null or not. (Maybe we should also add this to the RowSerializer in the future.) On Tue, 16 Jun 2015 at 07:30 Aljoscha Krettek

Re: Help with Flink experimental Table API

2015-06-15 Thread Aljoscha Krettek
I think you can work on it. By the way, there are actually two serializers. For Scala, CaseClassSerializer is responsible for tuples as well. In Java, TupleSerializer is responsible for, well, Tuples. On Tue, 16 Jun 2015 at 06:25 Shiti Saxena wrote: > Hi, > > Can I work on the issue with TupleSe

Re: Help with Flink experimental Table API

2015-06-15 Thread Shiti Saxena
Hi, Can I work on the issue with TupleSerializer or is someone working on it? On Mon, Jun 15, 2015 at 11:20 AM, Aljoscha Krettek wrote: > Hi, > the reason why this doesn't work is that the TupleSerializer cannot deal > with null values: > > @Test > def testAggregationWithNull(): Unit = { > > v

Re: Help with Flink experimental Table API

2015-06-14 Thread Aljoscha Krettek
Hi, the reason why this doesn't work is that the TupleSerializer cannot deal with null values: @Test def testAggregationWithNull(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val table = env.fromElements[(Integer, String)]( (123, "a"), (234, "b"), (345, "c"), (null, "d")).
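
[Editor's note] For reference, a sketch of what the truncated test above roughly looks like; the trailing toTable conversion and the select expression are assumptions filled in from the rest of the thread, not the exact original code. The job fails when the TupleSerializer tries to serialize the tuple whose first field is null.

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.junit.Test

    class AggregationWithNullTest {

      @Test
      def testAggregationWithNull(): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val table = env.fromElements[(Integer, String)](
          (123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

        // Any aggregation over the first column; the failure happens earlier,
        // when the null field reaches the TupleSerializer.
        val result = table.select('_1.sum)
      }
    }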

Re: Help with Flink experimental Table API

2015-06-14 Thread Shiti Saxena
Hi, Re-writing the test in the following manner works. But I am not sure if this is the correct way. def testAggregationWithNull(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val dataSet = env.fromElements[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (0, "d
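
[Editor's note] A side note on this workaround (my own illustration, not from the thread): substituting 0 for null makes the test pass, but it is not equivalent to ignoring the missing value, because the 0 participates in the aggregates.

    // 0 standing in for a missing value vs. the value simply being absent.
    val withZero = Seq(123, 234, 345, 0)
    val nonNull  = Seq(123, 234, 345)

    assert(withZero.sum == nonNull.sum)   // the sum is unaffected by an extra 0
    assert(withZero.min != nonNull.min)   // but min becomes 0 instead of 123
    assert(withZero.sum / withZero.size != nonNull.sum / nonNull.size) // and avg shifts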

Re: Help with Flink experimental Table API

2015-06-14 Thread Shiti Saxena
Hi, For val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable I get the following error: Error translating node 'Data Source "at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505) (org.apache.flink.api.

Re: Help with Flink experimental Table API

2015-06-14 Thread Aljoscha Krettek
Hi, sorry, my mail client sent before I was done. I think the problem is that the Scala compiler derives a wrong type for this statement: val table = env.fromElements((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable Because of the null value it derives (Any, String) as the type if you do
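
[Editor's note] A small illustration of the type-inference point, using plain Scala collections rather than the Flink API (my own snippet, not from the thread):

    // The compiler's view, independent of Flink: once a null appears, the
    // common supertype of the element types is (Any, String).
    val data = Seq((123, "a"), (234, "b"), (345, "c"), (null, "d"))
    // data: Seq[(Any, String)]

    // Pinning the type keeps (Integer, String), since java.lang.Integer is a
    // reference type and can hold null:
    val typed: Seq[(Integer, String)] = Seq((123, "a"), (234, "b"), (345, "c"), (null, "d"))

The same widening happens with env.fromElements(...), which is why the explicit env.fromElements[(Integer, String)](...) annotation appears elsewhere in this thread.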

Re: Help with Flink experimental Table API

2015-06-14 Thread Aljoscha Krettek
Hi, I think the problem is that the Scala compiler derives a wrong type for this statement: On Sun, 14 Jun 2015 at 18:28 Shiti Saxena wrote: > Hi Aljoscha, > > I created the issue FLINK-2210 > for aggregate on null. > I made changes to Express

Re: Help with Flink experimental Table API

2015-06-14 Thread Shiti Saxena
Hi Aljoscha, I created the issue FLINK-2210 for aggregate on null. I made changes to ExpressionAggregateFunction to ignore null values. But I am unable to create a Table with null values in tests. The code I used is, def testAggregationWi

Re: Help with Flink experimental Table API

2015-06-14 Thread Shiti Saxena
I'll do the fix. On Sun, Jun 14, 2015 at 12:42 AM, Aljoscha Krettek wrote: > I merged your PR for the RowSerializer. Teaching the aggregators to deal > with null values should be a very simple fix in > ExpressionAggregateFunction.scala. There it is simply always aggregating > the values without c

Re: Help with Flink experimental Table API

2015-06-13 Thread Aljoscha Krettek
I merged your PR for the RowSerializer. Teaching the aggregators to deal with null values should be a very simple fix in ExpressionAggregateFunction.scala. There it is simply always aggregating the values without checking whether they are null. If you want, you can also fix that or I can quickly fix
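
[Editor's note] Not the actual ExpressionAggregateFunction code, just a minimal sketch of the fix being described: the aggregate should only consume values after a null check, so null fields are skipped instead of breaking (or skewing) the result.

    // Sum that ignores null entries; the same guard applies to min/max/avg/count.
    def sumIgnoringNulls(values: Seq[Integer]): Long =
      values.iterator
        .filter(_ != null)   // the check the original aggregation was missing
        .map(_.longValue())
        .sum

    // sumIgnoringNulls(Seq(123, 234, 345, null)) == 702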

Re: Help with Flink experimental Table API

2015-06-11 Thread Aljoscha Krettek
Cool, good to hear. The PojoSerializer already handles null fields. The RowSerializer can be modified in pretty much the same way. So you should start by looking at the copy()/serialize()/deserialize() methods of PojoSerializer and then modify RowSerializer in a similar way. You can also send me
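
[Editor's note] A sketch of the pattern being pointed at (not the actual PojoSerializer or RowSerializer code): each field is preceded by a null marker, and the value itself is only written when the field is non-null. The later messages in this thread (above) refine this to a single bit mask instead of one boolean per field.

    import java.io.{DataInput, DataOutput}

    object NullAwareFieldsSketch {

      // Per field: write a boolean null marker, then the value only if present.
      def serializeFields(fields: Array[AnyRef],
                          writeOne: (AnyRef, DataOutput) => Unit,
                          out: DataOutput): Unit = {
        fields.foreach { field =>
          out.writeBoolean(field == null)
          if (field != null) writeOne(field, out)
        }
      }

      // Mirror image: a true marker means the field was null, so no value follows.
      def deserializeFields(arity: Int,
                            readOne: DataInput => AnyRef,
                            in: DataInput): Array[AnyRef] = {
        Array.fill(arity) {
          if (in.readBoolean()) null else readOne(in)
        }
      }
    }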

Re: Help with Flink experimental Table API

2015-06-11 Thread Till Rohrmann
Hi Shiti, here is the issue [1]. Cheers, Till [1] https://issues.apache.org/jira/browse/FLINK-2203 On Thu, Jun 11, 2015 at 8:42 AM Shiti Saxena wrote: > Hi Aljoscha, > > Could you please point me to the JIRA tickets? If you could provide some > guidance on how to resolve these, I will work on

Re: Help with Flink experimental Table API

2015-06-10 Thread Shiti Saxena
Hi Aljoscha, Could you please point me to the JIRA tickets? If you could provide some guidance on how to resolve these, I will work on them and raise a pull-request. Thanks, Shiti On Thu, Jun 11, 2015 at 11:31 AM, Aljoscha Krettek wrote: > Hi, > yes, I think the problem is that the RowSerializ

Re: Help with Flink experimental Table API

2015-06-10 Thread Aljoscha Krettek
Hi, yes, I think the problem is that the RowSerializer does not support null values. I think we can add support for this; I will open a Jira issue. Another problem I then see is that the aggregations cannot properly deal with null values. This would need separate support. Regards, Aljoscha On T

Help with Flink experimental Table API

2015-06-10 Thread Shiti Saxena
Hi, In our project, we are using the Flink Table API and are facing the following issues: We load data from a CSV file and create a DataSet[Row]. The CSV file can also have invalid entries in some of the fields, which we replace with null when building the DataSet[Row]. This DataSet[Row] is later
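
[Editor's note] For context, a minimal sketch of the setup described here (not the project's actual code; the two-column layout, the file path, and the use of Row's setField mutator from the old org.apache.flink.api.table.Row are all assumptions): the CSV is read as text and unparsable integer fields are replaced with null while the Row is built.

    import org.apache.flink.api.scala._
    import org.apache.flink.api.table.Row

    val env = ExecutionEnvironment.getExecutionEnvironment

    val rows: DataSet[Row] = env.readTextFile("/path/to/input.csv").map { line =>
      val parts = line.split(",", -1)
      val row = new Row(2)
      // An invalid integer entry becomes null instead of failing the load.
      row.setField(0,
        try { Integer.valueOf(parts(0).trim) }
        catch { case _: NumberFormatException => null })
      row.setField(1, parts(1).trim)
      row
    }

In the real project the Row type information is presumably declared explicitly so that the RowSerializer discussed above is used; the snippet only illustrates the null substitution. Aggregating over column 0 of such a DataSet[Row] then runs into the null-handling gaps discussed in the rest of the thread.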