Hi Max,

fixed in https://github.com/apache/flink/pull/1396

Best,
Martin

On 24.11.2015 13:46, Maximilian Michels wrote:
Hi André, hi Martin,

This looks very much like a bug. Martin, I would be happy if you
opened a JIRA issue.

Thanks,
Max

On Sun, Nov 22, 2015 at 12:27 PM, Martin Junghanns
<m.jungha...@mailbox.org> wrote:
Hi,

What he meant was MultipleProgramsTestBase, not FlinkTestBase.

I debugged this a bit.

The NPE is thrown in

https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java#L296

since current can be null if the input iterator is empty.

In Cluster Execution, it is checked that the output of the previous function
(e.g. Filter) is not empty in:

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java#L144

which avoids going into AggregateOperator and getting a NPE.

However, in Collection Mode, the execution is not grouped (don't know why,
yet). In

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java#L207

the copied input data is handed over to the aggregate function which leads
to the NPE.

Checking inputDataCopy.size() > 0 before calling the aggregate solves the
problem.

If someone can confirm that this is not a more generic problem, I would open
an issue and a PR.

Best,
Martin


On 20.11.2015 18:41, André Petermann wrote:

Hi all,

during a workflow, a data set may run empty, e.g., because of a join
without matches.

We're using FlinkTestBase and found out, that aggregate functions on
empty data sets work fine in CLUSTER execution mode but cause a Null
Pointer Exception at AggregateOperator$AggregatingUdf in COLLECTION mode.

Here is the minimal example on 1.0-SNAPSHOT:
https://gist.github.com/p3et/59a65bab11098dd11054

Are we doing something wrong, or is this a bug?

Cheers,
Andre


Reply via email to