libenchao commented on code in PR #24567:
URL: https://github.com/apache/flink/pull/24567#discussion_r1554809730


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala:
##########
@@ -438,9 +438,36 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
       rel: Window,
       mq: RelMetadataQuery,
       ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
+    if (rel.groups.length == 1) {
+      val group = rel.groups.get(0)
+      getUniqueKeysOfWindowGroup(group, rel) match {
+        case Some(uniqueKeys) =>
+          val retSet = new JHashSet[ImmutableBitSet]
+          retSet.add(uniqueKeys)
+          val inputKeys = mq.getUniqueKeys(rel.getInput, ignoreNulls)
+          if (inputKeys != null && inputKeys.nonEmpty) {
+            inputKeys.foreach(uniqueKey => retSet.add(uniqueKey))
+          }
+          return retSet
+        case _ =>
+      }
+    }
     getUniqueKeysOfOverAgg(rel, mq, ignoreNulls)
   }
 
+  def getUniqueKeysOfWindowGroup(group: Window.Group, rel: Window): Option[ImmutableBitSet] = {
+    // If it's a ROW_NUMBER window, then the unique keys are partition by key and row number.
+    val aggCalls = group.aggCalls
+    if (
      aggCalls.length == 1 && aggCalls.get(0).getOperator.equals(SqlStdOperatorTable.ROW_NUMBER)

Review Comment:
   `aggCalls` could contain more than one call, and wouldn't any of the `aggCalls` being `ROW_NUMBER` make it a unique key?
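   A minimal sketch of what scanning every agg call could look like (the helper name `rowNumberAggCallPositions` and the surrounding wiring are illustrative assumptions, not code from this PR):

   ```scala
   import org.apache.calcite.rel.core.Window
   import org.apache.calcite.sql.fun.SqlStdOperatorTable

   // Illustrative sketch: find the positions of all ROW_NUMBER calls within the
   // group's agg calls. Each of them, combined with the partition keys, would be
   // a unique key candidate; mapping a position to the window's output column
   // (input field count plus preceding agg calls) is left to the caller.
   def rowNumberAggCallPositions(group: Window.Group): Seq[Int] = {
     val aggCalls = group.aggCalls
     (0 until aggCalls.size()).filter { i =>
       aggCalls.get(i).getOperator == SqlStdOperatorTable.ROW_NUMBER
     }
   }
   ```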



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala:
##########
@@ -438,9 +438,36 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
       rel: Window,
       mq: RelMetadataQuery,
       ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
+    if (rel.groups.length == 1) {

Review Comment:
   `groups` can contain more than one group, and any of the `groups` could have `ROW_NUMBER`; wouldn't all of them be valid unique key candidates?
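   To illustrate, a rough sketch (reusing the PR's `getUniqueKeysOfWindowGroup` helper; the surrounding wiring is an assumption, not code from this PR) of collecting a candidate from every group rather than only the first:

   ```scala
   import scala.collection.JavaConverters._
   import java.util.{HashSet => JHashSet}
   import org.apache.calcite.util.ImmutableBitSet

   // Illustrative sketch, meant as the body of getUniqueKeys(rel: Window, ...):
   // every group that yields a ROW_NUMBER-based key contributes one candidate.
   val rowNumberKeys = new JHashSet[ImmutableBitSet]
   rel.groups.asScala.foreach { group =>
     getUniqueKeysOfWindowGroup(group, rel).foreach(key => rowNumberKeys.add(key))
   }
   if (!rowNumberKeys.isEmpty) {
     val inputKeys = mq.getUniqueKeys(rel.getInput, ignoreNulls)
     if (inputKeys != null) {
       inputKeys.asScala.foreach(key => rowNumberKeys.add(key))
     }
     return rowNumberKeys
   }
   // otherwise fall through to getUniqueKeysOfOverAgg(rel, mq, ignoreNulls)
   ```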



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -186,6 +187,25 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
   }
 
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
+    // If it's a ROW_NUMBER Rank, then the upsert keys are partition by key and row number.
+    if (rel.groups.length == 1) {

Review Comment:
   Again, limiting the number of `groups` to 1 seems odd. If streaming can only support one `group` for now (I'm not 100% sure about that), you should at least add a comment explaining it for future maintainability, e.g. along the lines of the sketch below.
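   For example, something along these lines (the claim in the comment is an assumption that would need to be verified):

   ```scala
   // NOTE: streaming currently produces at most one window group here (to be
   // verified); only the single-group case is handled, revisit if multi-group
   // windows need to be supported.
   if (rel.groups.length == 1) {
     // ...
   }
   ```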



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala:
##########
@@ -172,6 +172,8 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase {
 
   @Test
   def testGetUniqueKeysOnRank(): Unit = {
+    assertEquals(uniqueKeys(Array(7), Array(0)), mq.getUniqueKeys(logicalWindow).toSet)

Review Comment:
   Isn't this being tested below?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala:
##########
@@ -328,6 +329,11 @@ object RankUtil {
     }
   }
 
+  def getRankNumberColumnIndex(window: Window): Int = {

Review Comment:
   It would be good to check that the window function is actually `ROW_NUMBER`, to guard against misuse.
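   A minimal sketch of such a guard, assuming the helper keeps its single-group, single-call expectation (the index computation is an assumption about where the ROW_NUMBER output column sits, not taken from the PR):

   ```scala
   import org.apache.calcite.rel.core.Window
   import org.apache.calcite.sql.fun.SqlStdOperatorTable

   def getRankNumberColumnIndex(window: Window): Int = {
     val aggCall = window.groups.get(0).aggCalls.get(0)
     // Fail fast if this helper is misused on a non-ROW_NUMBER window function.
     require(
       aggCall.getOperator == SqlStdOperatorTable.ROW_NUMBER,
       s"Expected ROW_NUMBER window function, but got ${aggCall.getOperator}")
     // Assumption: the single ROW_NUMBER output is appended right after the input fields.
     window.getInput.getRowType.getFieldCount
   }
   ```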



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala:
##########
@@ -193,26 +193,26 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase {
 
   @Test
   def testAreColumnsUniqueOnRank(): Unit = {
-    Array(
-      logicalRank,
-      flinkLogicalRank,
-      batchLocalRank,
-      batchGlobalRank,
-      streamRank,
-      logicalRankWithVariableRange,
-      flinkLogicalRankWithVariableRange,
-      streamRankWithVariableRange
-    )
-      .foreach {
-        rank =>
-          assertTrue(mq.areColumnsUnique(rank, ImmutableBitSet.of(0)))
-          (1 until rank.getRowType.getFieldCount).foreach {
-            idx => assertFalse(mq.areColumnsUnique(rank, ImmutableBitSet.of(idx)))
-          }
-          assertTrue(mq.areColumnsUnique(rank, ImmutableBitSet.of(0, 1)))
-          assertTrue(mq.areColumnsUnique(rank, ImmutableBitSet.of(0, 2)))
-          assertFalse(mq.areColumnsUnique(rank, ImmutableBitSet.of(1, 2)))
-      }
+//    Array(

Review Comment:
   Why is this commented out?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -186,6 +187,25 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
   }
 
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
+    // If it's a ROW_NUMBER Rank, then the upsert keys are partition by key and row number.
+    if (rel.groups.length == 1) {
+      val group = rel.groups.get(0)
+      FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfWindowGroup(group, rel) match {
+        case Some(uniqueKeys) =>
+          val inputKeys = filterKeys(
+            FlinkRelMetadataQuery
+              .reuseOrCreate(mq)
+              .getUpsertKeys(rel.getInput),
+            group.keys)
+          val retSet = new JHashSet[ImmutableBitSet]
+          retSet.add(uniqueKeys)
+          if (inputKeys != null && inputKeys.nonEmpty) {
+            inputKeys.foreach(uniqueKey => retSet.add(uniqueKey))

Review Comment:
   `uniqueKey` would be better renamed to `upsertKey`.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -186,6 +187,25 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
   }
 
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
+    // If it's a ROW_NUMBER Rank, then the upsert keys are partition by key and row number.
+    if (rel.groups.length == 1) {
+      val group = rel.groups.get(0)
+      FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfWindowGroup(group, rel) match {
+        case Some(uniqueKeys) =>
+          val inputKeys = filterKeys(
+            FlinkRelMetadataQuery
+              .reuseOrCreate(mq)
+              .getUpsertKeys(rel.getInput),
+            group.keys)

Review Comment:
   This duplicates `getUpsertKeysOnOver`. Since you always call `getUpsertKeysOnOver` to get the upsert keys derived from the child, you could move that call to the beginning of the method and reuse its result wherever it's needed.
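   Roughly the shape being suggested; this is only a sketch, and the call to `getUpsertKeysOnOver` is illustrative since its exact signature is not visible in this diff (the filtering of child keys by the partition keys, as done in the PR, is elided):

   ```scala
   import scala.collection.JavaConverters._
   import java.util.{HashSet => JHashSet, Set => JSet}
   import org.apache.calcite.rel.core.Window
   import org.apache.calcite.rel.metadata.RelMetadataQuery
   import org.apache.calcite.util.ImmutableBitSet

   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
     // Derive the child-based upsert keys once, at the top of the method
     // (illustrative call; the real helper may take different arguments).
     val keysFromChild = getUpsertKeysOnOver(rel, mq)
     val retSet = new JHashSet[ImmutableBitSet]
     if (keysFromChild != null) {
       retSet.addAll(keysFromChild)
     }
     // Then add the ROW_NUMBER-based key(s) when applicable, reusing the helper
     // introduced in FlinkRelMdUniqueKeys by this PR.
     rel.groups.asScala.foreach { group =>
       FlinkRelMdUniqueKeys.INSTANCE
         .getUniqueKeysOfWindowGroup(group, rel)
         .foreach(key => retSet.add(key))
     }
     retSet
   }
   ```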


