Hi devs,

I'd like to discuss a new way to optimize multi-sink queries in Flink.

Currently, Flink uses the RelNodeBlock mechanism to optimize multi-sink
queries.

It consists of the following steps:

   1. The multi-sink query is parsed and validated into multiple RelNode
   trees.
   2. If table.optimizer.reuse-optimize-block-with-digest-enabled is
   enabled, the common nodes of these RelNode trees are merged into a
   single node.
   3. The resulting RelNode DAG is split into multiple RelNodeBlocks.
   4. Each RelNodeBlock is fed to the Calcite planner for optimization.
   5. The original structure is reconstructed from the optimized
   RelNodeBlocks.
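
To make step 3 concrete, here is a minimal Java sketch of cutting a
multi-sink DAG into tree-shaped blocks. It assumes a toy Node class rather
than Calcite's RelNode, and the class and method names are hypothetical;
Flink's real block builder may apply additional conditions when deciding
where to cut. Here a new block simply starts at every sink and at every node
consumed by more than one parent.

```java
import java.util.*;

public class BlockSplitter {
    // Toy plan node: a hypothetical stand-in for Calcite's RelNode.
    static final class Node {
        final String name;
        final List<Node> inputs;
        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = List.of(inputs);
        }
    }

    // Splits a multi-sink DAG into tree-shaped blocks, keyed by root name.
    static Map<String, List<String>> split(List<Node> sinks) {
        // 1. Count parents per node, visiting each node once (identity-based).
        Map<Node, Integer> parents = new IdentityHashMap<>();
        Set<Node> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Deque<Node> stack = new ArrayDeque<>(sinks);
        while (!stack.isEmpty()) {
            Node n = stack.pop();
            if (!seen.add(n)) continue;
            for (Node in : n.inputs) {
                parents.merge(in, 1, Integer::sum);
                stack.push(in);
            }
        }
        // 2. Block roots: every sink plus every shared node.
        Set<Node> roots = Collections.newSetFromMap(new IdentityHashMap<>());
        roots.addAll(sinks);
        parents.forEach((node, count) -> { if (count > 1) roots.add(node); });
        // 3. A block holds the nodes reachable from its root without entering
        //    another block; that boundary becomes the block's input.
        Map<String, List<String>> blocks = new HashMap<>();
        for (Node root : roots) {
            List<String> members = new ArrayList<>();
            Deque<Node> todo = new ArrayDeque<>(List.of(root));
            while (!todo.isEmpty()) {
                Node n = todo.pop();
                members.add(n.name);
                for (Node in : n.inputs) {
                    if (!roots.contains(in)) todo.push(in);
                }
            }
            blocks.put(root.name, members);
        }
        return blocks;
    }

    public static void main(String[] args) {
        // Two sinks sharing one Scan (i.e. after the common node was merged).
        Node scan = new Node("Scan(a,b,c,d,e)");
        Node sink1 = new Node("Sink1", new Node("Project(a,b)", scan));
        Node sink2 = new Node("Sink2", new Node("Project(a,b,c)", scan));
        split(List.of(sink1, sink2)).forEach((root, members) ->
            System.out.println(root + " -> " + members));
    }
}
```

Each resulting block is a plain tree, which is what lets a tree-only
optimizer like Calcite handle it, but it also means each block is optimized
without seeing the others.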

As far as I know (please correct me if I'm wrong), RelNodeBlock mainly
serves the following two purposes:

   - Calcite does not support DAG optimization, so RelNodeBlock splits the
   multi-tree DAG into multiple single trees, which lets us leverage Calcite
   to optimize each of them.
   - In a multi-sink query, we need to avoid computing the same node
   repeatedly. If
   table.optimizer.reuse-optimize-block-with-digest-enabled is enabled, the
   common node is prevented from being optimized into different results,
   which would lead to repeated computation.
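
The digest-based reuse can be pictured as hash-consing: subtrees with the
same digest are replaced by one shared instance. The Java sketch below
assumes a toy node whose digest is just its operator text plus its inputs'
digests; Flink's real digests are richer, and all names here are
hypothetical.

```java
import java.util.*;

public class DigestReuse {
    // Toy plan node (hypothetical); `op` is the operator text, e.g. "Scan(a,b,c,d,e)".
    static final class Node {
        final String op;
        final List<Node> inputs;
        Node(String op, Node... inputs) {
            this.op = op;
            this.inputs = List.of(inputs);
        }
        // Digest = operator text plus the digests of all inputs, in order.
        String digest() {
            StringBuilder sb = new StringBuilder(op);
            for (Node in : inputs) sb.append('[').append(in.digest()).append(']');
            return sb.toString();
        }
    }

    // Rebuilds the plan bottom-up so that equal digests map to one shared instance.
    static Node merge(Node node, Map<String, Node> canon) {
        Node[] newInputs = new Node[node.inputs.size()];
        for (int i = 0; i < newInputs.length; i++) {
            newInputs[i] = merge(node.inputs.get(i), canon);
        }
        Node rebuilt = new Node(node.op, newInputs);
        return canon.computeIfAbsent(rebuilt.digest(), d -> rebuilt);
    }

    public static void main(String[] args) {
        // Two sinks, each with its own (identical) Scan node.
        Node sink1 = new Node("Sink1", new Node("Project(a,b)", new Node("Scan(a,b,c,d,e)")));
        Node sink2 = new Node("Sink2", new Node("Project(a,b,c)", new Node("Scan(a,b,c,d,e)")));
        Map<String, Node> canon = new HashMap<>();
        Node m1 = merge(sink1, canon);
        Node m2 = merge(sink2, canon);
        Node scan1 = m1.inputs.get(0).inputs.get(0);
        Node scan2 = m2.inputs.get(0).inputs.get(0);
        System.out.println("Scan shared: " + (scan1 == scan2)); // prints "Scan shared: true"
    }
}
```

Once merged, the shared Scan is a single node, so it can no longer be
specialized differently for each consumer.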

However, in our production environment, we found that the optimization
capability of RelNodeBlock is not sufficient. As noted in the comments of
CommonSubGraphBasedOptimizer, the optimization of each RelNodeBlock is
local: there is no optimization across RelNodeBlocks. Take a simple
example:

    Sink                  Sink
     |                     |
    Project (a,b)         Project (a,b,c)
     |                     |
    Scan (a,b,c,d,e)      Scan (a,b,c,d,e)

Both sinks scan the same table. With the current optimization, we can only
choose whether or not to merge the Scans into one RelNodeBlock before
optimization.

If they are merged, the shared Scan cannot benefit from optimizations such
as projection push-down.

If they are not merged, the two RelNodeBlocks are optimized independently
and produce two different scans, {a, b} and {a, b, c}, so the table is read
twice.
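
A back-of-the-envelope comparison of the two options for this example,
assuming (as a deliberate simplification that ignores row counts and storage
layout) that cost is just the number of columns read. The third case is the
goal: a single scan that reads only what the consumers need.

```java
import java.util.*;

public class ScanChoices {
    static final Set<String> TABLE = Set.of("a", "b", "c", "d", "e");
    // Columns each sink's Project needs, taken from the example.
    static final List<Set<String>> NEEDED = List.of(Set.of("a", "b"), Set.of("a", "b", "c"));

    // Total columns read over all scans produced by a strategy.
    static int columnsRead(List<Set<String>> scans) {
        return scans.stream().mapToInt(Set::size).sum();
    }

    public static void main(String[] args) {
        // Merged before optimization: one scan, but no per-consumer projection
        // push-down, so every column is read.
        List<Set<String>> merged = List.of(TABLE);
        // Not merged: each block pushes its own projection, giving two scans.
        List<Set<String>> separate = NEEDED;
        // Goal: one shared scan reading the union of the consumers' columns.
        Set<String> union = new TreeSet<>();
        NEEDED.forEach(union::addAll);
        List<Set<String>> shared = List.of(union);

        System.out.printf("merged:   %d scan(s), %d columns%n", merged.size(), columnsRead(merged));
        System.out.printf("separate: %d scan(s), %d columns%n", separate.size(), columnsRead(separate));
        System.out.printf("shared:   %d scan(s), %d columns%n", shared.size(), columnsRead(shared));
    }
}
```

Neither current option reaches the "one scan, three columns" result, because
reaching it requires looking at both consumers at once.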

So I'm proposing a new way to improve CTE optimization for multi-sink
queries (and single queries as well):

   1. Insert a VirtualMultiSink to pack the sink nodes together, as
   described in [2]. This is inspired by [3].
   2. Insert a new Spool node (meaning "produced once, consumed multiple
   times") on top of each RelNode that has multiple outputs.
   3. Implement several rules around the Spool node:
      - PushProjectToSpool, to prune the fields that none of the Spool
      node's parents use
      - PushFilterToSpool, to push down the disjunction (in DNF) of the
      filter conditions of all the Spool node's parents
      - ...
   4. Furthermore, we can implement a rule that discards the Spool, and
   then let the planner decide whether to reuse or not based on the cost
   of each alternative.
   5. After the physical rewrite, we can remove the
   PhysicalVirtualMultiSink and the Spool nodes.
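
The core idea of the two Spool rules in step 3 can be sketched in a few
lines of Java. This is a toy model, not Flink's planner API: the Consumer
record and both methods are hypothetical, and filters are plain Java
predicates instead of RexNode conditions normalized to DNF. Project
push-down keeps the union of the parents' fields; filter push-down keeps a
row if any parent would keep it.

```java
import java.util.*;
import java.util.function.Predicate;

public class SpoolPushDown {
    // One consumer (parent) of a Spool: the fields it reads and its filter.
    record Consumer(Set<String> fields, Predicate<Map<String, Integer>> filter) {}

    // PushProjectToSpool idea: the Spool's input only has to produce the
    // union of the fields read by all of its parents.
    static Set<String> pushedProjection(List<Consumer> parents) {
        Set<String> union = new TreeSet<>();
        for (Consumer c : parents) union.addAll(c.fields());
        return union;
    }

    // PushFilterToSpool idea: a row is needed if ANY parent keeps it, so the
    // Spool's input can be filtered by the OR of the parents' filters.
    // Each parent must still apply its own filter above the Spool.
    static Predicate<Map<String, Integer>> pushedFilter(List<Consumer> parents) {
        Predicate<Map<String, Integer>> any = row -> false;
        for (Consumer c : parents) any = any.or(c.filter());
        return any;
    }

    public static void main(String[] args) {
        List<Consumer> parents = List.of(
            new Consumer(Set.of("a", "b"), row -> row.get("a") > 10),
            new Consumer(Set.of("a", "b", "c"), row -> row.get("c") < 0));
        System.out.println(pushedProjection(parents)); // prints [a, b, c]
        Predicate<Map<String, Integer>> f = pushedFilter(parents);
        System.out.println(f.test(Map.of("a", 5, "c", 1)));  // prints false
        System.out.println(f.test(Map.of("a", 20, "c", 1))); // prints true
    }
}
```

Note that if any parent has no filter at all, the disjunction is always
true and nothing can be pushed below the Spool.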

The benefits of the new approach are:

   1. It performs the optimization on a single tree, so the limitation of
   local optimization is avoided.
   2. Cost-based CTE optimization becomes possible.
   3. It can optimize CTEs in both multi-sink and single queries, so the
   problem in [1] can also be resolved.
   4. It avoids propagating traits between RelNodeBlocks, which the
   current solution requires, as described in [4].
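
To illustrate what the cost-based choice between keeping and discarding a
Spool amounts to, here is a Java sketch under a made-up linear cost model.
The function name, parameters, and numbers are hypothetical; in the proposal
this comparison would be made by the planner's own cost model.

```java
public class SpoolCostChoice {
    // Returns true if keeping the Spool (compute once, replay to every
    // consumer) is estimated cheaper than recomputing the subtree per consumer.
    //   sharedCost:       computing the subtree once, with the union of the
    //                     consumers' projections and the OR of their filters
    //   replayCost:       buffering/replaying the Spool output, per consumer
    //   perConsumerCosts: computing the subtree separately for each consumer,
    //                     with only that consumer's projection/filter pushed down
    static boolean keepSpool(double sharedCost, double replayCost, double[] perConsumerCosts) {
        double reuse = sharedCost + replayCost * perConsumerCosts.length;
        double recompute = 0;
        for (double c : perConsumerCosts) recompute += c;
        return reuse <= recompute;
    }

    public static void main(String[] args) {
        // Cheap, selective subtree: recomputing per consumer wins (160 > 90).
        System.out.println(keepSpool(100, 30, new double[] {40, 50}));   // prints false
        // Expensive subtree: sharing it wins (1060 < 1850).
        System.out.println(keepSpool(1000, 30, new double[] {900, 950})); // prints true
    }
}
```

The point is that neither "always reuse" nor "never reuse" is right in
general, which is why the decision is left to cost.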

Looking forward to your input.

Best,

Aitozi.

[1]: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-29088

[2]: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-31205

[3]: Efficient and Extensible Algorithms for Multi Query Optimization
https://arxiv.org/pdf/cs/9910021.pdf

[4]: https://issues.apache.org/jira/browse/FLINK-24048
