Shaoxuan Wang created FLINK-5433:
------------------------------------

             Summary: initiate function of Aggregate does not take effect for 
DataStream aggregation
                 Key: FLINK-5433
                 URL: https://issues.apache.org/jira/browse/FLINK-5433
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: Shaoxuan Wang


The initiate function of Aggregate works for dataset aggregation, but does not 
work for DataStream aggregation.

For instance, when giving an initial value, say 2, for CountAggregate. The 
result of dataset aggregate will take this change into account, but dataStream 
aggregate will not.
{code}
class CountAggregate extends Aggregate[Long] {
  override def initiate(intermediate: Row): Unit = {
    intermediate.setField(countIndex, 2L)
  }
}
{code}

The output for dataset test(testWorkingAggregationDataTypes) will result in
.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
 expected: [1,1,1,1,1.5,1.5,2]
 received: [1,1,1,1,1.5,1.5,4] (the result of last count aggregate is bigger 
than expect value by 2, as expected)

But the output for datastream 
test(testProcessingTimeSlidingGroupWindowOverCount) will remain the same:
.select('string, 'int.count, 'int.avg)
Expected :List(Hello world,1,3, Hello world,2,3, Hello,1,2, Hello,2,2, Hi,1,1)
Actual   :MutableList(Hello world,1,3, Hello world,2,3, Hello,1,2, Hello,2,2, 
Hi,1,1)




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to