Sure! The problem is that the DataSet API does an implicit conversion to Tuples during chaining, and I couldn't find any documentation about this (actually, I was pleasantly surprised by the fact that the Table API supports aggregates on null values..).
Here it is: https://issues.apache.org/jira/browse/FLINK-10947

Thanks for the reply,
Flavio

On Tue, Nov 20, 2018 at 11:33 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Flavio,
>
> Whether groupBy with null values works or not depends on the type of the
> key, or more specifically on the TypeComparator and TypeSerializer that
> are used to serialize, compare, and hash the key type.
> The processing engine supports null values if the comparator and
> serializer can handle null input values.
>
> Flink SQL wraps keys in the Row type, and the corresponding serializer /
> comparator can handle null fields.
> If you use Row in DataSet / DataStream programs, null values are
> supported as well.
>
> I think it would be good to discuss the handling of null keys in the
> documentation about data types [1] and link to that from operators that
> require keys.
> Would you mind creating a Jira issue for that?
>
> Thank you,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html
>
> Am Mo., 19. Nov. 2018 um 12:31 Uhr schrieb Flavio Pompermaier <pomperma...@okkam.it>:
>
>> Hi to all,
>> we wanted to do a group by on elements that can contain null values, and
>> we discovered that the Table API supports this while the DataSet API
>> does not.
>> Is this documented somewhere on the Flink site?
>>
>> Best,
>> Flavio
>>
>> -------------------------------------------------------
>>
>> PS: you can test this with the following main:
>>
>> public static void main(String[] args) throws Exception {
>>   final ExecutionEnvironment env =
>>       ExecutionEnvironment.getExecutionEnvironment();
>>   final BatchTableEnvironment btEnv =
>>       TableEnvironment.getTableEnvironment(env);
>>   // "null" strings are mapped to actual nulls to exercise null keys
>>   final DataSet<String> testDs = env
>>       .fromElements("test", "test", "test2", "null", "null", "test3")
>>       .map(x -> "null".equals(x) ? null : x);
>>
>>   boolean testDatasetApi = true;
>>   if (testDatasetApi) {
>>     testDs.groupBy(x -> x).reduceGroup(
>>         new GroupReduceFunction<String, Integer>() {
>>
>>       @Override
>>       public void reduce(Iterable<String> values, Collector<Integer> out)
>>           throws Exception {
>>         int cnt = 0;
>>         for (String value : values) {
>>           cnt++;
>>         }
>>         out.collect(cnt);
>>       }
>>     }).print();
>>   }
>>
>>   btEnv.registerDataSet("TEST", testDs, "field1");
>>   Table res = btEnv.sqlQuery(
>>       "SELECT field1, count(*) as cnt FROM TEST GROUP BY field1");
>>   DataSet<Row> result = btEnv.toDataSet(res,
>>       new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>>           BasicTypeInfo.LONG_TYPE_INFO));
>>   result.print();
>> }
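As an aside, the distinction Fabian describes (a key type whose comparator dereferences the key vs. one that treats null as an ordinary value, like Row does) can be sketched in plain Java without any Flink dependencies. The class and method names below are made up for illustration; this is only an analogy for the two behaviors, not Flink's actual comparator code:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NullKeyDemo {

    // Null-tolerant grouping: null is treated as a regular key value,
    // analogous to how Row's serializer/comparator handles null fields.
    static Map<String, Integer> groupCounts(List<String> data) {
        Map<String, Integer> counts = new HashMap<>(); // HashMap permits a null key
        for (String s : data) {
            counts.merge(s, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same data as in the original example, after mapping "null" to null
        List<String> data = Arrays.asList("test", "test", "test2", null, null, "test3");

        // Null-intolerant keying: hashing the key directly dereferences it,
        // analogous to a TypeComparator that cannot handle null input values.
        try {
            for (String s : data) {
                s.hashCode(); // throws NullPointerException for null elements
            }
        } catch (NullPointerException e) {
            System.out.println("null-intolerant keying failed: "
                + e.getClass().getSimpleName());
        }

        // Null-tolerant keying counts the two nulls like any other group.
        System.out.println(groupCounts(data));
    }
}
```

The same input that breaks the first loop is grouped without trouble by the second, which is essentially the difference between keying on a raw value and keying on a null-aware wrapper such as Row.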