grzegorz8 commented on code in PR #24699:
URL: https://github.com/apache/flink/pull/24699#discussion_r1819361358


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java:
##########
@@ -51,7 +71,95 @@ public BatchExecMatch(
     }
 
     @Override
-    public boolean isProcTime(RowType inputRowType) {
-        return true;
+    public void checkOrderKeys(RowType inputRowType) {
+        SortSpec orderKeys = matchSpec.getOrderKeys();
+        if (orderKeys.getFieldSize() == 0) {
+            throw new TableException("You must specify non-empty order by.");
+        }
+
+        SortSpec.SortFieldSpec timeOrderField = orderKeys.getFieldSpec(0);
+        int timeOrderFieldIdx = timeOrderField.getFieldIndex();
+        LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
+
+        if (!TypeCheckUtils.isTimePoint(timeOrderFieldType)) {
+            throw new TableException("You must specify time point for order by 
as the first one.");
+        }
+
+        // time ordering needs to be ascending
+        if (!orderKeys.getAscendingOrders()[0]) {
+            throw new TableException("Primary sort order of a table must be 
ascending on time.");
+        }
+    }
+
+    @Override
+    protected Transformation<RowData> translateOrder(
+            PlannerBase planner,
+            Transformation<RowData> inputTransform,
+            RowType inputRowType,
+            ExecEdge inputEdge,
+            ExecNodeConfig config) {
+        if (isProcTime(inputRowType)) {
+            // In proctime process records in the order they come.
+            return inputTransform;
+        }
+
+        SortSpec sortSpec = matchSpec.getOrderKeys();
+        RowType inputType = (RowType) inputEdge.getOutputType();
+        SortCodeGenerator codeGen =
+                new SortCodeGenerator(
+                        config, planner.getFlinkContext().getClassLoader(), inputType, sortSpec);
+        SortOperator operator =

Review Comment:
   > Do we need the sorting before the MATCH operator? Won't the sorting of CepOperator be enough?
   
   Yes, you are right, sorting is not needed.
   
   Correct me if I'm wrong: in batch mode, events for a given key are buffered in 
   `CepOperator` state and are all processed when `onEventTime(MAX_WATERMARK)` is 
   called; `onEventTime()` is called exactly once per key. In `onEventTime()`, 
   `CepOperator` iterates over the `MapState<Long, List<IN>>` in ascending key 
   (timestamp) order, with ties resolved by the `EventComparator<IN> comparator`.
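   
   For illustration, here is a minimal sketch (not the actual `CepOperator` 
   implementation) of the buffer-and-replay ordering described above. The class and 
   method names are hypothetical, and plain Java collections stand in for Flink's 
   `MapState<Long, List<IN>>` and keyed-state access:
   
   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;
   
   // Simplified, single-key sketch of the replay order: events are buffered by
   // timestamp and emitted in ascending timestamp order, with same-timestamp
   // ties broken by an optional comparator (analogous to EventComparator<IN>).
   final class BufferedReplaySketch<IN> {
   
       // Stand-in for CepOperator's MapState<Long, List<IN>>: timestamp -> events.
       private final TreeMap<Long, List<IN>> buffer = new TreeMap<>();
   
       // Optional tie-breaker for events sharing the same timestamp.
       private final Comparator<IN> comparator;
   
       BufferedReplaySketch(Comparator<IN> comparator) {
           this.comparator = comparator;
       }
   
       // Every incoming record of the current key is buffered (batch mode keeps
       // all of them until the final watermark).
       void bufferEvent(long timestamp, IN event) {
           buffer.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(event);
       }
   
       // Invoked once per key (think onEventTime(MAX_WATERMARK)): iterate the
       // timestamps in ascending order and sort ties with the comparator.
       List<IN> replayInOrder() {
           List<IN> ordered = new ArrayList<>();
           for (Map.Entry<Long, List<IN>> entry : buffer.entrySet()) {
               List<IN> sameTimestamp = new ArrayList<>(entry.getValue());
               if (comparator != null) {
                   sameTimestamp.sort(comparator);
               }
               ordered.addAll(sameTimestamp);
           }
           return ordered;
       }
   }
   ```
   
   If that mirrors what `CepOperator` does, the operator itself already establishes 
   the required event-time order, so an explicit sort before the MATCH operator 
   would indeed be redundant.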



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java:
##########
@@ -567,7 +912,7 @@ void testUserDefinedFunctions() {
                                 "SELECT *\n"
                                         + "FROM MyTable\n"
                                         + "MATCH_RECOGNIZE (\n"
-                                        + "  ORDER BY proctime\n"
+                                        + "  ORDER BY ts\n"

Review Comment:
   > As a question, do we need to change all of the tests from using PROCTIME() to ts?
   > 
   > E.g., can the existing behavior remain?
   
   If you ask me, MATCH_RECOGNIZE using proctime() in batch mode does not make any 
   sense, so I don't think we need that many tests for it. But maybe I'm missing 
   the big picture.
   
   If you wish, I can revert my changes and keep similar tests for both proctime 
   and event time.


