Gabor Gevay created FLINK-4578:
----------------------------------

Summary: AggregateOperator incorrectly sets ForwardedField with nested composite types
Key: FLINK-4578
URL: https://issues.apache.org/jira/browse/FLINK-4578
Project: Flink
Issue Type: Bug
Components: DataSet API
Reporter: Gabor Gevay

When an aggregation is called on a grouped DataSet, {{AggregateOperator.translateToDataFlow}} tries to determine whether the field being aggregated is the same field that the grouping is based on. If it is not, it adds the ForwardedField property for the key field.

However, the mechanism that makes this decision breaks when nested composite types are involved. It gets the key positions with {{getKeys().computeLogicalKeyPositions()}}, which returns the _flat_ positions (each leaf field of a nested type is counted separately), whereas the position of the field to aggregate is counted on the outer type only. The two sets of positions are therefore in different index spaces, yet they are compared directly.

Example code: https://github.com/ggevay/flink/tree/agg-bad-forwarded-fields

Here, I have changed the WordCount example to have the type {{Tuple3<Tuple2<Byte,Byte>, String, Integer>}} and to do {{.groupBy(1).sum(2)}}, which groups by the String field and sums the Integer field. If you set a breakpoint in {{AggregateOperator.translateToDataFlow}}, you can see that {{logicalKeyPositions}} contains 2 and {{fields}} also contains 2, which causes {{keyFieldUsedInAgg}} to be erroneously set to true.

The problem is caused by the Tuple2 being counted as 2 fields in {{logicalKeyPositions}}, but as only 1 field in {{fields}}.
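
The mismatch can be reproduced without Flink. The following is only a minimal sketch in plain Java, not the code from {{AggregateOperator}}: the class {{FlatVsOuterPositions}}, the helpers {{toFlatPosition}} and {{overlaps}}, and the {{FLAT_WIDTHS}} array are hypothetical names introduced for illustration. It models the example type by the number of flat fields under each top-level field and shows that comparing a flat key position with an outer aggregation position reports an overlap, while comparing both in the flat index space does not. Converting the aggregated field to a flat position is one conceivable way to make the check consistent, assumed here for the sake of the example; it is not necessarily how the issue should be fixed.

```java
public class FlatVsOuterPositions {

    // Number of flat (leaf) fields under each top-level field of
    // Tuple3<Tuple2<Byte,Byte>, String, Integer>: the nested Tuple2 counts as 2.
    static final int[] FLAT_WIDTHS = {2, 1, 1};

    /** Converts an outer (top-level) field index to the flat index of its first leaf. */
    static int toFlatPosition(int outerPos, int[] flatWidths) {
        int flat = 0;
        for (int i = 0; i < outerPos; i++) {
            flat += flatWidths[i];
        }
        return flat;
    }

    /** Returns true if any key position equals any aggregated position. */
    static boolean overlaps(int[] keyPositions, int[] aggPositions) {
        for (int k : keyPositions) {
            for (int a : aggPositions) {
                if (k == a) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // groupBy(1): the String field is outer position 1, flat position 2.
        int[] logicalKeyPositions = {toFlatPosition(1, FLAT_WIDTHS)};

        // sum(2): the Integer field is outer position 2.
        int[] fields = {2};

        // Mixed index spaces, as described in the report: flat 2 vs. outer 2.
        System.out.println("flat vs. outer: " + overlaps(logicalKeyPositions, fields));

        // Same index space: the Integer field is flat position 3.
        int[] flatFields = {toFlatPosition(2, FLAT_WIDTHS)};
        System.out.println("flat vs. flat:  " + overlaps(logicalKeyPositions, flatFields));
    }
}
```

Running {{main}} prints {{true}} for the mixed comparison and {{false}} for the flat-to-flat comparison, matching the values seen at the breakpoint ({{logicalKeyPositions}} = 2, {{fields}} = 2).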