[ 
https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17883868#comment-17883868
 ] 

Jim Hughes commented on FLINK-34702:
------------------------------------

Sergey, correct me if I'm wrong, but this approach could be categorized as:

*Option 4:* Deny situations where the optimizer chooses `StreamExecDeduplicate`.
 

You've covered a number of situations in your draft PR.  I doubt it is 
exhaustive.  Even if it is complete today, in the future, things could drift.  
(If I'm missing something, let me know.)

Inverting the idea, we could have:

*Option 5:* Create an allow list of when the optimizer can choose 
`StreamExecDeduplicate`.  With this approach, any failures in coverage result in 
an optimization gap rather than SQL which cannot be planned.
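
As a rough sketch of the allow-list idea: the class, enum, and method names 
below are hypothetical and are not part of Flink's planner API. The point is 
only that anything not explicitly listed falls back to Rank, so a gap in the 
list costs an optimization rather than producing a planning failure.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in Flink.
class DeduplicateAllowList {
    /** Simplified stand-in for the kinds of changes an input may produce. */
    enum InputChangelog { INSERT_ONLY, UPSERT, RETRACT, UNKNOWN }

    /** Inputs for which choosing Deduplicate is known to be safe. */
    static final Set<InputChangelog> ALLOWED = EnumSet.of(InputChangelog.INSERT_ONLY);

    /** Anything not on the allow list falls back to the more general Rank operator. */
    static String chooseOperator(InputChangelog input) {
        return ALLOWED.contains(input) ? "Deduplicate" : "Rank";
    }
}
```

With this shape, an input kind nobody anticipated (here `UNKNOWN`) still plans 
successfully, just with Rank.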

Sergey and I were talking about this bug offline some.  My initial idea was:

*Option 6:* Add a try-catch at the beginning of optimization, and retry 
optimization with a ThreadLocal set indicating that (and how) we failed the 
last time.  This allows the optimizer to pick a Rank operator instead of the 
Deduplicate one.
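
A minimal sketch of that retry shape, under stated assumptions: 
`RetryingOptimizer`, `AVOID_DEDUPLICATE`, `UnsupportedChangelogException`, and 
the toy optimizer are all hypothetical stand-ins, not Flink classes. In the 
real planner the failure would be the TableException quoted in the issue.

```java
import java.util.function.Function;

// Hypothetical illustration only; none of these names exist in Flink.
class RetryingOptimizer {
    /** Set only while retrying after a failure caused by choosing Deduplicate. */
    static final ThreadLocal<Boolean> AVOID_DEDUPLICATE = ThreadLocal.withInitial(() -> false);

    /** Stand-in for the exception thrown when an operator cannot consume updates. */
    static class UnsupportedChangelogException extends RuntimeException {
        UnsupportedChangelogException(String message) { super(message); }
    }

    /** Runs the optimizer once; on failure, retries with the flag set. */
    static <P, R> R optimizeWithRetry(Function<P, R> optimizer, P plan) {
        try {
            return optimizer.apply(plan);
        } catch (UnsupportedChangelogException e) {
            AVOID_DEDUPLICATE.set(true);
            try {
                return optimizer.apply(plan);
            } finally {
                // Clear the flag so it cannot leak into later queries on this thread.
                AVOID_DEDUPLICATE.remove();
            }
        }
    }

    /** Toy optimizer: prefers Deduplicate unless the flag is set; fails on update input. */
    static String toyOptimize(boolean inputInsertOnly) {
        if (AVOID_DEDUPLICATE.get()) {
            return "Rank";
        }
        if (!inputInsertOnly) {
            throw new UnsupportedChangelogException("Deduplicate cannot consume update changes");
        }
        return "Deduplicate";
    }
}
```

Calling `RetryingOptimizer.optimizeWithRetry(RetryingOptimizer::toyOptimize, false)` 
fails on the first pass and yields Rank on the second.  The cost is a second 
full optimization pass whenever the first one fails.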

After thinking about this, I have another idea:

*Option 7:* Roll the logic for Rank and Deduplicate into one operator.  Have 
the FlinkChangelogModeInferenceProgram configure the changelog mode for the 
operator and have it choose which "function" to bind.  (I don't like this 
option since it'd involve rewriting the operators.)
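
For illustration, the binding step of Option 7 might look roughly like this. 
`UnifiedRankOperator` and `BoundFunction` are hypothetical names, and the 
boolean parameter is an assumed stand-in for whatever the changelog mode 
inference would actually configure on the operator.

```java
// Hypothetical illustration only; none of these names exist in Flink.
class UnifiedRankOperator {
    /** Which concrete function the single operator binds. */
    enum BoundFunction { DEDUPLICATE_FUNCTION, RANK_FUNCTION }

    private final BoundFunction bound;

    /** inputInsertOnly stands in for the result of changelog mode inference. */
    UnifiedRankOperator(boolean inputInsertOnly) {
        // Deduplicate logic is only valid for insert-only input; otherwise use Rank logic.
        this.bound = inputInsertOnly
                ? BoundFunction.DEDUPLICATE_FUNCTION
                : BoundFunction.RANK_FUNCTION;
    }

    BoundFunction boundFunction() {
        return bound;
    }
}
```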

> Rank should not convert to StreamExecDuplicate when the input is not insert 
> only
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-34702
>                 URL: https://issues.apache.org/jira/browse/FLINK-34702
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0
>            Reporter: Jacky Lau
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0.0
>
>
> {code:java}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
>   val sqlQuery =
>     """
>       |SELECT *
>       |FROM (
>       |  SELECT *,
>       |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
>       |  FROM (select a, count(b) as b from MyTable group by a)
>       |)
>       |WHERE rowNum = 1
>     """.stripMargin
>   util.verifyExecPlan(sqlQuery)
> } {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't 
> support consuming update changes which is produced by node 
> GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
> This happens because StreamPhysicalDeduplicate cannot consume update changes 
> yet, while StreamExecRank can, so we should not convert the FlinkLogicalRank 
> to StreamPhysicalDeduplicate in this case. We can defer the check of whether 
> the input contains update changes to the "optimize the physical plan" phase, 
> so we can add an option to solve it. Once StreamPhysicalDeduplicate supports 
> consuming update changes, we can deprecate that option.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
