grzegorz8 commented on code in PR #24699: URL: https://github.com/apache/flink/pull/24699#discussion_r1819361358
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java:
##########
@@ -51,7 +71,95 @@ public BatchExecMatch(
     }
 
     @Override
-    public boolean isProcTime(RowType inputRowType) {
-        return true;
+    public void checkOrderKeys(RowType inputRowType) {
+        SortSpec orderKeys = matchSpec.getOrderKeys();
+        if (orderKeys.getFieldSize() == 0) {
+            throw new TableException("You must specify non-empty order by.");
+        }
+
+        SortSpec.SortFieldSpec timeOrderField = orderKeys.getFieldSpec(0);
+        int timeOrderFieldIdx = timeOrderField.getFieldIndex();
+        LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
+
+        if (!TypeCheckUtils.isTimePoint(timeOrderFieldType)) {
+            throw new TableException("You must specify time point for order by as the first one.");
+        }
+
+        // time ordering needs to be ascending
+        if (!orderKeys.getAscendingOrders()[0]) {
+            throw new TableException("Primary sort order of a table must be ascending on time.");
+        }
+    }
+
+    @Override
+    protected Transformation<RowData> translateOrder(
+            PlannerBase planner,
+            Transformation<RowData> inputTransform,
+            RowType inputRowType,
+            ExecEdge inputEdge,
+            ExecNodeConfig config) {
+        if (isProcTime(inputRowType)) {
+            // In proctime process records in the order they come.
+            return inputTransform;
+        }
+
+        SortSpec sortSpec = matchSpec.getOrderKeys();
+        RowType inputType = (RowType) inputEdge.getOutputType();
+        SortCodeGenerator codeGen =
+                new SortCodeGenerator(
+                        config, planner.getFlinkContext().getClassLoader(), inputType, sortSpec);
+        SortOperator operator =

Review Comment:
   > Do we need the sorting before the MATCH operator? Won't the sorting of CepOperator be enough?

   Yes, you are right, the sorting is not needed.

   Correct me if I'm wrong: in batch mode, the events for a given key are buffered in `CepOperator` state, and they are all processed when `onEventTime(MAX_WATERMARK)` is called. `onEventTime()` is called exactly once for each key.
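
To illustrate the buffering and ordered flush discussed in this comment, here is a minimal, self-contained sketch. It is not Flink's `CepOperator` code: the class `BufferedEventSorter` and its methods `add` and `flush` are hypothetical names made up for this illustration, and a plain `TreeMap` stands in for the keyed `MapState<Long, List<IN>>`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

/**
 * Hypothetical stand-in for the per-key buffer; NOT the real CepOperator.
 * Events are grouped by timestamp and only emitted when flush() is called,
 * which plays the role of the single timer firing at MAX_WATERMARK.
 */
final class BufferedEventSorter<E> {

    // Timestamp -> events carrying that timestamp. A TreeMap keeps the
    // timestamps sorted; this is an illustration choice, not a claim about
    // how Flink's MapState is iterated internally.
    private final TreeMap<Long, List<E>> buffer = new TreeMap<>();

    // Resolves ties between events that share a timestamp
    // (the role the comment attributes to EventComparator).
    private final Comparator<E> tieBreaker;

    BufferedEventSorter(Comparator<E> tieBreaker) {
        this.tieBreaker = tieBreaker;
    }

    /** Buffers an event; arrival order does not matter. */
    void add(long timestamp, E event) {
        buffer.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(event);
    }

    /** Drains the buffer in ascending timestamp order, ties by tieBreaker. */
    List<E> flush() {
        List<E> ordered = new ArrayList<>();
        for (List<E> sameTimestamp : buffer.values()) {
            sameTimestamp.sort(tieBreaker);
            ordered.addAll(sameTimestamp);
        }
        buffer.clear();
        return ordered;
    }
}
```

Under these assumptions, events added out of order still come out of `flush()` sorted by timestamp, which is why an extra sort in front of the operator would be redundant in this model.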
   In `onEventTime()`, `CepOperator` iterates over `MapState<Long, List<IN>>` in ascending order of the map key, i.e. the event timestamp (ties are resolved using `EventComparator<IN> comparator`).

##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java:
##########
@@ -567,7 +912,7 @@ void testUserDefinedFunctions() {
                         "SELECT *\n"
                                 + "FROM MyTable\n"
                                 + "MATCH_RECOGNIZE (\n"
-                                + " ORDER BY proctime\n"
+                                + " ORDER BY ts\n"

Review Comment:
   > As a question, do we need to change all of the tests from using PROCTIME() to ts?
   >
   > E.g., can the existing behavior remain?

   In my opinion, `MATCH_RECOGNIZE` ordered by `PROCTIME()` does not make sense in batch mode, so I see no need to keep so many tests for it. But maybe I am missing the big picture. If you wish, I can revert my changes and keep similar tests for both proctime and event time.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org