Hi André, hi Martin, This looks very much like a bug. Martin, I would be happy if you opened a JIRA issue.
Thanks, Max On Sun, Nov 22, 2015 at 12:27 PM, Martin Junghanns <m.jungha...@mailbox.org> wrote: > Hi, > > What he meant was MultipleProgramsTestBase, not FlinkTestBase. > > I debugged this a bit. > > The NPE is thrown in > > https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java#L296 > > since current can be null if the input iterator is empty. > > In Cluster Execution, it is checked that the output of the previous function > (e.g. Filter) is not empty in: > > https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java#L144 > > which avoids going into AggregateOperator and getting a NPE. > > However, in Collection Mode, the execution is not grouped (don't know why, > yet). In > > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java#L207 > > the copied input data is handed over to the aggregate function which leads > to the NPE. > > Checking inputDataCopy.size() > 0 before calling the aggregate solves the > problem. > > If someone can confirm that this is not a more generic problem, I would open > an issue and a PR. > > Best, > Martin > > > On 20.11.2015 18:41, André Petermann wrote: >> >> Hi all, >> >> during a workflow, a data set may run empty, e.g., because of a join >> without matches. >> >> We're using FlinkTestBase and found out, that aggregate functions on >> empty data sets work fine in CLUSTER execution mode but cause a Null >> Pointer Exception at AggregateOperator$AggregatingUdf in COLLECTION mode. >> >> Here is the minimal example on 1.0-SNAPSHOT: >> https://gist.github.com/p3et/59a65bab11098dd11054 >> >> Are we doing something wrong, or is this a bug? >> >> Cheers, >> Andre >> >