godfreyhe commented on a change in pull request #14502:
URL: https://github.com/apache/flink/pull/14502#discussion_r551158780



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.operators.sort.StreamSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * Stream exec RelNode for Sort.
+ *
+ * <p><b>NOTES:</b> This class is used for testing with bounded source now. If 
a query is converted
+ * to this node in product environment, an exception will be thrown.
+ */
+public class StreamExecSort extends ExecNodeBase<RowData> implements 
StreamExecNode<RowData> {
+
+    @Experimental
+    public static final ConfigOption<Boolean> 
TABLE_EXEC_SORT_NON_TEMPORAL_ENABLED =
+            ConfigOptions.key("table.exec.non-temporal-sort.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Set whether to enable universal sort for stream. 
When it is false, "
+                                    + "universal sort can't use for stream, 
default false. Just for testing.");
+
+    private final int[] keys;
+    private final boolean[] orders;
+    private final boolean[] nullsIsLast;

Review comment:
       add some comments to explain the fields? especial explain the relation 
of each field.   rename `keys ` to `sortFields`?   rename `orders` to 
`ascendingOrders` ? (please also update `StreamExecRank#sortDirections` and add 
some comments to each fields)
   
   or we add a class to represent each sort field, such as
   ```
   class SortFieldSpec {
       int key;
       boolean isAscending;
       boolean nullIsLast;
   }
   ```

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.sort.SortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * Batch {@link ExecNode} for Sort.
+ *
+ * <p> This node will output all data rather than `limit` records. </p>
+ */
+public class BatchExecSort extends ExecNodeBase<RowData> implements 
BatchExecNode<RowData> {
+
+       private final int[] keys;
+       private final boolean[] orders;
+       private final boolean[] nullsIsLast;
+
+       public BatchExecSort(
+                       int[] keys,
+                       boolean[] orders,
+                       boolean[] nullsIsLast,
+                       ExecEdge inputEdge,
+                       LogicalType outputType,

Review comment:
       RowType

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.operators.sort.ProcTimeSortOperator;
+import org.apache.flink.table.runtime.operators.sort.RowTimeSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/** Stream exec RelNode for time-ascending-order Sort without `limit`. */
+public class StreamExecTemporalSort extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final int[] keys;
+    private final boolean[] orders;
+    private final boolean[] nullsIsLast;

Review comment:
       ditto

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.sort.SortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * Batch {@link ExecNode} for Sort.
+ *
+ * <p>This node will output all data rather than `limit` records.
+ */
+public class BatchExecSort extends ExecNodeBase<RowData> implements 
BatchExecNode<RowData> {
+
+    private final int[] keys;
+    private final boolean[] orders;
+    private final boolean[] nullsIsLast;
+
+    public BatchExecSort(
+            int[] keys,
+            boolean[] orders,
+            boolean[] nullsIsLast,
+            ExecEdge inputEdge,
+            LogicalType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        this.keys = keys;
+        this.orders = orders;
+        this.nullsIsLast = nullsIsLast;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+
+        Transformation<RowData> input =
+                (Transformation<RowData>) 
getInputNodes().get(0).translateToPlan(planner);
+
+        TableConfig conf = planner.getTableConfig();
+        RowType inputType = (RowType) getInputNodes().get(0).getOutputType();
+        RowType outputType = (RowType) getOutputType();
+
+        // sort code gen
+        LogicalType[] keyTypes =
+                
IntStream.of(keys).mapToObj(inputType::getTypeAt).toArray(LogicalType[]::new);
+        SortCodeGenerator codeGen =
+                new SortCodeGenerator(conf, keys, keyTypes, orders, 
nullsIsLast);
+
+        SortOperator operator =
+                new SortOperator(
+                        
codeGen.generateNormalizedKeyComputer("BatchExecSortComputer"),
+                        
codeGen.generateRecordComparator("BatchExecSortComparator"));
+
+        long sortMemory =
+                MemorySize.parse(
+                                conf.getConfiguration()
+                                        .getString(
+                                                ExecutionConfigOptions
+                                                        
.TABLE_EXEC_RESOURCE_SORT_MEMORY))
+                        .getBytes();

Review comment:
       just complain: ugly format

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSort.scala
##########
@@ -92,36 +80,17 @@ class BatchExecSort(
 
   //~ ExecNode methods 
-----------------------------------------------------------

Review comment:
       remove it

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSortLimit.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
+import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
+import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** Stream {@link ExecNode} for Sort. */

Review comment:
       Stream {@link ExecNode} for Sort with limit.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.operators.sort.StreamSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * Stream exec RelNode for Sort.
+ *
+ * <p><b>NOTES:</b> This class is used for testing with bounded source now. If 
a query is converted
+ * to this node in product environment, an exception will be thrown.
+ */
+public class StreamExecSort extends ExecNodeBase<RowData> implements 
StreamExecNode<RowData> {
+
+    @Experimental
+    public static final ConfigOption<Boolean> 
TABLE_EXEC_SORT_NON_TEMPORAL_ENABLED =

Review comment:
       rename `TABLE_EXEC_SORT_NON_TEMPORAL_ENABLED` to 
`TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.operators.sort.ProcTimeSortOperator;
+import org.apache.flink.table.runtime.operators.sort.RowTimeSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/** Stream exec RelNode for time-ascending-order Sort without `limit`. */

Review comment:
       correct the comments

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.operators.sort.StreamSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * Stream exec RelNode for Sort.
+ *
+ * <p><b>NOTES:</b> This class is used for testing with bounded source now. If 
a query is converted
+ * to this node in product environment, an exception will be thrown.
+ */
+public class StreamExecSort extends ExecNodeBase<RowData> implements 
StreamExecNode<RowData> {
+
+    @Experimental
+    public static final ConfigOption<Boolean> 
TABLE_EXEC_SORT_NON_TEMPORAL_ENABLED =
+            ConfigOptions.key("table.exec.non-temporal-sort.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Set whether to enable universal sort for stream. 
When it is false, "
+                                    + "universal sort can't use for stream, 
default false. Just for testing.");
+
+    private final int[] keys;
+    private final boolean[] orders;
+    private final boolean[] nullsIsLast;
+
+    public StreamExecSort(
+            int[] keys,
+            boolean[] orders,
+            boolean[] nullIsLast,
+            ExecEdge inputEdge,
+            LogicalType outputType,

Review comment:
       Please use RowType, which corresponds to the type parameter `RowData` 
for `StreamExecNode` 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSort.scala
##########
@@ -28,18 +28,18 @@ import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator
 import org.apache.flink.table.planner.delegation.StreamPlanner
-import org.apache.flink.table.planner.plan.nodes.exec.LegacyStreamExecNode
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode, 
LegacyStreamExecNode}
 import org.apache.flink.table.planner.plan.utils.{RelExplainUtil, SortUtil}
 import org.apache.flink.table.runtime.operators.sort.StreamSortOperator
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
-
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel._
 import org.apache.calcite.rel.core.Sort
 import org.apache.calcite.rex.RexNode
-
 import java.lang.{Boolean => JBoolean}
 
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSort

Review comment:
       please remove the unused imports

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSort.scala
##########
@@ -82,39 +82,20 @@ class StreamExecSort(
 
   //~ ExecNode methods 
-----------------------------------------------------------
 
-  override protected def translateToPlanInternal(
-      planner: StreamPlanner): Transformation[RowData] = {
-    val config = planner.getTableConfig
-    if 
(!config.getConfiguration.getBoolean(StreamExecSort.TABLE_EXEC_SORT_NON_TEMPORAL_ENABLED))
 {
-      throw new TableException("Sort on a non-time-attribute field is not 
supported.")
-    }
-
-    val inputType = FlinkTypeFactory.toLogicalRowType(getInput.getRowType)
+  override def translateToExecNode(): ExecNode[_] = {
     val (keys, orders, nullsIsLast) = 
SortUtil.getKeysAndOrders(sortCollation.getFieldCollations)
-    // sort code gen
-    val keyTypes = keys.map(inputType.getTypeAt)
-    val rowComparator = ComparatorCodeGenerator.gen(config, 
"StreamExecSortComparator",
-      keys, keyTypes, orders, nullsIsLast)
-    val sortOperator = new StreamSortOperator(InternalTypeInfo.of(inputType), 
rowComparator)
-    val input = getInputNodes.get(0).translateToPlan(planner)
-      .asInstanceOf[Transformation[RowData]]
-    val outputRowTypeInfo = 
InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))
-
-    // as input node is singleton exchange, its parallelism is 1.
-    val ret = new OneInputTransformation(
-      input,
-      getRelDetailedDescription,
-      sortOperator,
-      outputRowTypeInfo,
-      input.getParallelism)
-    if (inputsContainSingleton()) {
-      ret.setParallelism(1)
-      ret.setMaxParallelism(1)
-    }
-    ret
+    new StreamExecSort(
+      keys,
+      orders,
+      nullsIsLast,
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription
+    )
   }
+
 }
-object StreamExecSort {
+object StreamPhysicalSort {

Review comment:
       remove it, use `StreamExecSort.TABLE_EXEC_SORT_NON_TEMPORAL_ENABLED` 
instead

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.operators.sort.StreamSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * Stream exec RelNode for Sort.

Review comment:
       please correct the comments,
   
   Stream {@link ExecNode} for Sort without limit.
   
   

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.operators.sort.ProcTimeSortOperator;
+import org.apache.flink.table.runtime.operators.sort.RowTimeSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/** Stream exec RelNode for time-ascending-order Sort without `limit`. */
+public class StreamExecTemporalSort extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final int[] keys;
+    private final boolean[] orders;
+    private final boolean[] nullsIsLast;
+
+    public StreamExecTemporalSort(
+            int[] keys,
+            boolean[] orders,
+            boolean[] nullIsLast,
+            ExecEdge inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdges), outputType, description);
+        this.keys = keys;
+        this.orders = orders;
+        this.nullsIsLast = nullIsLast;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+        TableConfig config = planner.getTableConfig();
+        RowType inputType = (RowType) getInputNodes().get(0).getOutputType();

Review comment:
       nit: move the field definition closed to its usage.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.operators.sort.ProcTimeSortOperator;
+import org.apache.flink.table.runtime.operators.sort.RowTimeSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/** Stream exec RelNode for time-ascending-order Sort without `limit`. */
+public class StreamExecTemporalSort extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final int[] keys;
+    private final boolean[] orders;
+    private final boolean[] nullsIsLast;
+
+    public StreamExecTemporalSort(
+            int[] keys,
+            boolean[] orders,
+            boolean[] nullIsLast,
+            ExecEdge inputEdges,

Review comment:
       `inputEdges` => `inputEdge`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.operators.sort.ProcTimeSortOperator;
+import org.apache.flink.table.runtime.operators.sort.RowTimeSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/** Stream exec RelNode for time-ascending-order Sort without `limit`. */
+public class StreamExecTemporalSort extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final int[] keys;
+    private final boolean[] orders;
+    private final boolean[] nullsIsLast;
+
+    public StreamExecTemporalSort(
+            int[] keys,
+            boolean[] orders,
+            boolean[] nullIsLast,
+            ExecEdge inputEdges,
+            LogicalType outputType,

Review comment:
       `RowType`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.operators.sort.StreamSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * Stream exec RelNode for Sort.
+ *
+ * <p><b>NOTES:</b> This class is used for testing with bounded source now. If 
a query is converted
+ * to this node in product environment, an exception will be thrown.
+ */
+public class StreamExecSort extends ExecNodeBase<RowData> implements 
StreamExecNode<RowData> {
+
+    @Experimental
+    public static final ConfigOption<Boolean> 
TABLE_EXEC_SORT_NON_TEMPORAL_ENABLED =
+            ConfigOptions.key("table.exec.non-temporal-sort.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Set whether to enable universal sort for stream. 
When it is false, "
+                                    + "universal sort can't use for stream, 
default false. Just for testing.");
+
+    private final int[] keys;
+    private final boolean[] orders;
+    private final boolean[] nullsIsLast;
+
+    public StreamExecSort(
+            int[] keys,
+            boolean[] orders,
+            boolean[] nullIsLast,
+            ExecEdge inputEdge,
+            LogicalType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        this.keys = keys;
+        this.orders = orders;
+        this.nullsIsLast = nullIsLast;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+        TableConfig config = planner.getTableConfig();
+        if 
(!config.getConfiguration().getBoolean(TABLE_EXEC_SORT_NON_TEMPORAL_ENABLED)) {
+            throw new TableException("Sort on a non-time-attribute field is 
not supported.");
+        }
+
+        RowType inputType = (RowType) getInputNodes().get(0).getOutputType();
+        // sort code gen
+        LogicalType[] sortKeyTypes =
+                
IntStream.of(keys).mapToObj(inputType::getTypeAt).toArray(LogicalType[]::new);
+        GeneratedRecordComparator rowComparator =
+                ComparatorCodeGenerator.gen(
+                        config,
+                        "StreamExecSortComparator",
+                        keys,
+                        sortKeyTypes,
+                        orders,
+                        nullsIsLast);
+        StreamSortOperator sortOperator =
+                new StreamSortOperator(InternalTypeInfo.of(inputType), 
rowComparator);
+        Transformation<RowData> input =
+                (Transformation<RowData>) 
getInputNodes().get(0).translateToPlan(planner);
+
+        // as input node is singleton exchange, its parallelism is 1.

Review comment:
       this line should be moved closed to `if (inputsContainSingleton())`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.operators.sort.ProcTimeSortOperator;
+import org.apache.flink.table.runtime.operators.sort.RowTimeSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/** Stream exec RelNode for time-ascending-order Sort without `limit`. */
+public class StreamExecTemporalSort extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final int[] keys;
+    private final boolean[] orders;
+    private final boolean[] nullsIsLast;
+
+    public StreamExecTemporalSort(
+            int[] keys,
+            boolean[] orders,
+            boolean[] nullIsLast,
+            ExecEdge inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdges), outputType, description);
+        this.keys = keys;
+        this.orders = orders;
+        this.nullsIsLast = nullIsLast;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+        TableConfig config = planner.getTableConfig();
+        RowType inputType = (RowType) getInputNodes().get(0).getOutputType();
+        // time ordering needs to be ascending
+        if (orders.length == 0 || !orders[0]) {
+            throw new TableException(
+                    "Sort: Primary sort order of a streaming table must be 
ascending on time.\n"
+                            + "please re-check sort statement according to the 
description above");
+        }
+        LogicalType timeType = inputType.getTypeAt(keys[0]);
+        Transformation<RowData> input =
+                (Transformation<RowData>) 
getInputNodes().get(0).translateToPlan(planner);
+
+        if (timeType instanceof TimestampType) {
+            TimestampType keyType = (TimestampType) timeType;
+            if (keyType.getKind() == TimestampKind.ROWTIME) {
+                return createSortRowTime(inputType, input, config);
+            } else if (keyType.getKind() == TimestampKind.PROCTIME) {
+                return createSortProcTime(inputType, input, config);
+            } else {
+                throw new TableException(
+                        "Sort: Internal Error\n"
+                                + "Normally, this happens unlikely. please 
contact customer support for this");
+            }
+        } else {
+            throw new TableException(
+                    "Sort: Internal Error\n"
+                            + "Normally, this happens unlikely. please contact 
customer support for this");
+        }

Review comment:
       merge them into one? and improve the exception message. There is no 
useful info in the current message.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortRule.scala
##########
@@ -68,16 +68,16 @@ class BatchExecSortRule extends ConverterRule(
       .replace(FlinkConventions.BATCH_PHYSICAL)
 
     val newInput = RelOptRule.convert(input, requiredTraitSet)
-    new BatchExecSort(
+    new BatchPhysicalSort(
       sort.getCluster,
       providedTraitSet,
       newInput,
       sort.getCollation)
   }
 }
 
-object BatchExecSortRule {
-  val INSTANCE: RelOptRule = new BatchExecSortRule
+object BatchPhysicalSortRule {
+  val INSTANCE: RelOptRule = new BatchPhysicalSortRule

Review comment:
       rename `TABLE_EXEC_SORT_RANGE_ENABLED ` to 
`TABLE_EXEC_RANGE_SORT_ENABLED`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.operators.sort.ProcTimeSortOperator;
+import org.apache.flink.table.runtime.operators.sort.RowTimeSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/** Stream exec RelNode for time-ascending-order Sort without `limit`. */
+public class StreamExecTemporalSort extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final int[] keys;
+    private final boolean[] orders;
+    private final boolean[] nullsIsLast;
+
+    public StreamExecTemporalSort(
+            int[] keys,
+            boolean[] orders,
+            boolean[] nullIsLast,
+            ExecEdge inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdges), outputType, description);
+        this.keys = keys;
+        this.orders = orders;
+        this.nullsIsLast = nullIsLast;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+        TableConfig config = planner.getTableConfig();
+        RowType inputType = (RowType) getInputNodes().get(0).getOutputType();
+        // time ordering needs to be ascending
+        if (orders.length == 0 || !orders[0]) {
+            throw new TableException(
+                    "Sort: Primary sort order of a streaming table must be 
ascending on time.\n"
+                            + "please re-check sort statement according to the 
description above");
+        }
+        LogicalType timeType = inputType.getTypeAt(keys[0]);
+        Transformation<RowData> input =
+                (Transformation<RowData>) 
getInputNodes().get(0).translateToPlan(planner);
+
+        if (timeType instanceof TimestampType) {
+            TimestampType keyType = (TimestampType) timeType;
+            if (keyType.getKind() == TimestampKind.ROWTIME) {
+                return createSortRowTime(inputType, input, config);
+            } else if (keyType.getKind() == TimestampKind.PROCTIME) {
+                return createSortProcTime(inputType, input, config);
+            } else {
+                throw new TableException(
+                        "Sort: Internal Error\n"
+                                + "Normally, this happens unlikely. please 
contact customer support for this");
+            }
+        } else {
+            throw new TableException(
+                    "Sort: Internal Error\n"
+                            + "Normally, this happens unlikely. please contact 
customer support for this");
+        }
+    }
+
+    /** Create Sort logic based on processing time. */
+    private Transformation<RowData> createSortProcTime(
+            RowType inputType, Transformation<RowData> input, TableConfig 
tableConfig) {
+        // if the order has secondary sorting fields in addition to the 
proctime
+        if (keys.length > 1) {
+            // sort code gen
+            LogicalType[] sortKeyTypes =
+                    
IntStream.of(keys).mapToObj(inputType::getTypeAt).toArray(LogicalType[]::new);
+            GeneratedRecordComparator rowComparator =
+                    ComparatorCodeGenerator.gen(
+                            tableConfig,
+                            "ProcTimeSortComparator",
+                            Arrays.copyOfRange(keys, 1, keys.length),
+                            Arrays.copyOfRange(sortKeyTypes, 1, keys.length),
+                            Arrays.copyOfRange(orders, 1, keys.length),
+                            Arrays.copyOfRange(nullsIsLast, 1, keys.length));

Review comment:
       add some comments to explain these code ?
   
   There is one comment `strip off time collation` to explain it in old scala 
class 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSort.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.sort.SortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * Batch {@link ExecNode} for Sort.

Review comment:
       Batch {@link ExecNode} for Sort without limit ?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortLimit.scala
##########
@@ -113,35 +104,19 @@ class BatchExecSortLimit(
 
   //~ ExecNode methods 
-----------------------------------------------------------

Review comment:
       remove it

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalSort.scala
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort
+import org.apache.flink.table.planner.plan.utils.{RelExplainUtil, SortUtil}
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rex.RexNode
+
+import scala.collection.JavaConversions._
+
+/**
+  * Stream physical RelNode for time-ascending-order [[Sort]] without `limit`.
+  *
+  * @see [[StreamPhysicalRank]] which must be with `limit` order by.
+  * @see [[StreamPhysicalSort]] which can be used for testing now, its sort 
key can be any type.
+  */
+class StreamPhysicalTemporalSort(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    sortCollation: RelCollation)
+  extends Sort(cluster, traitSet, inputRel, sortCollation)
+  with StreamPhysicalRel {
+
+  override def requireWatermark: Boolean = false
+
+  override def copy(
+      traitSet: RelTraitSet,
+      input: RelNode,
+      newCollation: RelCollation,
+      offset: RexNode,
+      fetch: RexNode): Sort = {
+    new StreamPhysicalTemporalSort(cluster, traitSet, input, newCollation)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    pw.input("input", getInput())
+      .item("orderBy", RelExplainUtil.collationToString(sortCollation, 
getRowType))
+  }
+
+  //~ ExecNode methods 
-----------------------------------------------------------
+

Review comment:
       remove it

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.operators.sort.ProcTimeSortOperator;
+import org.apache.flink.table.runtime.operators.sort.RowTimeSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/** Stream exec RelNode for time-ascending-order Sort without `limit`. */
+public class StreamExecTemporalSort extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final int[] keys;
+    private final boolean[] orders;
+    private final boolean[] nullsIsLast;
+
+    public StreamExecTemporalSort(
+            int[] keys,
+            boolean[] orders,
+            boolean[] nullIsLast,
+            ExecEdge inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdges), outputType, description);
+        this.keys = keys;
+        this.orders = orders;
+        this.nullsIsLast = nullIsLast;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+        TableConfig config = planner.getTableConfig();
+        RowType inputType = (RowType) getInputNodes().get(0).getOutputType();
+        // time ordering needs to be ascending
+        if (orders.length == 0 || !orders[0]) {
+            throw new TableException(
+                    "Sort: Primary sort order of a streaming table must be 
ascending on time.\n"
+                            + "please re-check sort statement according to the 
description above");
+        }
+        LogicalType timeType = inputType.getTypeAt(keys[0]);
+        Transformation<RowData> input =
+                (Transformation<RowData>) 
getInputNodes().get(0).translateToPlan(planner);
+
+        if (timeType instanceof TimestampType) {
+            TimestampType keyType = (TimestampType) timeType;
+            if (keyType.getKind() == TimestampKind.ROWTIME) {
+                return createSortRowTime(inputType, input, config);
+            } else if (keyType.getKind() == TimestampKind.PROCTIME) {
+                return createSortProcTime(inputType, input, config);
+            } else {
+                throw new TableException(
+                        "Sort: Internal Error\n"
+                                + "Normally, this happens unlikely. please 
contact customer support for this");
+            }
+        } else {
+            throw new TableException(
+                    "Sort: Internal Error\n"
+                            + "Normally, this happens unlikely. please contact 
customer support for this");
+        }
+    }
+
+    /** Create Sort logic based on processing time. */
+    private Transformation<RowData> createSortProcTime(
+            RowType inputType, Transformation<RowData> input, TableConfig 
tableConfig) {
+        // if the order has secondary sorting fields in addition to the 
proctime
+        if (keys.length > 1) {
+            // sort code gen
+            LogicalType[] sortKeyTypes =
+                    
IntStream.of(keys).mapToObj(inputType::getTypeAt).toArray(LogicalType[]::new);
+            GeneratedRecordComparator rowComparator =
+                    ComparatorCodeGenerator.gen(
+                            tableConfig,
+                            "ProcTimeSortComparator",
+                            Arrays.copyOfRange(keys, 1, keys.length),
+                            Arrays.copyOfRange(sortKeyTypes, 1, keys.length),
+                            Arrays.copyOfRange(orders, 1, keys.length),
+                            Arrays.copyOfRange(nullsIsLast, 1, keys.length));
+            ProcTimeSortOperator sortOperator =
+                    new ProcTimeSortOperator(InternalTypeInfo.of(inputType), 
rowComparator);
+
+            // as input node is singleton exchange, its parallelism is 1.

Review comment:
       we need to add 
   ```
    if (inputsContainSingleton()) {
               transform.setParallelism(1);
               transform.setMaxParallelism(1);
           }
   ```
   

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.operators.sort.ProcTimeSortOperator;
+import org.apache.flink.table.runtime.operators.sort.RowTimeSortOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/** Stream exec RelNode for time-ascending-order Sort without `limit`. */
+public class StreamExecTemporalSort extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final int[] keys;
+    private final boolean[] orders;
+    private final boolean[] nullsIsLast;
+
+    public StreamExecTemporalSort(
+            int[] keys,
+            boolean[] orders,
+            boolean[] nullIsLast,
+            ExecEdge inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdges), outputType, description);
+        this.keys = keys;
+        this.orders = orders;
+        this.nullsIsLast = nullIsLast;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+        TableConfig config = planner.getTableConfig();
+        RowType inputType = (RowType) getInputNodes().get(0).getOutputType();
+        // time ordering needs to be ascending
+        if (orders.length == 0 || !orders[0]) {
+            throw new TableException(
+                    "Sort: Primary sort order of a streaming table must be 
ascending on time.\n"
+                            + "please re-check sort statement according to the 
description above");
+        }
+        LogicalType timeType = inputType.getTypeAt(keys[0]);
+        Transformation<RowData> input =
+                (Transformation<RowData>) 
getInputNodes().get(0).translateToPlan(planner);
+
+        if (timeType instanceof TimestampType) {
+            TimestampType keyType = (TimestampType) timeType;
+            if (keyType.getKind() == TimestampKind.ROWTIME) {
+                return createSortRowTime(inputType, input, config);
+            } else if (keyType.getKind() == TimestampKind.PROCTIME) {
+                return createSortProcTime(inputType, input, config);
+            } else {
+                throw new TableException(
+                        "Sort: Internal Error\n"
+                                + "Normally, this happens unlikely. please 
contact customer support for this");
+            }
+        } else {
+            throw new TableException(
+                    "Sort: Internal Error\n"
+                            + "Normally, this happens unlikely. please contact 
customer support for this");
+        }
+    }
+
+    /** Create Sort logic based on processing time. */
+    private Transformation<RowData> createSortProcTime(
+            RowType inputType, Transformation<RowData> input, TableConfig 
tableConfig) {
+        // if the order has secondary sorting fields in addition to the 
proctime
+        if (keys.length > 1) {
+            // sort code gen
+            LogicalType[] sortKeyTypes =
+                    
IntStream.of(keys).mapToObj(inputType::getTypeAt).toArray(LogicalType[]::new);
+            GeneratedRecordComparator rowComparator =
+                    ComparatorCodeGenerator.gen(
+                            tableConfig,
+                            "ProcTimeSortComparator",
+                            Arrays.copyOfRange(keys, 1, keys.length),
+                            Arrays.copyOfRange(sortKeyTypes, 1, keys.length),
+                            Arrays.copyOfRange(orders, 1, keys.length),
+                            Arrays.copyOfRange(nullsIsLast, 1, keys.length));
+            ProcTimeSortOperator sortOperator =
+                    new ProcTimeSortOperator(InternalTypeInfo.of(inputType), 
rowComparator);
+
+            // as input node is singleton exchange, its parallelism is 1.
+            OneInputTransformation<RowData, RowData> ret =
+                    new OneInputTransformation<>(
+                            input,
+                            getDesc(),
+                            sortOperator,
+                            InternalTypeInfo.of(inputType),
+                            input.getParallelism());
+
+            EmptyRowDataKeySelector selector = 
EmptyRowDataKeySelector.INSTANCE;
+            ret.setStateKeySelector(selector);
+            ret.setStateKeyType(selector.getProducedType());
+            return ret;
+        } else {
+            // if the order is done only on proctime we only need to forward 
the elements
+            return input;
+        }
+    }
+
+    /** Create Sort logic based on row time. */
+    private Transformation<RowData> createSortRowTime(
+            RowType inputType, Transformation<RowData> input, TableConfig 
tableConfig) {
+        int rowTimeIdx = keys[0];
+        GeneratedRecordComparator rowComparator = null;
+        if (keys.length > 1) {
+            // comparator code gen
+            LogicalType[] sortKeyTypes =
+                    
IntStream.of(keys).mapToObj(inputType::getTypeAt).toArray(LogicalType[]::new);
+            rowComparator =
+                    ComparatorCodeGenerator.gen(
+                            tableConfig,
+                            "RowTimeSortComparator",
+                            Arrays.copyOfRange(keys, 1, keys.length),
+                            Arrays.copyOfRange(sortKeyTypes, 1, keys.length),
+                            Arrays.copyOfRange(orders, 1, keys.length),
+                            Arrays.copyOfRange(nullsIsLast, 1, keys.length));

Review comment:
       ditto

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.operators.sort.SortLimitOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * Batch {@link ExecNode} for Sort.

Review comment:
       Batch {@link ExecNode} for Sort with limit.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalSort.scala
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort
+import org.apache.flink.table.planner.plan.utils.{RelExplainUtil, SortUtil}
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rex.RexNode
+
+import scala.collection.JavaConversions._
+
+/**
+  * Stream physical RelNode for time-ascending-order [[Sort]] without `limit`.
+  *
+  * @see [[StreamPhysicalRank]] which must be with `limit` order by.
+  * @see [[StreamPhysicalSort]] which can be used for testing now, its sort 
key can be any type.
+  */
+class StreamPhysicalTemporalSort(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    sortCollation: RelCollation)
+  extends Sort(cluster, traitSet, inputRel, sortCollation)
+  with StreamPhysicalRel {
+
+  override def requireWatermark: Boolean = false
+
+  override def copy(
+      traitSet: RelTraitSet,
+      input: RelNode,
+      newCollation: RelCollation,
+      offset: RexNode,
+      fetch: RexNode): Sort = {
+    new StreamPhysicalTemporalSort(cluster, traitSet, input, newCollation)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    pw.input("input", getInput())
+      .item("orderBy", RelExplainUtil.collationToString(sortCollation, 
getRowType))
+  }
+
+  //~ ExecNode methods 
-----------------------------------------------------------
+
+  override def translateToExecNode(): ExecNode[_] = {
+    // strip off time collation

Review comment:
       wrong comments

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortLimit.scala
##########
@@ -113,35 +104,19 @@ class BatchExecSortLimit(
 
   //~ ExecNode methods 
-----------------------------------------------------------
 
-  override def getInputEdges: util.List[ExecEdge] = List(
-    ExecEdge.builder()
-      .damBehavior(ExecEdge.DamBehavior.END_INPUT)
-      .build())
-
-  override protected def translateToPlanInternal(
-      planner: BatchPlanner): Transformation[RowData] = {
-    if (limitEnd == Long.MaxValue) {
-      throw new TableException("Not support limitEnd is max value now!")
-    }
-
-    val input = getInputNodes.get(0).translateToPlan(planner)
-        .asInstanceOf[Transformation[RowData]]
-    val inputType = input.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
-    val types = inputType.toRowFieldTypes
-
-    // generate comparator
-    val genComparator = ComparatorCodeGenerator.gen(
-      planner.getTableConfig, "SortLimitComparator", keys, keys.map(types(_)), 
orders, nullsIsLast)
-
-    // TODO If input is ordered, there is no need to use the heap.
-    val operator = new SortLimitOperator(isGlobal, limitStart, limitEnd, 
genComparator)
-
-    ExecNodeUtil.createOneInputTransformation(
-      input,
-      getRelDetailedDescription,
-      SimpleOperatorFactory.of(operator),
-      inputType,
-      input.getParallelism,
-      0)
+  override def translateToExecNode(): ExecNode[_] = {
+    new BatchExecSortLimit(

Review comment:
       move `private val (keys, orders, nullsIsLast) = 
SortUtil.getKeysAndOrders(
       sortCollation.getFieldCollations)` into this method




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to