Shaoxuan Wang created FLINK-5433:
------------------------------------

             Summary: initiate function of Aggregate does not take effect for DataStream aggregation
                 Key: FLINK-5433
                 URL: https://issues.apache.org/jira/browse/FLINK-5433
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: Shaoxuan Wang

The initiate function of Aggregate works for DataSet aggregation, but does not take effect for DataStream aggregation. For instance, when CountAggregate is given an initial value of, say, 2, the result of the DataSet aggregation takes this change into account, but the result of the DataStream aggregation does not.

{code}
class CountAggregate extends Aggregate[Long] {
  override def initiate(intermediate: Row): Unit = {
    intermediate.setField(countIndex, 2L)
  }
}
{code}

With this change, the DataSet test (testWorkingAggregationDataTypes) produces:

.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
expected: [1,1,1,1,1.5,1.5,2]
received: [1,1,1,1,1.5,1.5,4]

(The result of the last count aggregate is larger than the expected value by 2, which is the intended effect of the change.)

But the output of the DataStream test (testProcessingTimeSlidingGroupWindowOverCount) remains the same:

.select('string, 'int.count, 'int.avg)
Expected :List(Hello world,1,3, Hello world,2,3, Hello,1,2, Hello,2,2, Hi,1,1)
Actual   :MutableList(Hello world,1,3, Hello world,2,3, Hello,1,2, Hello,2,2, Hi,1,1)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
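
The symptom reported in this issue can be reproduced outside Flink with a minimal sketch. Everything here is hypothetical: SimpleAggregate, SimpleCountAggregate and InitiateSketch are illustrative stand-ins, not Flink's Aggregate trait or its runtime, and a plain array replaces Row. The sketch also assumes, for illustration only, that the affected path builds its own zeroed buffer and never calls initiate. The issue does not state the actual cause.

```scala
// Hypothetical, simplified stand-in for an aggregate contract (not Flink's Aggregate trait).
trait SimpleAggregate[T] {
  def initiate(buffer: Array[Any]): Unit
  def accumulate(buffer: Array[Any], value: Any): Unit
  def evaluate(buffer: Array[Any]): T
}

// Count aggregate whose starting value is configurable, mirroring the
// "initial value of 2" experiment described in the issue.
class SimpleCountAggregate(initial: Long) extends SimpleAggregate[Long] {
  private val countIndex = 0

  override def initiate(buffer: Array[Any]): Unit = {
    buffer(countIndex) = initial
  }

  override def accumulate(buffer: Array[Any], value: Any): Unit = {
    buffer(countIndex) = buffer(countIndex).asInstanceOf[Long] + 1L
  }

  override def evaluate(buffer: Array[Any]): Long =
    buffer(countIndex).asInstanceOf[Long]
}

object InitiateSketch {
  // Path that calls initiate before accumulating, so the initial value is visible.
  def withInitiate(agg: SimpleAggregate[Long], values: Seq[Any]): Long = {
    val buffer = new Array[Any](1)
    agg.initiate(buffer)
    values.foreach(agg.accumulate(buffer, _))
    agg.evaluate(buffer)
  }

  // ASSUMPTION: path that zeroes the buffer itself and skips initiate,
  // so the initial value is silently ignored.
  def withoutInitiate(agg: SimpleAggregate[Long], values: Seq[Any]): Long = {
    val buffer = Array[Any](0L)
    values.foreach(agg.accumulate(buffer, _))
    agg.evaluate(buffer)
  }

  def main(args: Array[String]): Unit = {
    val agg = new SimpleCountAggregate(2L)
    val values = Seq("a", "b")
    println(withInitiate(agg, values))    // prints 4
    println(withoutInitiate(agg, values)) // prints 2
  }
}
```

With two input values, the first path yields 4 and the second yields 2, the same pair of numbers as the received and expected counts in the DataSet test above. A regression test for the DataStream path could assert the same offset.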