
What he meant was MultipleProgramsTestBase, not FlinkTestBase.

I debugged this a bit.

The NPE is thrown in


since current can be null if the input iterator is empty.

In Cluster Execution, it is checked that the output of the previous function (e.g. Filter) is not empty in:


which avoids going into AggregateOperator and getting a NPE.

However, in Collection Mode, the execution is not grouped (don't know why, yet). In


the copied input data is handed over to the aggregate function which leads to the NPE.

Checking inputDataCopy.size() > 0 before calling the aggregate solves the problem.

If someone can confirm that this is not a more generic problem, I would open an issue and a PR.


On 20.11.2015 18:41, André Petermann wrote:
Hi all,

during a workflow, a data set may run empty, e.g., because of a join
without matches.

We're using FlinkTestBase and found out, that aggregate functions on
empty data sets work fine in CLUSTER execution mode but cause a Null
Pointer Exception at AggregateOperator$AggregatingUdf in COLLECTION mode.

Here is the minimal example on 1.0-SNAPSHOT:

Are we doing something wrong, or is this a bug?


Reply via email to