[ https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702970#comment-16702970 ]
ASF GitHub Bot commented on FLINK-7599: --------------------------------------- twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r237417018 ########## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ########## @@ -1185,331 +1273,281 @@ object AggregateUtil { val aggFieldIndexes = new Array[Array[Int]](aggregateCalls.size) val aggregates = new Array[TableAggregateFunction[_ <: Any, _ <: Any]](aggregateCalls.size) val accTypes = new Array[TypeInformation[_]](aggregateCalls.size) + val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + val isDistinctAggs = new Array[Boolean](aggregateCalls.size) - // create aggregate function instances by function type and aggregate field data type. - aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) => - val argList: util.List[Integer] = aggregateCall.getArgList + aggregateCalls + .zipWithIndex.foreach { + case (aggregateCall, index) => + val argList: util.List[Integer] = aggregateCall.getArgList - if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) { - aggregates(index) = new CountAggFunction if (argList.isEmpty) { - aggFieldIndexes(index) = Array[Int](-1) + if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) { + aggFieldIndexes(index) = Array[Int](-1) + } else { + throw new TableException("Aggregate fields should not be empty.") + } } else { aggFieldIndexes(index) = argList.asScala.map(i => i.intValue).toArray } - } else { - if (argList.isEmpty) { - throw new TableException("Aggregate fields should not be empty.") + + val inputTypes = argList.map(aggregateInputType.getFieldList.get(_).getType) + val result = transformToAggregateFunction(aggregateCall.getAggregation, + aggregateCall.isDistinct, + inputTypes, + needRetraction, + tableConfig, + isStateBackedDataViews, + index) + + 
aggregates(index) = result._1 + accTypes(index) = result._2 + accSpecs(index) = result._3 + isDistinctAggs(index) = aggregateCall.isDistinct + } + + (aggFieldIndexes, aggregates, isDistinctAggs, accTypes, accSpecs) + } + + private def createFlinkAggFunction( Review comment: Can you add a comment to every method in this class? This class is quite big, so every comment helps in understanding what is going on. For example, `Converts Calcite's [[SqlAggFunction]] to a Flink UDF [[TableAggregateFunction]]`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support aggregation functions in the define and measures clause of > MatchRecognize > --------------------------------------------------------------------------------- > > Key: FLINK-7599 > URL: https://issues.apache.org/jira/browse/FLINK-7599 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API & SQL > Reporter: Dian Fu > Assignee: Dawid Wysakowicz > Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)