[ https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17883911#comment-17883911 ]
lincoln lee commented on FLINK-34702:
-------------------------------------

Thank you all for bringing this up again!

First of all, I think we can all agree that deduplicate is a special case of rank. Second, the options we've discussed so far are not ideal: whether it's PhysicalDedup -> Exec or Logical -> PhysicalDedup, both do something special to change the conversion.

Going back to the problem itself, something new came to my mind: maybe we should consider removing the physical dedup node and keeping only the physical rank node, because dedup is just one specialized execution/operator that applies when several conditions are satisfied (the input changelog mode, the rank range, the sort spec and other traits).

I'm preparing a draft PR based on this idea. It may take some time (probably a day or two), but I think it can solve the problem completely.

> Rank should not convert to StreamExecDuplicate when the input is not insert only
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-34702
>                 URL: https://issues.apache.org/jira/browse/FLINK-34702
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0
>            Reporter: Jacky Lau
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0.0
>
>
> {code:scala}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
>   val sqlQuery =
>     """
>       |SELECT *
>       |FROM (
>       |  SELECT *,
>       |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
>       |  FROM (select a, count(b) as b from MyTable group by a)
>       |)
>       |WHERE rowNum = 1
>     """.stripMargin
>   util.verifyExecPlan(sqlQuery)
> }
> {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
>
> This is because StreamPhysicalDeduplicate cannot consume update changes yet, while StreamExecRank can.
> So we should not convert the FlinkLogicalRank to StreamPhysicalDeduplicate in this case, and we can defer the check of whether the input contains update changes to the "optimize the physical plan" phase.
> We can add an option to solve it, and once StreamPhysicalDeduplicate can consume update changes, we can deprecate that option.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
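A minimal sketch of the idea in lincoln lee's comment, under a simplified model: deduplicate is treated as one specialized execution of a single rank node, chosen only when all of its conditions hold, including an insert-only input changelog mode; otherwise the general rank operator is used, which is what the query in the issue needs. The object `RankTranslationSketch`, its types, and the exact conditions (ROW_NUMBER, a constant [1, 1] rank range, ordering on a single time attribute, insert-only input) are hypothetical illustrations, not Flink planner classes, and Flink's real checks may differ.

{code:scala}
// Hypothetical model only: none of these types exist in Flink.
object RankTranslationSketch {

  // Simplified stand-in for the changelog mode of the rank's input.
  sealed trait ChangelogMode
  case object InsertOnly extends ChangelogMode
  case object ContainsUpdates extends ChangelogMode

  sealed trait RankType
  case object RowNumber extends RankType
  case object DenseRank extends RankType

  // The traits a single physical rank node would carry into translation.
  final case class RankSpec(
      rankType: RankType,
      rankStart: Long,
      rankEnd: Long,
      sortOnSingleTimeAttribute: Boolean,
      inputChangelogMode: ChangelogMode)

  sealed trait Operator
  case object DeduplicateOp extends Operator
  case object GeneralRankOp extends Operator

  // Pick the specialized dedup operator only when every condition holds;
  // otherwise fall back to the general rank operator.
  def chooseOperator(spec: RankSpec): Operator = {
    val isTopOne = spec.rankStart == 1 && spec.rankEnd == 1
    val canDeduplicate =
      spec.rankType == RowNumber &&
        isTopOne &&
        spec.sortOnSingleTimeAttribute &&
        spec.inputChangelogMode == InsertOnly
    if (canDeduplicate) DeduplicateOp else GeneralRankOp
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the issue's query: ROW_NUMBER() ordered by PROCTIME(), rowNum = 1,
    // over a GROUP BY aggregate whose output contains updates.
    val fromIssue = RankSpec(RowNumber, 1, 1,
      sortOnSingleTimeAttribute = true, inputChangelogMode = ContainsUpdates)
    println(chooseOperator(fromIssue)) // GeneralRankOp
    println(chooseOperator(fromIssue.copy(inputChangelogMode = InsertOnly))) // DeduplicateOp
  }
}
{code}

The design choice the sketch illustrates: because only one rank node exists, the dedup-or-rank decision can be made once the input changelog mode is known, rather than being fixed by an earlier logical-to-physical rule.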