Sure! The problem is that the DataSet API does an implicit conversion to Tuples
during chaining and I didn't find any documentation about this (actually I
was pleasantly surprised by the fact that the Table API supports
aggregates on null values).

Here it is: https://issues.apache.org/jira/browse/FLINK-10947
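
To make the point about comparators in the quoted reply below more concrete, here is a small plain-JDK analogy. It is only a sketch and not Flink code: it uses java.util.Comparator and TreeMap in place of Flink's TypeComparator and TypeSerializer, and the names NullKeyGrouping and countByKey are made up for illustration. The idea it tries to show is that the same grouping logic either works or fails on null keys depending only on whether the comparator accepts null.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class NullKeyGrouping {

    // Counts occurrences per key; the comparator decides how keys are compared.
    // (Hypothetical helper, standing in for a keyed grouping operator.)
    static Map<String, Integer> countByKey(List<String> values, Comparator<String> keyComparator) {
        Map<String, Integer> counts = new TreeMap<>(keyComparator);
        for (String value : values) {
            Integer old = counts.get(value);
            counts.put(value, old == null ? 1 : old + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("test", "test", "test2", null, null, "test3");

        // A null-aware comparator treats null as just another key.
        Map<String, Integer> counts =
            countByKey(values, Comparator.nullsFirst(Comparator.naturalOrder()));
        System.out.println(counts); // {null=2, test=2, test2=1, test3=1}

        // Natural ordering dereferences its arguments, so the null key fails.
        try {
            countByKey(values, Comparator.naturalOrder());
        } catch (NullPointerException e) {
            System.out.println("natural ordering cannot compare null keys");
        }
    }
}
```

Whether a given Flink key type behaves like the first or the second case depends on its own comparator and serializer, so this should be read as an illustration of the principle rather than a statement about any specific Flink type.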

Thanks for the reply,
Flavio

On Tue, Nov 20, 2018 at 11:33 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Flavio,
>
> Whether groupBy with null values works or not depends on the type of the
> key, or more specifically on the TypeComparator and TypeSerializer that are
> used to serialize, compare, and hash the key type.
> The processing engine supports null values if the comparator and
> serializer can handle null input values.
>
> Flink SQL wraps keys in the Row type and the corresponding serializer /
> comparator can handle null fields.
> If you use Row in DataSet / DataStream programs, null values are supported
> as well.
>
> I think it would be good to discuss the handling of null keys in the
> documentation about data types [1] and link to that from operators that
> require keys.
> Would you mind creating a Jira issue for that?
>
> Thank you,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html
>
> On Mon, Nov 19, 2018 at 12:31 PM Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> Hi to all,
>> we wanted to do a group by on elements that can contain null values and
>> we discovered that the Table API supports this while the DataSet API does not.
>> Is this documented somewhere on the Flink site?
>>
>> Best,
>> Flavio
>>
>> -------------------------------------------------------
>>
>> PS: you can test this with the following main:
>>
>> public static void main(String[] args) throws Exception {
>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>     final BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
>>     // Turn the literal "null" strings into real null elements.
>>     final DataSet<String> testDs = env
>>         .fromElements("test", "test", "test2", "null", "null", "test3")
>>         .map(x -> "null".equals(x) ? null : x);
>>
>>     // DataSet API: group by the (possibly null) element and count each group.
>>     boolean testDatasetApi = true;
>>     if (testDatasetApi) {
>>         testDs.groupBy(x -> x).reduceGroup(new GroupReduceFunction<String, Integer>() {
>>
>>             @Override
>>             public void reduce(Iterable<String> values, Collector<Integer> out) throws Exception {
>>                 int cnt = 0;
>>                 for (String value : values) {
>>                     cnt++;
>>                 }
>>                 out.collect(cnt);
>>             }
>>         }).print();
>>     }
>>
>>     // Table API / SQL: the same grouping expressed as a GROUP BY query.
>>     btEnv.registerDataSet("TEST", testDs, "field1");
>>     Table res = btEnv.sqlQuery("SELECT field1, count(*) as cnt FROM TEST GROUP BY field1");
>>     DataSet<Row> result = btEnv.toDataSet(res,
>>         new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));
>>     result.print();
>> }
>>
>>
>
