[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702970#comment-16702970
 ] 

ASF GitHub Bot commented on FLINK-7599:
---------------------------------------

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237417018
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##########
 @@ -1185,331 +1273,281 @@ object AggregateUtil {
     val aggFieldIndexes = new Array[Array[Int]](aggregateCalls.size)
     val aggregates = new Array[TableAggregateFunction[_ <: Any, _ <: 
Any]](aggregateCalls.size)
     val accTypes = new Array[TypeInformation[_]](aggregateCalls.size)
+    val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+    val isDistinctAggs = new Array[Boolean](aggregateCalls.size)
 
-    // create aggregate function instances by function type and aggregate 
field data type.
-    aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
-      val argList: util.List[Integer] = aggregateCall.getArgList
+    aggregateCalls
+      .zipWithIndex.foreach {
+      case (aggregateCall, index) =>
+        val argList: util.List[Integer] = aggregateCall.getArgList
 
-      if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
-        aggregates(index) = new CountAggFunction
         if (argList.isEmpty) {
-          aggFieldIndexes(index) = Array[Int](-1)
+          if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
+            aggFieldIndexes(index) = Array[Int](-1)
+          } else {
+            throw new TableException("Aggregate fields should not be empty.")
+          }
         } else {
           aggFieldIndexes(index) = argList.asScala.map(i => i.intValue).toArray
         }
-      } else {
-        if (argList.isEmpty) {
-          throw new TableException("Aggregate fields should not be empty.")
+
+        val inputTypes = 
argList.map(aggregateInputType.getFieldList.get(_).getType)
+        val result = transformToAggregateFunction(aggregateCall.getAggregation,
+          aggregateCall.isDistinct,
+          inputTypes,
+          needRetraction,
+          tableConfig,
+          isStateBackedDataViews,
+          index)
+
+        aggregates(index) = result._1
+        accTypes(index) = result._2
+        accSpecs(index) = result._3
+        isDistinctAggs(index) = aggregateCall.isDistinct
+    }
+
+    (aggFieldIndexes, aggregates, isDistinctAggs, accTypes, accSpecs)
+  }
+
+  private def createFlinkAggFunction(
 
 Review comment:
   Can you add a comment to every method in this class? This class is quite big, 
and every comment helps in understanding what is going on. For example, `Converts 
Calcite's [[SqlAggFunction]] to a Flink UDF [[TableAggregateFunction]]`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-7599
>                 URL: https://issues.apache.org/jira/browse/FLINK-7599
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, Table API & SQL
>            Reporter: Dian Fu
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to