Hi, Xia

Thanks for your reply. It looks good to me.

Best,
Ron

Xia Sun <xingbe...@gmail.com> wrote on Fri, Jul 26, 2024 at 10:49:

> Hi Ron,
>
> Thanks for your feedback!
>
> -> creation of the join operators until runtime
>
> That means when creating the AdaptiveJoinOperatorFactory, we will not
> immediately create the JoinOperator. Instead, we only pass in the necessary
> parameters for creating the JoinOperator. The appropriate JoinOperator will
> be created during the StreamGraphOptimizationStrategy optimization phase.
>
> You mentioned that the runtime's visibility into the table planner is
> indeed an issue. It includes two aspects:
> (1) We plan to place both implementations of the
> AdaptiveBroadcastJoinOptimizationStrategy and AdaptiveJoinOperatorFactory
> in the table layer. During the runtime phase, we will obtain the
> AdaptiveBroadcastJoinOptimizationStrategy through class loading. Therefore,
> the flink-runtime does not need to be aware of the table layer's
> implementation.
> (2) Since the dynamic codegen in the AdaptiveJoinOperatorFactory needs to
> be aware of the table planner, we will consider placing the
> AdaptiveJoinOperatorFactory in the table planner module as well.
>
> -> When did you configure these optimization strategies uniformly into
> `execution.batch.adaptive.stream-graph-optimization.strategies`
>
> Thank you for pointing out this issue. When there are multiple
> StreamGraphOptimizationStrategies, the optimization order at the runtime
> phase will strictly follow the order specified in the configuration option
> `execution.batch.adaptive.stream-graph-optimization.strategies`. Therefore,
> it is necessary to have a unified configuration during the SQL planner
> phase to ensure the correct optimization order. Currently, we are
> considering performing this unified configuration in
> BatchPlanner#afterTranslation().
>
> For simplicity, as long as the adaptive broadcast join/skewed join
> optimization features are enabled (e.g.,
> `table.optimizer.adaptive.join.broadcast-threshold` is not -1), the
> corresponding strategy will be configured. This optimization is independent
> of the specific SQL query, although it might not produce any actual effect.
>
> Best,
> Xia
>
> Ron Liu <ron9....@gmail.com> wrote on Wed, Jul 24, 2024 at 14:10:
>
> > Hi, Xia
> >
> > This FLIP looks good to me, +1.
> >
> > I've two questions:
> >
> > 1.
> > >> Accordingly, in terms of implementation, we will delay the codegen and
> > >> creation of the join operators until runtime.
> >
> > How are you delaying codegen to runtime? The current runtime is not SQL
> > planner aware. In other words, how should I understand this sentence?
> >
> > 2. FLIP-469 mentions passing StreamGraphOptimizationStrategy to runtime via
> > option `execution.batch.adaptive.stream-graph-optimization.strategies`. In
> > the SQL planner, if you have multiple different optimization strategies like
> > broadcast join, skew join, etc., when did you configure these
> > optimization strategies uniformly into
> > `execution.batch.adaptive.stream-graph-optimization.strategies`?
> >
> > Zhu Zhu <reed...@gmail.com> wrote on Fri, Jul 19, 2024 at 17:41:
> >
> > > +1 for the FLIP
> > >
> > > It's a good start to adaptively optimize the logical execution plan with
> > > runtime information.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Xia Sun <xingbe...@gmail.com> wrote on Thu, Jul 18, 2024 at 18:23:
> > >
> > > > Hi devs,
> > > >
> > > > Junrui Lee, Lei Yang, and I would like to initiate a discussion about
> > > > FLIP-470: Support Adaptive Broadcast Join[1].
> > > >
> > > > In general, Broadcast Hash Join is currently the most efficient join
> > > > strategy available in Flink. However, its prerequisite is that the input
> > > > data on one side must be sufficiently small; otherwise, it may lead to
> > > > memory overuse or other issues.
> > > > Currently, due to the lack of precise
> > > > statistics, it is difficult to make accurate estimations during the Flink
> > > > SQL Planning phase. For example, when an upstream Filter operator is
> > > > present, it is easy to overestimate the size of the table, whereas with
> > > > an expansion operator, the table size tends to be underestimated. Moreover,
> > > > once the join operator is determined, it cannot be modified at runtime.
> > > >
> > > > To address this issue, we plan to introduce Adaptive Broadcast Join
> > > > capability based on FLIP-468: Introducing StreamGraph-Based Job
> > > > Submission[2] and FLIP-469: Supports Adaptive Optimization of
> > > > StreamGraph[3]. This will allow the join operator to be dynamically
> > > > optimized to Broadcast Join based on the actual input data volume at
> > > > runtime and fall back when the optimization conditions are not met.
> > > >
> > > > For more details, please refer to FLIP-470[1]. We look forward to your
> > > > feedback.
> > > >
> > > > Best,
> > > > Junrui Lee, Lei Yang and Xia Sun
> > > >
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
> > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission
> > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
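
The idea discussed in this thread (keep only the parameters at planning time, then pick the join implementation once the real input sizes are known) might be sketched roughly as follows. This is only an illustration under stated assumptions, not the FLIP-470 implementation: `AdaptiveJoinSketch`, `DeferredJoinFactory`, `JoinKind`, and the rule "broadcast when the smaller input is at or below the threshold" are all hypothetical names and simplifications. The only detail taken from the thread is that a threshold of -1 means the optimization is disabled.

```java
public class AdaptiveJoinSketch {

    /** Hypothetical join kinds for illustration; these are not Flink classes. */
    public enum JoinKind { BROADCAST_HASH_JOIN, ORIGINAL_JOIN }

    /**
     * Hypothetical stand-in for a factory that stores only the parameters
     * needed to build a join operator and defers the actual choice until
     * the input sizes are known at runtime.
     */
    public static final class DeferredJoinFactory {
        // -1 means the adaptive broadcast join feature is disabled.
        private final long broadcastThresholdBytes;

        public DeferredJoinFactory(long broadcastThresholdBytes) {
            this.broadcastThresholdBytes = broadcastThresholdBytes;
        }

        /**
         * Assumed rule: switch to a broadcast join when the feature is enabled
         * and the smaller input is at or below the threshold; otherwise fall
         * back to the join that was originally planned.
         */
        public JoinKind decide(long leftBytes, long rightBytes) {
            boolean enabled = broadcastThresholdBytes != -1;
            long smaller = Math.min(leftBytes, rightBytes);
            return (enabled && smaller <= broadcastThresholdBytes)
                    ? JoinKind.BROADCAST_HASH_JOIN
                    : JoinKind.ORIGINAL_JOIN;
        }
    }

    public static void main(String[] args) {
        // Planning time: only the threshold is recorded (10 MiB here).
        DeferredJoinFactory factory = new DeferredJoinFactory(10L * 1024 * 1024);

        // Runtime: actual sizes are known, so the decision can be made.
        System.out.println(factory.decide(4L * 1024 * 1024, 1L << 30));
        System.out.println(factory.decide(1L << 30, 1L << 30));

        // Feature disabled: always keep the originally planned join.
        System.out.println(new DeferredJoinFactory(-1).decide(1, 1));
    }
}
```

In this sketch the decision is a pure function of the observed sizes and the stored threshold, which mirrors the point made above that the strategy can always be configured and simply has no effect when the conditions are not met.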