libenchao commented on code in PR #24567: URL: https://github.com/apache/flink/pull/24567#discussion_r1554809730
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala: ########## @@ -438,9 +438,36 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu rel: Window, mq: RelMetadataQuery, ignoreNulls: Boolean): JSet[ImmutableBitSet] = { + if (rel.groups.length == 1) { + val group = rel.groups.get(0) + getUniqueKeysOfWindowGroup(group, rel) match { + case Some(uniqueKeys) => + val retSet = new JHashSet[ImmutableBitSet] + retSet.add(uniqueKeys) + val inputKeys = mq.getUniqueKeys(rel.getInput, ignoreNulls) + if (inputKeys != null && inputKeys.nonEmpty) { + inputKeys.foreach(uniqueKey => retSet.add(uniqueKey)) + } + return retSet + case _ => + } + } getUniqueKeysOfOverAgg(rel, mq, ignoreNulls) } + def getUniqueKeysOfWindowGroup(group: Window.Group, rel: Window): Option[ImmutableBitSet] = { + // If it's a ROW_NUMBER window, then the unique keys are partition by key and row number. + val aggCalls = group.aggCalls + if ( + aggCalls.length == 1 && aggCalls.get(0).getOperator.equals(SqlStdOperatorTable.ROW_NUMBER) Review Comment: `aggCalls` could be more than one, and any of the aggCalls is `ROW_NUMBER` would make it a unique key? ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala: ########## @@ -438,9 +438,36 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu rel: Window, mq: RelMetadataQuery, ignoreNulls: Boolean): JSet[ImmutableBitSet] = { + if (rel.groups.length == 1) { Review Comment: `groups` can be more than one, and any of the `groups` could have `ROW_NUMBER`, and all of them are valid unique key candidates? 
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala: ########## @@ -186,6 +187,25 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { } def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { + // If it's a ROW_NUMBER Rank, then the upsert keys are partition by key and row number. + if (rel.groups.length == 1) { Review Comment: Again, limiting the number of `groups` to 1 is weird. If in streaming, we can only support one `group` for now (I'm not 100 percent sure about it), you should at least add a comment about it for future maintainability. ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala: ########## @@ -172,6 +172,8 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase { @Test def testGetUniqueKeysOnRank(): Unit = { + assertEquals(uniqueKeys(Array(7), Array(0)), mq.getUniqueKeys(logicalWindow).toSet) Review Comment: Isn't this being tested below? ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala: ########## @@ -328,6 +329,11 @@ object RankUtil { } } + def getRankNumberColumnIndex(window: Window): Int = { Review Comment: It would be good to check if the window function is `ROW_NUMBER` in case of misusage. 
########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala: ########## @@ -193,26 +193,26 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase { @Test def testAreColumnsUniqueOnRank(): Unit = { - Array( - logicalRank, - flinkLogicalRank, - batchLocalRank, - batchGlobalRank, - streamRank, - logicalRankWithVariableRange, - flinkLogicalRankWithVariableRange, - streamRankWithVariableRange - ) - .foreach { - rank => - assertTrue(mq.areColumnsUnique(rank, ImmutableBitSet.of(0))) - (1 until rank.getRowType.getFieldCount).foreach { - idx => assertFalse(mq.areColumnsUnique(rank, ImmutableBitSet.of(idx))) - } - assertTrue(mq.areColumnsUnique(rank, ImmutableBitSet.of(0, 1))) - assertTrue(mq.areColumnsUnique(rank, ImmutableBitSet.of(0, 2))) - assertFalse(mq.areColumnsUnique(rank, ImmutableBitSet.of(1, 2))) - } +// Array( Review Comment: Why is this commented out? ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala: ########## @@ -186,6 +187,25 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { } def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { + // If it's a ROW_NUMBER Rank, then the upsert keys are partition by key and row number. 
+ if (rel.groups.length == 1) { + val group = rel.groups.get(0) + FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfWindowGroup(group, rel) match { + case Some(uniqueKeys) => + val inputKeys = filterKeys( + FlinkRelMetadataQuery + .reuseOrCreate(mq) + .getUpsertKeys(rel.getInput), + group.keys) + val retSet = new JHashSet[ImmutableBitSet] + retSet.add(uniqueKeys) + if (inputKeys != null && inputKeys.nonEmpty) { + inputKeys.foreach(uniqueKey => retSet.add(uniqueKey)) Review Comment: `uniqueKey` is better renamed as `upsertKey` ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala: ########## @@ -186,6 +187,25 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { } def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { + // If it's a ROW_NUMBER Rank, then the upsert keys are partition by key and row number. + if (rel.groups.length == 1) { + val group = rel.groups.get(0) + FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfWindowGroup(group, rel) match { + case Some(uniqueKeys) => + val inputKeys = filterKeys( + FlinkRelMetadataQuery + .reuseOrCreate(mq) + .getUpsertKeys(rel.getInput), + group.keys) Review Comment: This is duplicated with `getUpsertKeysOnOver`. Since you always call `getUpsertKeysOnOver` to get upsert keys derived from child, you can put this at the beginning of the method, and use the result in various places. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org