Thanks Xia for your explanation!

I can understand your concern. However, the design of this FLIP already covers runtime de-optimization of inaccurate compile-time optimizations, so is it really necessary to make the user manually turn off 'table.optimizer.join.broadcast-threshold' and then set the new 'table.optimizer.adaptive.join.broadcast-threshold'?

Another option is that users only need to focus on the existing broadcast size threshold. They accept that 100% accurate optimization cannot be done at compile time and adopt the new capability of dynamic optimization at runtime, so that ultimately they can trust Flink to always optimize accurately. From this point of view, I would prefer a generic parameter, 'table.optimizer.adaptive-optimization.enabled', which would allow for more dynamic optimizations in the future, would not be limited to broadcast join scenarios, and would not keep introducing new options. WDYT?
Best,
Lincoln Lee

Xia Sun <xingbe...@gmail.com> wrote on Tue, Jul 30, 2024 at 11:27:

> Hi Lincoln,
>
> Thank you for your input and participation in the discussion!
>
> Compared to introducing the 'table.optimizer.adaptive-join.enabled' option,
> introducing 'table.optimizer.adaptive.join.broadcast-threshold' can
> also cover the need to disable static broadcast optimization while only
> enabling dynamic broadcast optimization. From this perspective, introducing
> a new threshold configuration might be more appropriate. What do you think?
>
> Best,
> Xia
>
> Lincoln Lee <lincoln.8...@gmail.com> wrote on Mon, Jul 29, 2024 at 23:12:
>
> > +1 for this useful optimization!
> >
> > I have a question about the new option: do we really need two broadcast
> > join thresholds? IIUC, this adaptive broadcast join is a complement to
> > compile-time optimization, so there is no need for the user to configure
> > two different thresholds (apart from the "off" value represented by -1).
> > We just want to control the adaptive optimization itself, so should we
> > provide a configuration option like 'table.optimizer.adaptive-join.enabled'
> > or a more general one, 'table.optimizer.adaptive-optimization.enabled',
> > for such related optimizations?
> >
> > Best,
> > Lincoln Lee
> >
> > Ron Liu <ron9....@gmail.com> wrote on Fri, Jul 26, 2024 at 11:59:
> >
> > > Hi, Xia
> > >
> > > Thanks for your reply. It looks good to me.
> > >
> > > Best,
> > > Ron
> > >
> > > Xia Sun <xingbe...@gmail.com> wrote on Fri, Jul 26, 2024 at 10:49:
> > >
> > > > Hi Ron,
> > > >
> > > > Thanks for your feedback!
> > > >
> > > > -> creation of the join operators until runtime
> > > >
> > > > That means when creating the AdaptiveJoinOperatorFactory, we will not
> > > > immediately create the JoinOperator. Instead, we only pass in the
> > > > necessary parameters for creating the JoinOperator. The appropriate
> > > > JoinOperator will be created during the
> > > > StreamGraphOptimizationStrategy optimization phase.
> > > >
> > > > You mentioned that the runtime's visibility into the table planner is
> > > > indeed an issue. It includes two aspects:
> > > > (1) We plan to place both implementations of the
> > > > AdaptiveBroadcastJoinOptimizationStrategy and
> > > > AdaptiveJoinOperatorFactory in the table layer. During the runtime
> > > > phase, we will obtain the AdaptiveBroadcastJoinOptimizationStrategy
> > > > through class loading. Therefore, the flink-runtime does not need to
> > > > be aware of the table layer's implementation.
> > > > (2) Since the dynamic codegen in the AdaptiveJoinOperatorFactory needs
> > > > to be aware of the table planner, we will consider placing the
> > > > AdaptiveJoinOperatorFactory in the table planner module as well.
> > > >
> > > > -> When did you configure these optimization strategies uniformly into
> > > > `execution.batch.adaptive.stream-graph-optimization.strategies`
> > > >
> > > > Thank you for pointing out this issue. When there are multiple
> > > > StreamGraphOptimizationStrategies, the optimization order at the
> > > > runtime phase will strictly follow the order specified in the
> > > > configuration option
> > > > `execution.batch.adaptive.stream-graph-optimization.strategies`.
> > > > Therefore, it is necessary to have a unified configuration during the
> > > > SQL planner phase to ensure the correct optimization order. Currently,
> > > > we are considering performing this unified configuration in
> > > > BatchPlanner#afterTranslation().
> > > >
> > > > For simplicity, as long as the adaptive broadcast join/skewed join
> > > > optimization features are enabled (e.g.,
> > > > `table.optimizer.adaptive.join.broadcast-threshold` is not -1), the
> > > > corresponding strategy will be configured. This optimization is
> > > > independent of the specific SQL query, although it might not produce
> > > > any actual effect.
> > > >
> > > > Best,
> > > > Xia
> > > >
> > > > Ron Liu <ron9....@gmail.com> wrote on Wed, Jul 24, 2024 at 14:10:
> > > >
> > > > > Hi, Xia
> > > > >
> > > > > This FLIP looks good to me, +1.
> > > > >
> > > > > I have two questions:
> > > > >
> > > > > 1.
> > > > > >> Accordingly, in terms of implementation, we will delay the
> > > > > >> codegen and creation of the join operators until runtime.
> > > > >
> > > > > How are you delaying codegen to runtime? The current runtime is not
> > > > > SQL planner aware. In other words, how should I understand this
> > > > > sentence?
> > > > >
> > > > > 2. FLIP-469 mentions passing StreamGraphOptimizationStrategy to
> > > > > runtime via the option
> > > > > `execution.batch.adaptive.stream-graph-optimization.strategies`.
> > > > > In the SQL planner, if you have multiple different optimization
> > > > > strategies like broadcast join, skew join, etc., when did you
> > > > > configure these optimization strategies uniformly into
> > > > > `execution.batch.adaptive.stream-graph-optimization.strategies`?
> > > > >
> > > > > Zhu Zhu <reed...@gmail.com> wrote on Fri, Jul 19, 2024 at 17:41:
> > > > >
> > > > > > +1 for the FLIP
> > > > > >
> > > > > > It's a good start to adaptively optimize the logical execution
> > > > > > plan with runtime information.
> > > > > >
> > > > > > Thanks,
> > > > > > Zhu
> > > > > >
> > > > > > Xia Sun <xingbe...@gmail.com> wrote on Thu, Jul 18, 2024 at 18:23:
> > > > > >
> > > > > > > Hi devs,
> > > > > > >
> > > > > > > Junrui Lee, Lei Yang, and I would like to initiate a discussion
> > > > > > > about FLIP-470: Support Adaptive Broadcast Join[1].
> > > > > > >
> > > > > > > In general, Broadcast Hash Join is currently the most efficient
> > > > > > > join strategy available in Flink. However, its prerequisite is
> > > > > > > that the input data on one side must be sufficiently small;
> > > > > > > otherwise, it may lead to memory overuse or other issues.
> > > > > > > Currently, due to the lack of precise statistics, it is
> > > > > > > difficult to make accurate estimations during the Flink SQL
> > > > > > > Planning phase. For example, when an upstream Filter operator is
> > > > > > > present, it is easy to overestimate the size of the table,
> > > > > > > whereas with an expansion operator, the table size tends to be
> > > > > > > underestimated. Moreover, once the join operator is determined,
> > > > > > > it cannot be modified at runtime.
> > > > > > >
> > > > > > > To address this issue, we plan to introduce Adaptive Broadcast
> > > > > > > Join capability based on FLIP-468: Introducing StreamGraph-Based
> > > > > > > Job Submission[2] and FLIP-469: Supports Adaptive Optimization of
> > > > > > > StreamGraph[3]. This will allow the join operator to be
> > > > > > > dynamically optimized to Broadcast Join based on the actual input
> > > > > > > data volume at runtime and fall back when the optimization
> > > > > > > conditions are not met.
> > > > > > >
> > > > > > > For more details, please refer to FLIP-470[1]. We look forward
> > > > > > > to your feedback.
> > > > > > >
> > > > > > > Best,
> > > > > > > Junrui Lee, Lei Yang and Xia Sun
> > > > > > >
> > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
> > > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission
> > > > > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
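
For readers following the thread, here is a minimal sketch of the idea discussed above: the factory keeps only the parameters at plan time, and the join kind is chosen once the actual input sizes are known at runtime. It is not Flink code. The names AdaptiveJoinSketch, DeferredJoinFactory, JoinKind and FALLBACK_JOIN are hypothetical, and the only detail taken from the thread is that a threshold of -1 means the optimization is off. Which concrete join the fallback maps to is left open here.

```java
/**
 * Illustrative sketch only; none of these types are Flink APIs.
 * It mimics deferring the choice of join operator until runtime.
 */
class AdaptiveJoinSketch {

    /** Hypothetical join kinds; FALLBACK_JOIN stands for whatever the planner chose originally. */
    enum JoinKind { BROADCAST_HASH_JOIN, FALLBACK_JOIN }

    /** Holds only the parameters at plan time; no operator is created yet. */
    static final class DeferredJoinFactory {
        // Assumption mirroring the thread: -1 disables the adaptive optimization.
        private final long broadcastThresholdBytes;

        DeferredJoinFactory(long broadcastThresholdBytes) {
            this.broadcastThresholdBytes = broadcastThresholdBytes;
        }

        /** Called at runtime, once the actual input sizes are known. */
        JoinKind create(long leftBytes, long rightBytes) {
            boolean enabled = broadcastThresholdBytes != -1;
            long smallerSide = Math.min(leftBytes, rightBytes);
            if (enabled && smallerSide <= broadcastThresholdBytes) {
                return JoinKind.BROADCAST_HASH_JOIN;
            }
            // Conditions not met: keep the original (non-broadcast) join.
            return JoinKind.FALLBACK_JOIN;
        }
    }

    public static void main(String[] args) {
        DeferredJoinFactory factory = new DeferredJoinFactory(10L * 1024 * 1024);
        // One side is 2 MiB, below the 10 MiB threshold.
        System.out.println(factory.create(2L * 1024 * 1024, 5L * 1024 * 1024 * 1024));
        // Both sides are several GiB, above the threshold.
        System.out.println(factory.create(3L * 1024 * 1024 * 1024, 5L * 1024 * 1024 * 1024));
    }
}
```

With a single threshold as in this sketch, the only user-facing switch is whether the runtime decision is enabled, which is the trade-off between a second threshold and a generic enable flag debated above.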