JingsongLi commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r741566910



##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -75,6 +77,13 @@
     private final InternalTypeInfo<RowData> rowKeyType;
     private final long cacheSize;
 
+    // flag to skip records with non-exist error instead to fail, true by 
default.
+    private final boolean lenient = true;
+
+    // data converter for logging only.
+    private final DataStructureConverter rowConverter;

Review comment:
       This will break compatibility. We can just introduce two transient 
fields.

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
##########
@@ -220,4 +221,75 @@ class RankHarnessTest(mode: StateBackendMode) extends 
HarnessTestBase(mode) {
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, 
result)
     testHarness.close()
   }
+
+  @Test
+  def testUpdateRankWithRowNumber(): Unit = {
+    val data = new mutable.MutableList[(String, Int, Int)]
+    val t = env.fromCollection(data).toTable(tEnv, 'word, 'cnt, 'type)
+    tEnv.createTemporaryView("T", t)
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
+
+    val sql =
+      """
+        |SELECT word, cnt, rank_num
+        |FROM (
+        |  SELECT word, cnt,
+        |      ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as 
rank_num
+        |  FROM (
+        |     select word, type, sum(cnt) filter (where cnt > 0) cnt from T 
group by word, type
+        |   )
+        |  )
+        |WHERE rank_num <= 6
+      """.stripMargin
+
+    val t1 = tEnv.sqlQuery(sql)
+
+    val testHarness = createHarnessTester(
+      t1.toRetractStream[Row],
+      "Rank(strategy=[UpdateFastStrategy")
+    val assertor = new RowDataHarnessAssertor(
+      Array(
+        DataTypes.STRING().getLogicalType,
+        DataTypes.INT().getLogicalType,
+        DataTypes.INT().getLogicalType,
+        DataTypes.BIGINT().getLogicalType))
+
+    testHarness.open()
+
+    testHarness.processElement(binaryRecord(INSERT, "a", 1: JInt, 100: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "b", 1: JInt, 90: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "c", 1: JInt, 90: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "d", 1: JInt, 80: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "e", 1: JInt, 80: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "f", 1: JInt, 70: JInt))
+
+    testHarness.processElement(binaryRecord(UPDATE_AFTER, "b", 1: JInt, 10: 
JInt))
+

Review comment:
       Just left one line is OK




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to