Jacky Lau created FLINK-34665:
---------------------------------

             Summary: Add streaming rule to convert Union to Rank so that it is
finally translated to StreamExecDeduplicate
                 Key: FLINK-34665
                 URL: https://issues.apache.org/jira/browse/FLINK-34665
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.20.0
            Reporter: Jacky Lau
             Fix For: 1.20.0


In SQL, UNION (unlike UNION ALL) removes duplicate rows. When Calcite converts
a SqlNode to a RelNode, it inserts a distinct Aggregate above the Union to
perform this deduplication. In Flink, this distinct Aggregate is eventually
translated into a StreamExecGroupAggregate operator, which accesses the state
multiple times. Across many of our jobs, we observe that thread stacks are
often stuck in state access. The reason is that the grouping key of the
distinct aggregate consists of all fields of the union, so the state keys are
relatively large, and repeatedly accessing and comparing them is
time-consuming.

A possible optimization is to add a rule that converts the Union into a Rank
ordered by processing time, which is then translated into a
StreamExecDeduplicate. Today we ask users to rewrite their SQL to deduplicate
with ROW_NUMBER, and this approach works very well, so the same optimization
could be supported at the engine level by adding such a rule.
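
To illustrate the semantics the proposed rewrite relies on, here is a minimal
plain-Java sketch. It is not Flink planner or runtime code, and the class and
method names are hypothetical. It models UNION as UNION ALL followed by
"keep first row" deduplication keyed on all fields, where each key needs only
a single "seen" entry instead of aggregate state.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UnionDedupSketch {

    // Hypothetical row type of the union's output; the dedup key is the whole row.
    record Row(String name, int value) {}

    // Emits each row the first time its key is seen and drops later duplicates.
    // The only per-key state is membership in `seen`, similar in spirit to a
    // processing-time "keep first row" deduplication (assumption, not Flink code).
    static List<Row> keepFirstRow(List<Row> unionAll) {
        Set<Row> seen = new HashSet<>();
        List<Row> out = new ArrayList<>();
        for (Row row : unionAll) {
            if (seen.add(row)) { // add() returns false if the key was already seen
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> left = List.of(new Row("a", 1), new Row("b", 2));
        List<Row> right = List.of(new Row("b", 2), new Row("c", 3), new Row("a", 1));
        // UNION ALL: concatenate both inputs in (simulated) arrival order.
        List<Row> unionAll = new ArrayList<>(left);
        unionAll.addAll(right);
        // Deduplicating on all fields yields the same set of rows as UNION.
        System.out.println(keepFirstRow(unionAll));
    }
}
```

Running main prints the three distinct rows (a, b, c) in first-arrival order,
which is the same set UNION would return.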

However, this rule changes the execution plan, which would cause jobs to fail
when users upgrade their Flink version. I therefore suggest adding a flag whose
default value keeps the current behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
