[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702968#comment-16702968
 ] 

ASF GitHub Bot commented on FLINK-7599:
---------------------------------------

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237419538
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##########
 @@ -1169,6 +1169,94 @@ object AggregateUtil {
     (propPos._1, propPos._2, propPos._3)
   }
 
+  private[flink] def transformToAggregateFunction(
+      aggregateCall: SqlAggFunction,
+      isDistinct: Boolean,
+      aggregateInputType: Seq[RelDataType],
+      needRetraction: Boolean,
+      tableConfig: TableConfig,
+      isStateBackedDataViews: Boolean = false,
+      index: Int = 0)
+    : (TableAggregateFunction[_, _],
+      TypeInformation[_],
+      Seq[DataViewSpec[_]]) = {
+    // store the aggregate fields of each aggregate function, by the same 
order of aggregates.
+    // create aggregate function instances by function type and aggregate 
field data type.
+
+    var accumulatorType: TypeInformation[_] = null
+    val inputType = () => if (aggregateInputType.isEmpty) {
+      throw new TableException("Aggregate fields should not be empty.")
+    } else {
+      aggregateInputType.get(0)
+    }
+
+    val aggregate: TableAggregateFunction[_, _] = aggregateCall match {
+
+      case _: SqlCountAggFunction =>
+        new CountAggFunction
+
+      case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
+        val agg = new 
CollectAggFunction(FlinkTypeFactory.toTypeInfo(inputType()))
+        accumulatorType = agg.getAccumulatorType
+        agg
+
+      case udagg: AggSqlFunction =>
+        accumulatorType = udagg.accType
+        udagg.getFunction
+
+      case aggFunction: SqlAggFunction =>
+        createFlinkAggFunction(aggFunction,
+          needRetraction,
+          inputType(),
+          tableConfig)
+    }
+
+    val accSpecs =
+      if (accumulatorType != null) {
+        val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index,
+          aggregate,
+          accumulatorType,
+          isStateBackedDataViews)
+        if (specs.isDefined) {
+          accumulatorType = accType
+          specs.get
+        } else {
+          Seq()
+        }
+      } else {
+        accumulatorType = getAccumulatorTypeOfAggregateFunction(aggregate)
+        Seq()
+      }
+
+    // create distinct accumulator filter argument
+    if (isDistinct) {
+      // Using Pojo fields for the real underlying accumulator
+      val pojoFields = new util.ArrayList[PojoField]()
+      pojoFields.add(new PojoField(
+        classOf[DistinctAccumulator[_]].getDeclaredField("realAcc"),
+        accumulatorType)
+      )
+      // If StateBackend is not enabled, the distinct mapping also needs
+      // to be added to the Pojo fields.
+      if (!isStateBackedDataViews) {
+
+        val argTypes: Array[TypeInformation[_]] = aggregateInputType
+          .map(FlinkTypeFactory.toTypeInfo).toArray
+
+        val mapViewTypeInfo = new MapViewTypeInfo(
+          new RowTypeInfo(argTypes: _*),
+          BasicTypeInfo.LONG_TYPE_INFO)
+        pojoFields.add(new PojoField(
+          classOf[DistinctAccumulator[_]].getDeclaredField("distinctValueMap"),
+          mapViewTypeInfo)
+        )
+      }
+      accumulatorType = new PojoTypeInfo(classOf[DistinctAccumulator[_]], 
pojoFields)
+    }
+
+    (aggregate, accumulatorType, accSpecs)
+  }
+
   private def transformToAggregateFunctions(
 
 Review comment:
   We definitely need more documentation for this function. Can you create case 
classes for the return type of this function for proper naming because 
`Array[Int]` or `Array[Boolean]` is not very helpful.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-7599
>                 URL: https://issues.apache.org/jira/browse/FLINK-7599
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, Table API & SQL
>            Reporter: Dian Fu
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to