[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15661165#comment-15661165 ]

ASF GitHub Bot commented on FLINK-4937:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r87707098
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala ---
    @@ -135,50 +130,124 @@ class DataStreamAggregate(
           namedProperties)
     
         val prepareOpName = s"prepare select: ($aggString)"
    -    val mappedInput = inputDS
    -      .map(aggregateResult._1)
    -      .name(prepareOpName)
    -
    -    val groupReduceFunction = aggregateResult._2
    -    val rowTypeInfo = new RowTypeInfo(fieldTypes)
     
    -    val result = {
    -      // grouped / keyed aggregation
    -      if (groupingKeys.length > 0) {
    -        val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
    -          s"window: ($window), " +
    -          s"select: ($aggString)"
    -        val aggregateFunction =
    -          createWindowAggregationFunction(window, namedProperties, groupReduceFunction)
    -
    -        val keyedStream = mappedInput.keyBy(groupingKeys: _*)
    -
    -        val windowedStream = createKeyedWindowedStream(window, keyedStream)
    -          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
    -
    -        windowedStream
    -          .apply(aggregateFunction)
    -          .returns(rowTypeInfo)
    -          .name(aggOpName)
    -          .asInstanceOf[DataStream[Any]]
    +    val (aggFieldIndexes, aggregates) =
    +      AggregateUtil.transformToAggregateFunctions(
    +        namedAggregates.map(_.getKey), inputType, grouping.length)
    +
    +    val result: DataStream[Any] = {
    +
    +      // check all aggregates are support Partial aggregate
    +      if (aggregates.map(_.supportPartial).forall(x => x)) {
    --- End diff --
    
    The `translateToPlan` method contains too much code. I suggest splitting
    the incremental and non-incremental aggregation paths into separate
    private methods, which would make it more readable.
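
A rough sketch of the suggested split, in plain Scala with no Flink dependency. The names `Agg`, `TranslateSketch`, `translateIncremental`, and `translateNonIncremental` are hypothetical and do not come from the pull request; the string results only stand in for the real `DataStream` plans.

```scala
// Hypothetical stand-in for an aggregate; not a class from the pull request.
final case class Agg(name: String, supportPartial: Boolean)

object TranslateSketch {
  // The entry point only decides which path applies and delegates.
  def translateToPlan(aggregates: Seq[Agg]): String =
    if (aggregates.forall(_.supportPartial)) translateIncremental(aggregates)
    else translateNonIncremental(aggregates)

  // Path taken when every aggregate supports partial aggregation.
  private def translateIncremental(aggregates: Seq[Agg]): String =
    s"incremental(${aggregates.map(_.name).mkString(", ")})"

  // Fallback path: at least one aggregate needs all records of the window.
  private def translateNonIncremental(aggregates: Seq[Agg]): String =
    s"buffered(${aggregates.map(_.name).mkString(", ")})"
}
```

With this shape, `translateToPlan` stays a short dispatcher, and each branch can be read and tested on its own.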


> Add incremental group window aggregation for streaming Table API
> ----------------------------------------------------------------
>
>                 Key: FLINK-4937
>                 URL: https://issues.apache.org/jira/browse/FLINK-4937
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API, which is used under the hood of the streaming Table API,
> features [incremental aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
> using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
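
The storage difference described in the issue can be illustrated with a minimal sketch in plain Scala. It does not use the Flink API; `WindowAggSketch`, `BufferingSum`, and `IncrementalSum` are hypothetical names, and a sum over `Long` values is assumed only as an example aggregate.

```scala
import scala.collection.mutable.ArrayBuffer

object WindowAggSketch {
  // Non-incremental: keep every record and aggregate when the window closes.
  final class BufferingSum {
    private val buffer = ArrayBuffer.empty[Long]
    def add(value: Long): Unit = buffer += value
    def stateSize: Int = buffer.size      // grows with the number of records
    def close(): Long = buffer.sum
  }

  // Incremental: fold each record into a single partial aggregate on arrival,
  // in the spirit of a reduce function (here an ordinary Scala function).
  final class IncrementalSum(reduce: (Long, Long) => Long) {
    private var partial: Option[Long] = None
    def add(value: Long): Unit =
      partial = Some(partial.fold(value)(reduce(_, value)))
    def stateSize: Int = if (partial.isDefined) 1 else 0  // at most one value
    def close(): Long = partial.getOrElse(0L)
  }
}
```

Both variants produce the same result when the window closes, but the buffering variant holds one entry per record while the incremental variant holds a single partial value.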



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
