Hi devs,

I'd like to discuss a new way to optimize multi-sink queries in Flink.
Currently, Flink uses the RelNodeBlock mechanism to optimize multi-sink queries. It has the following steps:

1. The multi-sink query is parsed and validated into multiple RelNode trees.
2. The common nodes of these RelNode trees are merged into a single node if table.optimizer.reuse-optimize-block-with-digest-enabled is enabled.
3. The RelNode trees are split into multiple RelNodeBlocks.
4. Each RelNodeBlock is fed to the Calcite planner for optimization.
5. The original structure is reconstructed from the optimized RelNodeBlocks.

As far as I know (please correct me if I'm wrong), RelNodeBlock serves two main purposes:

- Calcite does not support DAG optimization, so RelNodeBlock splits the DAG into multiple single trees, and we can then leverage Calcite to optimize each of them.
- In a multi-sink query we need to avoid computing the same node repeatedly. So, if table.optimizer.reuse-optimize-block-with-digest-enabled is enabled, the common node is kept from being optimized into different results, which would lead to repeated computation.

However, in our production we found that the RelNodeBlock optimization is not powerful enough. As stated in the comments of CommonSubGraphBasedOptimizer, the optimization of a RelNodeBlock is a local optimization: there is no optimization across RelNodeBlocks. Take a simple example:

    Sink                 Sink
     |                    |
    Project(a,b)         Project(a,b,c)
     |                    |
    Scan(a,b,c,d,e)      Scan(a,b,c,d,e)

Both branches scan the same table. With the current optimization, we can only choose whether or not to merge the Scan into one RelNodeBlock before optimization. If it is merged, the Scan cannot benefit from optimizations such as ProjectPushDown. If it is not merged, the two RelNodeBlocks will generate two different scans during optimization, {a,b} and {a,b,c}.

So I'm proposing a new way to improve the CTE optimization of multi-sink queries (or single queries):

1. Insert a VirtualMultiSink to pack the sink nodes together, as described in [2], which is inspired by [3].
2. Insert a new Spool node (meaning "produced once, consumed multiple times") on top of every RelNode that has multiple outputs.
3. Implement several rules around the Spool node:
   1. PushProjectToSpool, to prune the fields that are not used by any of the Spool node's parents.
   2. PushFilterToSpool, to push down the DNF of the conditions of all the Spool node's parents.
   3. ...
4. Furthermore, we can implement a rule that discards the Spool, and then let the planner decide whether to reuse or not based on the cost of each alternative.
5. After the physical rewrite, we can remove the PhysicalVirtualMultiSink and the Spool nodes.

The benefits of the new way are:

1. The optimization is done on a single tree, so the limitation of local optimization is avoided.
2. Cost-based CTE optimization becomes available.
3. The new solution can optimize CTEs in both multi-sink queries and single queries, so the problem in [1] can also be resolved.
4. It avoids the trait propagation between RelNodeBlocks that the current solution needs, as described in [4].

Looking forward to your input.

Best,
Aitozi.

[1]: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-29088
[2]: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-31205
[3]: Efficient and Extensible Algorithms for Multi Query Optimization, https://arxiv.org/pdf/cs/9910021.pdf
[4]: https://issues.apache.org/jira/browse/FLINK-24048
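
P.S. To make the PushProjectToSpool idea concrete, here is a minimal, self-contained sketch of the field computation only. It is not Flink or Calcite code and does not use their rule APIs: the class SpoolProjectSketch and the methods requiredBySpool and prunable are hypothetical names chosen for illustration, and columns are modeled as plain strings instead of RelNode fields. It assumes the rule keeps the union of the fields used by all parents of the Spool and prunes the rest, using the two-sink example from this mail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical illustration only; none of these names exist in Flink or Calcite.
public class SpoolProjectSketch {

    // Union of the columns needed by every parent (consumer) of the Spool node.
    static SortedSet<String> requiredBySpool(List<Set<String>> consumerColumns) {
        SortedSet<String> union = new TreeSet<>();
        for (Set<String> columns : consumerColumns) {
            union.addAll(columns);
        }
        return union;
    }

    // Scan columns that no consumer needs, i.e. what could be pruned below the Spool.
    static List<String> prunable(List<String> scanColumns, Set<String> required) {
        List<String> unused = new ArrayList<>();
        for (String column : scanColumns) {
            if (!required.contains(column)) {
                unused.add(column);
            }
        }
        return unused;
    }

    public static void main(String[] args) {
        // Scan(a,b,c,d,e) shared by Project(a,b) and Project(a,b,c).
        List<String> scan = List.of("a", "b", "c", "d", "e");
        List<Set<String>> consumers = List.of(Set.of("a", "b"), Set.of("a", "b", "c"));

        SortedSet<String> required = requiredBySpool(consumers);
        System.out.println("shared scan reads " + required);            // [a, b, c]
        System.out.println("pruned columns    " + prunable(scan, required)); // [d, e]
    }
}
```

With this, the table would be scanned once with {a,b,c} instead of either scanning all five columns (merged block) or scanning twice with {a,b} and {a,b,c} (separate blocks). Whether the shared scan is actually cheaper than two narrower scans is exactly what the cost-based step is meant to decide.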