dawidwys commented on code in PR #24699:
URL: https://github.com/apache/flink/pull/24699#discussion_r1811139271


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java:
##########
@@ -51,7 +71,95 @@ public BatchExecMatch(
     }
 
     @Override
-    public boolean isProcTime(RowType inputRowType) {
-        return true;
+    public void checkOrderKeys(RowType inputRowType) {
+        SortSpec orderKeys = matchSpec.getOrderKeys();
+        if (orderKeys.getFieldSize() == 0) {
+            throw new TableException("You must specify non-empty order by.");
+        }
+
+        SortSpec.SortFieldSpec timeOrderField = orderKeys.getFieldSpec(0);
+        int timeOrderFieldIdx = timeOrderField.getFieldIndex();
+        LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
+
+        if (!TypeCheckUtils.isTimePoint(timeOrderFieldType)) {
+            throw new TableException("You must specify time point for order by as the first one.");
+        }
+
+        // time ordering needs to be ascending
+        if (!orderKeys.getAscendingOrders()[0]) {
+            throw new TableException("Primary sort order of a table must be ascending on time.");
+        }
+    }
+
+    @Override
+    protected Transformation<RowData> translateOrder(
+            PlannerBase planner,
+            Transformation<RowData> inputTransform,
+            RowType inputRowType,
+            ExecEdge inputEdge,
+            ExecNodeConfig config) {
+        if (isProcTime(inputRowType)) {
+            // In proctime process records in the order they come.
+            return inputTransform;
+        }
+
+        SortSpec sortSpec = matchSpec.getOrderKeys();
+        RowType inputType = (RowType) inputEdge.getOutputType();
+        SortCodeGenerator codeGen =
+                new SortCodeGenerator(
+                        config, planner.getFlinkContext().getClassLoader(), inputType, sortSpec);
+        SortOperator operator =

Review Comment:
   I understand that sorting with a `SortOperator` is more performant in batch.
   In that case, could we disable the sorting inside the `CepOperator`? That is,
   in batch mode, could we always run the `CepOperator` in processing time,
   without a comparator, and rely on this `SortOperator` for the ordering?
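
   To illustrate the idea, here is a minimal, self-contained sketch in plain Java. It does not use any Flink API: `Event`, `sortUpstream`, and `processInArrivalOrder` are hypothetical stand-ins for a row with a time attribute, the upstream `SortOperator`, and a `CepOperator` running in processing time without a comparator. The assumption is that the bounded input is fully sorted once upstream, so the downstream step can consume records in arrival order without buffering or re-sorting them.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class PreSortedMatchSketch {

    /** Hypothetical record: a payload plus the time attribute used for ordering. */
    public static final class Event {
        final long timestamp;
        final String payload;

        public Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    /** Stand-in for the upstream sort: orders the bounded input once, ascending on time. */
    public static List<Event> sortUpstream(List<Event> input) {
        List<Event> sorted = new ArrayList<>(input);
        // List.sort is stable, so events with equal timestamps keep their input order.
        sorted.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        return sorted;
    }

    /** Stand-in for the downstream operator: no comparator, records handled as they arrive. */
    public static List<String> processInArrivalOrder(List<Event> presorted) {
        List<String> seen = new ArrayList<>();
        for (Event e : presorted) {
            seen.add(e.payload);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Event> input = new ArrayList<>();
        input.add(new Event(3L, "c"));
        input.add(new Event(1L, "a"));
        input.add(new Event(2L, "b"));
        // prints [a, b, c]
        System.out.println(processInArrivalOrder(sortUpstream(input)));
    }
}
```

   The point of the sketch is only the division of labour: ordering happens in one place, and the consumer relies on it. Whether the real `CepOperator` can safely skip its own ordering in batch would still need to be checked against its actual implementation.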



