wuchong commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r275106890
 
 

 ##########
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/AbstractRankFunction.java
 ##########
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.rank;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.GeneratedRecordComparator;
+import org.apache.flink.table.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.generated.RecordEqualiser;
+import 
org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.InternalTypes;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+
+/**
+ * Base class for Rank Function.
+ */
+public abstract class AbstractRankFunction extends 
KeyedProcessFunctionWithCleanupState<BaseRow, BaseRow, BaseRow> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRankFunction.class);
+
+       private static final String RANK_UNSUPPORTED_MSG = "RANK() on streaming 
table is not supported currently";
+
+       private static final String DENSE_RANK_UNSUPPORTED_MSG = "DENSE_RANK() 
on streaming table is not supported currently";
+
+       private static final String WITHOUT_RANK_END_UNSUPPORTED_MSG = "Rank 
end is not specified. Currently rank only support TopN, which means the rank 
end must be specified.";
+
+       // default topN size, returned by getDefaultTopNSize() when the rank end is not constant
+       private static final long DEFAULT_TOPN_SIZE = 100;
+
+       /**
+        * Utility that checks whether two BaseRows are equal to each other.
+        * As different BaseRow instances cannot be compared with equals() directly,
+        * a code-generated utility is used to handle this.
+        */
+       private GeneratedRecordEqualiser generatedEqualiser; // set to null in open() once instantiated
+       protected RecordEqualiser equaliser; // instantiated from generatedEqualiser in open()
+
+       /**
+        * Comparator used to compare two sort keys with each other.
+        */
+       private GeneratedRecordComparator generatedSortKeyComparator; // set to null in open() once instantiated
+       protected Comparator<BaseRow> sortKeyComparator; // instantiated from generatedSortKeyComparator in open()
+
+       private final boolean generateRetraction;
+       protected final boolean outputRankNumber;
+       protected final BaseRowTypeInfo inputRowType;
+       protected final KeySelector<BaseRow, BaseRow> sortKeySelector;
+
+       protected KeyContext keyContext;
+       private final boolean isConstantRankEnd; // true when the rank range is a ConstantRankRange
+       private final long rankStart; // -1 when the rank end is variable
+       protected long rankEnd; // -1 until initRankEnd() is called when the rank end is variable
+       private final int rankEndIndex; // -1 when the rank end is constant
+       private ValueState<Long> rankEndState; // only created in open() when the rank end is not constant
+       private Counter invalidCounter; // incremented when a row's rank end differs from the stored one
+       private JoinedRow outputRow;
+
+       // metrics
+       protected long hitCount = 0L;
+       protected long requestCount = 0L;
+
+       AbstractRankFunction(long minRetentionTime, long maxRetentionTime, 
BaseRowTypeInfo inputRowType,
+                       GeneratedRecordComparator generatedSortKeyComparator, 
BaseRowKeySelector sortKeySelector,
+                       RankType rankType, RankRange rankRange, 
GeneratedRecordEqualiser generatedEqualiser,
+                       boolean generateRetraction, boolean outputRankNumber) {
+               super(minRetentionTime, maxRetentionTime);
+               // TODO support RANK and DENSE_RANK
+               switch (rankType) {
+                       case ROW_NUMBER:
+                               break;
+                       case RANK:
+                               LOG.error(RANK_UNSUPPORTED_MSG);
+                               throw new 
UnsupportedOperationException(RANK_UNSUPPORTED_MSG);
+                       case DENSE_RANK:
+                               LOG.error(DENSE_RANK_UNSUPPORTED_MSG);
+                               throw new 
UnsupportedOperationException(DENSE_RANK_UNSUPPORTED_MSG);
+                       default:
+                               LOG.error("Streaming tables do not support {}", 
rankType.name());
+                               throw new 
UnsupportedOperationException("Streaming tables do not support " + 
rankType.toString());
+               }
+
+               if (rankRange instanceof ConstantRankRange) {
+                       ConstantRankRange constantRankRange = 
(ConstantRankRange) rankRange;
+                       isConstantRankEnd = true;
+                       rankStart = constantRankRange.getRankStart();
+                       rankEnd = constantRankRange.getRankEnd();
+                       rankEndIndex = -1;
+               } else if (rankRange instanceof VariableRankRange) {
+                       VariableRankRange variableRankRange = 
(VariableRankRange) rankRange;
+                       int rankEndIdx = variableRankRange.getRankEndIndex();
+                       InternalType rankEndIdxType = 
inputRowType.getInternalTypes()[rankEndIdx];
+                       if (!rankEndIdxType.equals(InternalTypes.LONG)) {
+                               LOG.error("variable rank index column must be 
long type, while input type is {}",
+                                               
rankEndIdxType.getClass().getName());
+                               throw new UnsupportedOperationException(
+                                               "variable rank index column 
must be long type, while input type is " +
+                                                               
rankEndIdxType.getClass().getName());
+                       }
+                       rankEndIndex = rankEndIdx;
+                       isConstantRankEnd = false;
+                       rankStart = -1;
+                       rankEnd = -1;
+
+               } else {
+                       LOG.error(WITHOUT_RANK_END_UNSUPPORTED_MSG);
+                       throw new 
UnsupportedOperationException(WITHOUT_RANK_END_UNSUPPORTED_MSG);
+               }
+               this.generatedEqualiser = generatedEqualiser;
+               this.generatedSortKeyComparator = generatedSortKeyComparator;
+               this.generateRetraction = generateRetraction;
+               this.inputRowType = inputRowType;
+               this.outputRankNumber = outputRankNumber;
+               this.sortKeySelector = sortKeySelector;
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               initCleanupTimeState("RankFunctionCleanupTime");
+               outputRow = new JoinedRow();
+
+               if (!isConstantRankEnd) {
+                       ValueStateDescriptor<Long> rankStateDesc = new 
ValueStateDescriptor("rankEnd", Types.LONG);
+                       rankEndState = 
getRuntimeContext().getState(rankStateDesc);
+               }
+               // compile equaliser
+               equaliser = 
generatedEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+               generatedEqualiser = null;
+               // compile comparator
+               sortKeyComparator = 
generatedSortKeyComparator.newInstance(getRuntimeContext().getUserCodeClassLoader());
+               generatedSortKeyComparator = null;
+               invalidCounter = 
getRuntimeContext().getMetricGroup().counter("topn.invalidTopSize");
+       }
+
+       /**
+        * Returns the default topN size: the rank end when it is constant,
+        * otherwise {@code DEFAULT_TOPN_SIZE}.
+        *
+        * @return default topN size
+        */
+       protected long getDefaultTopNSize() {
+               if (isConstantRankEnd) {
+                       return rankEnd;
+               }
+               return DEFAULT_TOPN_SIZE;
+       }
+
+       /**
+        * Determines the rank end to use for the given record.
+        *
+        * <p>With a constant rank end the configured value is returned as is. Otherwise the value
+        * kept in state wins; when the state is still empty it is initialized from the rank end
+        * column of the record. A record whose rank end differs from the stored one only
+        * increments the invalid counter.
+        *
+        * @param row input record
+        * @return rank end
+        * @throws Exception propagated from reading or updating the rank end state
+        */
+       protected long initRankEnd(BaseRow row) throws Exception {
+               if (isConstantRankEnd) {
+                       return rankEnd;
+               }
+
+               Long storedRankEnd = rankEndState.value();
+               long rankEndOfRow = row.getLong(rankEndIndex);
+               if (storedRankEnd != null) {
+                       rankEnd = storedRankEnd;
+                       if (rankEndOfRow != rankEnd) {
+                               // increment the invalid counter when the current rank end not equal to previous rank end
+                               invalidCounter.inc();
+                       }
+               } else {
+                       // first record seen: remember its rank end in state
+                       rankEnd = rankEndOfRow;
+                       rankEndState.update(rankEnd);
+               }
+               return rankEnd;
+       }
+
+       /**
+        * Checks whether a record with the given sort key should be put into the buffer.
+        *
+        * <p>That is the case when the buffer is empty, when the sort key compares lower than the
+        * last key of the buffer, or when the buffer holds fewer records than its maximum size.
+        *
+        * @param sortKey sortKey to test
+        * @param buffer buffer to add
+        * @return true if the record should be put into the buffer.
+        */
+       protected boolean checkSortKeyInBufferRange(BaseRow sortKey, TopNBuffer buffer) {
+               Comparator<BaseRow> keyComparator = buffer.getSortKeyComparator();
+               Map.Entry<BaseRow, Collection<BaseRow>> lastEntry = buffer.lastEntry();
+               if (lastEntry == null) {
+                       // an empty buffer accepts every record
+                       return true;
+               }
+               return keyComparator.compare(sortKey, lastEntry.getKey()) < 0
+                               || buffer.getCurrentTopNum() < getMaxSizeOfBuffer();
+       }
+
+       protected void registerMetric(long heapSize) {
+               getRuntimeContext().getMetricGroup().<Double, 
Gauge<Double>>gauge(
+                               "topn.cache.hitRate",
+                               new Gauge<Double>() {
+
+                                       @Override
+                                       public Double getValue() {
+                                               return requestCount == 0 ? 1.0 :
+                                                               
Long.valueOf(hitCount).doubleValue() / requestCount;
+                                       }
+                               });
+
+               getRuntimeContext().getMetricGroup().<Long, Gauge<Long>>gauge(
+                               "topn.cache.size",
+                               new Gauge<Long>() {
 
 Review comment:
   A lambda can be used here instead of the anonymous `Gauge` class.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to