[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708923#comment-16708923
 ] 

ASF GitHub Bot commented on FLINK-7599:
---------------------------------------

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r238709945
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##########
 @@ -49,6 +50,90 @@ import scala.collection.mutable
 /**
   * A code generator for generating CEP related functions.
   *
+  * Aggregates are generated as follows:
+  * 1. all aggregate [[RexCall]]s are grouped by corresponding pattern variable
+  * 2. even if the same aggregation is used multiple times in an expression
+  *    (e.g. SUM(A.price) > SUM(A.price) + 1) it will be calculated once. To 
do so [[AggBuilder]]
+  *    keeps set of already seen different aggregation calls, and reuses the 
code to access
+  *    appropriate field of aggregation result
+  * 3. after translating every expression (either in [[generateCondition]] or 
in
+  *    [[generateOneRowPerMatchExpression]]) there will be generated code for
+  *       * [[GeneratedFunction]], which will be an inner class
+  *       * said [[GeneratedFunction]] will be instantiated in the ctor and 
opened/closed
+  *         in corresponding methods of top level generated classes
+  *       * function that transforms input rows (row by row) into aggregate 
input rows
+  *       * function that calculates aggregates for variable, that uses the 
previous method
+  *    The generated code will look similar to this:
+  *
+  *
+  * {{{
+  *
+  * public class MatchRecognizePatternSelectFunction$175 extends 
RichPatternSelectFunction {
+  *
+  *     // Class used to calculate aggregates for a single pattern variable
+  *     public final class AggFunction_variable$115$151 extends 
GeneratedAggregations {
+  *       ...
+  *     }
+  *
+  *     private final AggFunction_variable$115$151 aggregator_variable$115;
+  *
+  *     public MatchRecognizePatternSelectFunction$175() {
+  *       aggregator_variable$115 = new AggFunction_variable$115$151();
+  *     }
+  *
+  *     public void open() {
+  *       aggregator_variable$115.open();
+  *       ...
+  *     }
+  *
+  *     // Function to transform incoming row into aggregate specific row. It 
can e.g calculate
+  *     // inner expression of said aggregate
+  *     private Row transformRowForAgg_variable$115(Row inAgg) {
+  *         ...
+  *     }
+  *
+  *     // Function to calculate all aggregates for a single pattern variable
+  *     private Row calculateAgg_variable$115(List<Row> input) {
+  *       Acc accumulator = aggregator_variable$115.createAccumulator();
+  *       for (Row row : input) {
+  *         aggregator_variable$115.accumulate(accumulator, 
transformRowForAgg_variable$115(row));
+  *       }
+  *
+  *       return aggregator_variable$115.getResult(accumulator);
+  *     }
+  *
+  *     @Override
+  *     public Object select(Map<String, List<Row>> in1) throws Exception {
+  *
+  *       // Extract list of rows assigned to a single pattern variable
+  *       java.util.List patternEvents$130 = (java.util.List) in1.get("A");
+  *       ...
+  *
+  *       // Calculate aggregates
+  *       Row aggRow_variable$110$111 = 
calculateAgg_variable$110(patternEvents$114);
+  *
+  *       // Every aggregation (e.g SUM(A.price) and AVG(A.price)) will be 
extracted to a variable
+  *       double result$135 = aggRow_variable$126$127.getField(0);
+  *       long result$137 = aggRow_variable$126$127.getField(1);
+  *
+  *       // Result of aggregation will be used in expression evaluation
+  *       out.setField(0, result$135)
+  *
+  *       long result$140 = result$137 * 2;
+  *       out.setField(1, result$140);
+  *
+  *       double result$144 = $result135 + result$137;
+  *       out.setField(2, result$144);
+  *     }
+  *
+  *     public void close() {
+  *       aggregator_variable$115.close();
+  *       ...
+  *     }
+  *
+  * }
+  * }}}
+  *
 
 Review comment:
   Awesome documentation. Very helpful.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-7599
>                 URL: https://issues.apache.org/jira/browse/FLINK-7599
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, Table API &amp; SQL
>            Reporter: Dian Fu
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to