Jacky Lau created FLINK-34665:
---------------------------------

Summary: Add streaming rule for union to Rand and it convert to StreamExecDeduplicate finally
Key: FLINK-34665
URL: https://issues.apache.org/jira/browse/FLINK-34665
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: Jacky Lau
Fix For: 1.20.0
The semantics of UNION in SQL include deduplication. When Calcite converts a SqlNode to a RelNode, it inserts a distinct Aggregate above the Union to achieve this. In Flink, that distinct Aggregate is eventually converted into a StreamExecGroupAggregate operator, which accesses state multiple times. Across many of our jobs, we have observed that thread stacks are frequently stuck in state access. The cause is that the grouping key of the distinct aggregate consists of all fields of the union, so the state key is relatively large, and repeated state lookups and key comparisons become time-consuming.

A possible optimization is to add a rule that converts the Union into a Rank ordered by processing time, which is ultimately translated into a StreamExecDeduplicate. We currently have users rewrite their SQL to deduplicate with ROW_NUMBER, and this approach works very well, so it should be feasible to add the same optimization as a rule at the engine level.

However, such a rule changes the execution plan, which could cause failures for users upgrading their Flink version. I therefore suggest guarding it with a configuration flag whose default value leaves the current behavior unchanged.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
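The proposed rewrite relies on the fact that, for append-only inputs, keeping only the first row per key ordered by processing time (the `ROW_NUMBER() ... = 1` pattern users apply by hand) yields the same rows as the DISTINCT implied by UNION. The following is a minimal, non-Flink sketch of that equivalence, assuming append-only inputs, arrival order standing in for processing time, and the whole row used as the deduplication key. The function names `union_distinct` and `keep_first_row_dedup` are hypothetical and do not correspond to Flink classes.

```python
from typing import Hashable, Iterable, Iterator, Set, Tuple

Row = Tuple[Hashable, ...]


def union_distinct(left: Iterable[Row], right: Iterable[Row]) -> Set[Row]:
    """Reference semantics of SQL UNION: the set of distinct rows of both inputs."""
    return set(left) | set(right)


def keep_first_row_dedup(stream: Iterable[Row]) -> Iterator[Row]:
    """Emit a row only the first time its key (here: all fields) is seen.

    Roughly mirrors the hand-written rewrite
      SELECT ... FROM (
        SELECT ..., ROW_NUMBER() OVER (PARTITION BY <all fields> ORDER BY proctime) AS rn
        FROM (<left> UNION ALL <right>)
      ) WHERE rn = 1
    In this sketch, arrival order stands in for processing time and the
    per-key "state" is only a seen-marker, not an aggregate accumulator.
    """
    seen: Set[Row] = set()  # hypothetical stand-in for keyed state
    for row in stream:
        if row not in seen:
            seen.add(row)
            yield row


left = [(1, "a"), (2, "b"), (1, "a")]
right = [(2, "b"), (3, "c")]

# UNION ALL the two inputs (in arrival order), then keep the first row per key.
result = list(keep_first_row_dedup(left + right))
print(result)  # [(1, 'a'), (2, 'b'), (3, 'c')]
assert set(result) == union_distinct(left, right)
```

The sketch only illustrates why the outputs match; it says nothing about the actual state layout or performance of StreamExecGroupAggregate versus StreamExecDeduplicate.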