Hi,

What he meant was MultipleProgramsTestBase, not FlinkTestBase.

I debugged this a bit.

The NPE is thrown in

https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java#L296

since current can be null if the input iterator is empty.
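
To illustrate the pattern, here is a minimal sketch. It is a simplified stand-in, not the actual Flink source; the class name EmptyInputAggregateSketch and the method sumIntoLast are made up for this mail. The function keeps a reference to the last record it has seen and writes the aggregate into that record after the loop, so an empty input leaves the reference at null:

```java
import java.util.ArrayList;
import java.util.List;

class EmptyInputAggregateSketch {

    // Hypothetical stand-in for the aggregating function: sums field 0 of all
    // records and writes the result into the last record seen.
    static int[] sumIntoLast(Iterable<int[]> records) {
        int[] current = null;
        int sum = 0;
        for (int[] record : records) {
            current = record;
            sum += record[0];
        }
        // For an empty input the loop body never ran, so current is still null
        // and the next line throws a NullPointerException.
        current[0] = sum;
        return current;
    }

    public static void main(String[] args) {
        List<int[]> empty = new ArrayList<>();
        try {
            sumIntoLast(empty);
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NPE on empty input");
        }
    }
}
```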

In cluster execution, the driver checks that the output of the previous function (e.g. a filter) is not empty in:

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java#L144

which avoids going into AggregateOperator and getting an NPE.

However, in collection mode the execution is not grouped (I don't know why yet). In

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java#L207

the copied input data is handed over to the aggregate function, which leads to the NPE.

Checking inputDataCopy.size() > 0 before calling the aggregate solves the problem.
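
A rough sketch of the guard I have in mind, under the same caveat: this is not the code in GroupReduceOperatorBase, and the names EmptyInputGuardSketch, GroupFunction, sumIntoLast and executeUngrouped are hypothetical. Only the inputDataCopy.size() > 0 check is the actual proposal:

```java
import java.util.ArrayList;
import java.util.List;

class EmptyInputGuardSketch {

    // Hypothetical stand-in for a group-reduce style function.
    interface GroupFunction<T> {
        void reduce(Iterable<T> records, List<T> out);
    }

    // Stand-in for the aggregate: writes the sum into the last record seen,
    // so it throws an NPE when it is called with no records at all.
    static void sumIntoLast(Iterable<int[]> records, List<int[]> out) {
        int[] current = null;
        int sum = 0;
        for (int[] record : records) {
            current = record;
            sum += record[0];
        }
        current[0] = sum;
        out.add(current);
    }

    // Ungrouped execution with the proposed guard: the function is only
    // called when there is at least one input record.
    static <T> List<T> executeUngrouped(List<T> inputDataCopy, GroupFunction<T> function) {
        List<T> result = new ArrayList<>();
        if (inputDataCopy.size() > 0) {
            function.reduce(inputDataCopy, result);
        }
        return result;
    }

    public static void main(String[] args) {
        List<int[]> empty = new ArrayList<>();
        // Empty input: the aggregate is skipped and the result stays empty.
        System.out.println(executeUngrouped(empty, EmptyInputGuardSketch::sumIntoLast).size());

        List<int[]> two = new ArrayList<>();
        two.add(new int[] {1});
        two.add(new int[] {2});
        // Non-empty input: the aggregate runs as before.
        System.out.println(executeUngrouped(two, EmptyInputGuardSketch::sumIntoLast).get(0)[0]);
    }
}
```

With this check, an empty input produces an empty result instead of an exception, which matches what the cluster execution path does.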

If someone can confirm that this is not a more generic problem, I would open an issue and a PR.

Best,
Martin

On 20.11.2015 18:41, André Petermann wrote:
Hi all,

during a workflow, a data set may run empty, e.g., because of a join
without matches.

We're using FlinkTestBase and found that aggregate functions on
empty data sets work fine in CLUSTER execution mode but cause a
NullPointerException at AggregateOperator$AggregatingUdf in COLLECTION mode.

Here is the minimal example on 1.0-SNAPSHOT:
https://gist.github.com/p3et/59a65bab11098dd11054

Are we doing something wrong, or is this a bug?

Cheers,
Andre
