godfreyhe commented on a change in pull request #14472:
URL: https://github.com/apache/flink/pull/14472#discussion_r548365445



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLimit.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.sort.LimitOperator;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.Collections;
+
+/**
+ * Batch {@link ExecNode} for Calc.
+ */
+public class BatchExecLimit extends ExecNodeBase<RowData> implements 
BatchExecNode<RowData> {
+
+       private boolean isGlobal;
+       private long limitStart;
+       private long limitEnd;

Review comment:
       mark them as final

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.utils.AppendFastStrategy$;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
+import org.apache.flink.table.planner.plan.utils.RetractStrategy$;
+import org.apache.flink.table.planner.plan.utils.UpdateFastStrategy;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
+import org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction;
+import 
org.apache.flink.table.runtime.operators.rank.ComparableRecordComparator;
+import org.apache.flink.table.runtime.operators.rank.RankRange;
+import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction;
+import org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * Stream {@link ExecNode} for Rank.
+ */
+public class StreamExecRank extends ExecNodeBase<RowData> implements 
StreamExecNode<RowData> {
+
+       // It is a experimental config, will may be removed later.
+       @Experimental
+       public static final ConfigOption<Long> TABLE_EXEC_TOPN_CACHE_SIZE =
+               ConfigOptions.key("table.exec.topn.cache-size")
+                       .longType()
+                       .defaultValue(10000L)
+                       .withDescription("TopN operator has a cache which 
caches partial state contents to reduce" +
+                               " state access. Cache size is the number of 
records in each TopN task.");
+
+       private RankType rankType;
+       private RankProcessStrategy rankStrategy;
+       private RankRange rankRange;
+       private boolean generateUpdateBefore;
+       private boolean outputRankNumber;
+       private int[] partitionFields;
+       private int[] sortFields;
+       private boolean[] sortDirections;
+       private boolean[] nullsIsLast;

Review comment:
       mark them as final. and I would like to suggest reorder the fields as: 
type part, partition part, order by part, range part, and other part. which 
could make read easier to understand, because it matches the order in sql 
definition.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.utils.AppendFastStrategy$;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
+import org.apache.flink.table.planner.plan.utils.RetractStrategy$;
+import org.apache.flink.table.planner.plan.utils.UpdateFastStrategy;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
+import org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction;
+import 
org.apache.flink.table.runtime.operators.rank.ComparableRecordComparator;
+import org.apache.flink.table.runtime.operators.rank.RankRange;
+import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction;
+import org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * Stream {@link ExecNode} for Rank.
+ */
+public class StreamExecRank extends ExecNodeBase<RowData> implements 
StreamExecNode<RowData> {
+
+       // It is a experimental config, will may be removed later.
+       @Experimental
+       public static final ConfigOption<Long> TABLE_EXEC_TOPN_CACHE_SIZE =
+               ConfigOptions.key("table.exec.topn.cache-size")
+                       .longType()
+                       .defaultValue(10000L)
+                       .withDescription("TopN operator has a cache which 
caches partial state contents to reduce" +
+                               " state access. Cache size is the number of 
records in each TopN task.");
+
+       private RankType rankType;
+       private RankProcessStrategy rankStrategy;
+       private RankRange rankRange;
+       private boolean generateUpdateBefore;
+       private boolean outputRankNumber;
+       private int[] partitionFields;
+       private int[] sortFields;
+       private boolean[] sortDirections;
+       private boolean[] nullsIsLast;
+
+       public StreamExecRank(
+                       RankType rankType,
+                       RankProcessStrategy rankStrategy,
+                       RankRange rankRange,
+                       boolean generateUpdateBefore,
+                       boolean outputRankNumber,
+                       int[] partitionFields,
+                       int[] sortFields,
+                       boolean[] sortDirections,
+                       boolean[] nullsIsLast,
+                       ExecEdge inputEdge,
+                       LogicalType outputType,
+                       String description) {
+               super(Collections.singletonList(inputEdge), outputType, 
description);
+               this.rankType = rankType;
+               this.rankStrategy = rankStrategy;
+               this.rankRange = rankRange;
+               this.generateUpdateBefore = generateUpdateBefore;
+               this.outputRankNumber = outputRankNumber;
+               this.partitionFields = partitionFields;
+               this.sortFields = sortFields;
+               this.sortDirections = sortDirections;
+               this.nullsIsLast = nullsIsLast;
+       }
+
+       @Override
+       protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+               switch (rankType) {
+                       case ROW_NUMBER:
+                               break;
+                       case RANK:
+                               throw new TableException("RANK() on streaming 
table is not supported currently");
+                       case DENSE_RANK:
+                               throw new TableException("DENSE_RANK() on 
streaming table is not supported currently");
+                       default:
+                               throw new 
TableException(String.format("Streaming tables do not support %s rank 
function.", rankType));
+               }
+
+               TableConfig tableConfig = planner.getTableConfig();
+
+               RowType inputType = (RowType) 
getInputNodes().get(0).getOutputType();
+               InternalTypeInfo<RowData> inputRowTypeInfo = 
InternalTypeInfo.of(inputType);
+               RowDataKeySelector sortKeySelector = 
KeySelectorUtil.getRowDataSelector(sortFields, inputRowTypeInfo);
+               LogicalType[] sortKeyTypes = 
IntStream.of(sortFields).mapToObj(inputType::getTypeAt).toArray(LogicalType[]::new);
+               int[] sortKeyPositions = IntStream.range(0, 
sortFields.length).toArray();
+               GeneratedRecordComparator sortKeyComparator = 
ComparatorCodeGenerator.gen(
+                               tableConfig,
+                               "StreamExecSortComparator",
+                               sortKeyPositions,
+                               sortKeyTypes,
+                               sortDirections,
+                               nullsIsLast);
+               long cacheSize = 
tableConfig.getConfiguration().getLong(TABLE_EXEC_TOPN_CACHE_SIZE);
+               long minIdleStateRetentionTime = 
tableConfig.getMinIdleStateRetentionTime();
+               long maxIdleStateRetentionTime = 
tableConfig.getMaxIdleStateRetentionTime();
+
+               AbstractTopNFunction processFunction;
+               if (rankStrategy instanceof AppendFastStrategy$) {

Review comment:
       it's better we could port `AppendFastStrategy` to Java ? it's a little 
strange to see `$`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLimit.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.sort.LimitOperator;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.Collections;
+
+/**
+ * Batch {@link ExecNode} for Calc.

Review comment:
       incorrect comment

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLimit.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.utils.AppendFastStrategy$;
+import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
+import org.apache.flink.table.planner.plan.utils.RetractStrategy$;
+import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
+import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/**
+ * Stream {@link ExecNode} for Limit.
+ */
+public class StreamExecLimit extends StreamExecRank implements 
StreamExecNode<RowData> {

Review comment:
       `implements StreamExecNode<RowData>` is unnecessary

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLimit.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.sort.LimitOperator;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.Collections;
+
+/**
+ * Batch {@link ExecNode} for Calc.
+ */
+public class BatchExecLimit extends ExecNodeBase<RowData> implements 
BatchExecNode<RowData> {
+
+       private boolean isGlobal;
+       private long limitStart;
+       private long limitEnd;
+
+       public BatchExecLimit(
+               boolean isGlobal,
+               long limitStart,
+               long limitEnd,
+               ExecEdge inputEdge,
+               LogicalType outputType,
+               String description) {
+               super(Collections.singletonList(inputEdge), outputType, 
description);
+               this.isGlobal = isGlobal;
+               this.limitStart = limitStart;
+               this.limitEnd = limitEnd;
+       }
+
+       @Override

Review comment:
       add `@SuppressWarnings("unchecked")` to make IDE happy

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.utils.AppendFastStrategy$;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
+import org.apache.flink.table.planner.plan.utils.RetractStrategy$;
+import org.apache.flink.table.planner.plan.utils.UpdateFastStrategy;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
+import org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction;
+import 
org.apache.flink.table.runtime.operators.rank.ComparableRecordComparator;
+import org.apache.flink.table.runtime.operators.rank.RankRange;
+import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction;
+import org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * Stream {@link ExecNode} for Rank.
+ */
+public class StreamExecRank extends ExecNodeBase<RowData> implements 
StreamExecNode<RowData> {
+
+       // It is a experimental config, will may be removed later.
+       @Experimental
+       public static final ConfigOption<Long> TABLE_EXEC_TOPN_CACHE_SIZE =
+               ConfigOptions.key("table.exec.topn.cache-size")
+                       .longType()
+                       .defaultValue(10000L)
+                       .withDescription("TopN operator has a cache which 
caches partial state contents to reduce" +
+                               " state access. Cache size is the number of 
records in each TopN task.");
+
+       private RankType rankType;
+       private RankProcessStrategy rankStrategy;
+       private RankRange rankRange;
+       private boolean generateUpdateBefore;
+       private boolean outputRankNumber;
+       private int[] partitionFields;
+       private int[] sortFields;
+       private boolean[] sortDirections;
+       private boolean[] nullsIsLast;
+
+       public StreamExecRank(
+                       RankType rankType,
+                       RankProcessStrategy rankStrategy,
+                       RankRange rankRange,
+                       boolean generateUpdateBefore,
+                       boolean outputRankNumber,
+                       int[] partitionFields,
+                       int[] sortFields,
+                       boolean[] sortDirections,
+                       boolean[] nullsIsLast,
+                       ExecEdge inputEdge,
+                       LogicalType outputType,
+                       String description) {
+               super(Collections.singletonList(inputEdge), outputType, 
description);
+               this.rankType = rankType;
+               this.rankStrategy = rankStrategy;
+               this.rankRange = rankRange;
+               this.generateUpdateBefore = generateUpdateBefore;
+               this.outputRankNumber = outputRankNumber;
+               this.partitionFields = partitionFields;
+               this.sortFields = sortFields;
+               this.sortDirections = sortDirections;
+               this.nullsIsLast = nullsIsLast;
+       }
+
+       @Override
+       protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+               switch (rankType) {
+                       case ROW_NUMBER:
+                               break;
+                       case RANK:
+                               throw new TableException("RANK() on streaming 
table is not supported currently");
+                       case DENSE_RANK:
+                               throw new TableException("DENSE_RANK() on 
streaming table is not supported currently");
+                       default:
+                               throw new 
TableException(String.format("Streaming tables do not support %s rank 
function.", rankType));
+               }
+
+               TableConfig tableConfig = planner.getTableConfig();
+
+               RowType inputType = (RowType) 
getInputNodes().get(0).getOutputType();
+               InternalTypeInfo<RowData> inputRowTypeInfo = 
InternalTypeInfo.of(inputType);
+               RowDataKeySelector sortKeySelector = 
KeySelectorUtil.getRowDataSelector(sortFields, inputRowTypeInfo);
+               LogicalType[] sortKeyTypes = 
IntStream.of(sortFields).mapToObj(inputType::getTypeAt).toArray(LogicalType[]::new);
+               int[] sortKeyPositions = IntStream.range(0, 
sortFields.length).toArray();
+               GeneratedRecordComparator sortKeyComparator = 
ComparatorCodeGenerator.gen(
+                               tableConfig,
+                               "StreamExecSortComparator",
+                               sortKeyPositions,
+                               sortKeyTypes,
+                               sortDirections,
+                               nullsIsLast);
+               long cacheSize = 
tableConfig.getConfiguration().getLong(TABLE_EXEC_TOPN_CACHE_SIZE);
+               long minIdleStateRetentionTime = 
tableConfig.getMinIdleStateRetentionTime();
+               long maxIdleStateRetentionTime = 
tableConfig.getMaxIdleStateRetentionTime();
+
+               AbstractTopNFunction processFunction;
+               if (rankStrategy instanceof AppendFastStrategy$) {
+                       processFunction = new AppendOnlyTopNFunction(
+                                       minIdleStateRetentionTime,
+                                       maxIdleStateRetentionTime,
+                                       inputRowTypeInfo,
+                                       sortKeyComparator,
+                                       sortKeySelector,
+                                       rankType,
+                                       rankRange,
+                                       generateUpdateBefore,
+                                       outputRankNumber,
+                                       cacheSize);
+               } else if (rankStrategy instanceof UpdateFastStrategy) {
+                       UpdateFastStrategy updateFastStrategy = 
(UpdateFastStrategy) rankStrategy;
+                       int[] primaryKeys = updateFastStrategy.primaryKeys();
+                       RowDataKeySelector rowKeySelector = 
KeySelectorUtil.getRowDataSelector(
+                                       primaryKeys,
+                                       inputRowTypeInfo);
+                       processFunction = new UpdatableTopNFunction(
+                                       minIdleStateRetentionTime,
+                                       maxIdleStateRetentionTime,
+                                       inputRowTypeInfo,
+                                       rowKeySelector,
+                                       sortKeyComparator,
+                                       sortKeySelector,
+                                       rankType,
+                                       rankRange,
+                                       generateUpdateBefore,
+                                       outputRankNumber,
+                                       cacheSize);
+               // TODO Use UnaryUpdateTopNFunction after SortedMapState is 
merged
+               } else if (rankStrategy instanceof RetractStrategy$) {
+                       EqualiserCodeGenerator equaliserCodeGen = new 
EqualiserCodeGenerator(
+                                       
inputType.getFields().stream().map(field -> 
field.getType()).toArray(LogicalType[]::new));
+                       GeneratedRecordEqualiser generatedEqualiser = 
equaliserCodeGen.generateRecordEqualiser("RankValueEqualiser");
+                       ComparableRecordComparator comparator = new 
ComparableRecordComparator(
+                                       sortKeyComparator,
+                                       sortKeyPositions,
+                                       sortKeyTypes,
+                                       sortDirections,
+                                       nullsIsLast);
+                       processFunction = new RetractableTopNFunction(
+                                       minIdleStateRetentionTime,
+                                       maxIdleStateRetentionTime,
+                                       inputRowTypeInfo,
+                                       comparator,
+                                       sortKeySelector,
+                                       rankType,
+                                       rankRange,
+                                       generatedEqualiser,
+                                       generateUpdateBefore,
+                                       outputRankNumber);
+               } else {
+                       throw new TableException(String.format("rank 
strategy:%s is not supported.", rankStrategy));
+               }
+
+               KeyedProcessOperator operator = new 
KeyedProcessOperator(processFunction);

Review comment:
       nit: add type parameter ?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLimit.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.utils.AppendFastStrategy$;
+import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
+import org.apache.flink.table.planner.plan.utils.RetractStrategy$;
+import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
+import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/**
+ * Stream {@link ExecNode} for Limit.
+ */
+public class StreamExecLimit extends StreamExecRank implements 
StreamExecNode<RowData> {
+
+       private final long limitEnd;
+
+       public StreamExecLimit(
+               long limitStart,
+               long limitEnd,
+               boolean generateUpdateBefore,
+               boolean insertOnly,
+               ExecEdge inputEdge,
+               LogicalType outputType,
+               String description) {
+               super(
+                       RankType.ROW_NUMBER,
+                       getRankStrategy(insertOnly),
+                       new ConstantRankRange(limitStart + 1, limitEnd),
+                       generateUpdateBefore,
+                       false,
+                       new int[0],
+                       new int[0],
+                       new boolean[0],
+                       new boolean[0],
+                       inputEdge,
+                       outputType,
+                       description);
+               this.limitEnd = limitEnd;
+       }
+
+       private static RankProcessStrategy getRankStrategy(boolean insertOnly) {
+               if (insertOnly) {
+                       return AppendFastStrategy$.MODULE$;
+               } else {
+                       return RetractStrategy$.MODULE$;

Review comment:
       port them to java?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.utils.AppendFastStrategy$;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
+import org.apache.flink.table.planner.plan.utils.RetractStrategy$;
+import org.apache.flink.table.planner.plan.utils.UpdateFastStrategy;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
+import org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction;
+import 
org.apache.flink.table.runtime.operators.rank.ComparableRecordComparator;
+import org.apache.flink.table.runtime.operators.rank.RankRange;
+import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction;
+import org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+/**
+ * Stream {@link ExecNode} for Rank.
+ */
+public class StreamExecRank extends ExecNodeBase<RowData> implements 
StreamExecNode<RowData> {
+
+       // It is a experimental config, will may be removed later.
+       @Experimental
+       public static final ConfigOption<Long> TABLE_EXEC_TOPN_CACHE_SIZE =
+               ConfigOptions.key("table.exec.topn.cache-size")
+                       .longType()
+                       .defaultValue(10000L)
+                       .withDescription("TopN operator has a cache which 
caches partial state contents to reduce" +
+                               " state access. Cache size is the number of 
records in each TopN task.");
+
+       private RankType rankType;
+       private RankProcessStrategy rankStrategy;
+       private RankRange rankRange;
+       private boolean generateUpdateBefore;
+       private boolean outputRankNumber;
+       private int[] partitionFields;
+       private int[] sortFields;
+       private boolean[] sortDirections;
+       private boolean[] nullsIsLast;
+
+       public StreamExecRank(
+                       RankType rankType,
+                       RankProcessStrategy rankStrategy,
+                       RankRange rankRange,
+                       boolean generateUpdateBefore,
+                       boolean outputRankNumber,
+                       int[] partitionFields,
+                       int[] sortFields,
+                       boolean[] sortDirections,
+                       boolean[] nullsIsLast,
+                       ExecEdge inputEdge,
+                       LogicalType outputType,
+                       String description) {
+               super(Collections.singletonList(inputEdge), outputType, 
description);
+               this.rankType = rankType;
+               this.rankStrategy = rankStrategy;
+               this.rankRange = rankRange;
+               this.generateUpdateBefore = generateUpdateBefore;
+               this.outputRankNumber = outputRankNumber;
+               this.partitionFields = partitionFields;
+               this.sortFields = sortFields;
+               this.sortDirections = sortDirections;
+               this.nullsIsLast = nullsIsLast;
+       }
+
+       @Override

Review comment:
       nit: add `@SuppressWarnings("unchecked")`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to