+1 for this useful optimization!

I have a question about the new option: do we really need two broadcast
join thresholds? IIUC, adaptive broadcast join is a complement to the
compile-time optimization, so users should not need to configure two
different thresholds (other than -1, which means off). If we just want to
control the adaptive optimization itself, should we provide a configuration
option like 'table.optimizer.adaptive-join.enabled', or a more general one
like 'table.optimizer.adaptive-optimization.enabled', for such related
optimizations?
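
To make the question concrete, here is a rough sketch of the single-switch
idea. It is only an illustration, not the FLIP's design: the class and method
names are made up, a plain Map stands in for Flink's configuration, and I am
assuming the existing compile-time key
'table.optimizer.join.broadcast-threshold' would be reused as the only
threshold. A missing threshold is treated as off (-1) purely for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: one boolean switch plus the single existing threshold. */
public class AdaptiveJoinSwitchSketch {
    // Option key proposed in this mail; it is not an existing Flink option.
    static final String ADAPTIVE_ENABLED = "table.optimizer.adaptive-join.enabled";
    // Compile-time threshold key, assumed here to be reused at runtime; -1 means off.
    static final String BROADCAST_THRESHOLD = "table.optimizer.join.broadcast-threshold";

    /** Reads the threshold in bytes; a missing key is treated as -1 (off) in this sketch. */
    static long threshold(Map<String, String> conf) {
        return Long.parseLong(conf.getOrDefault(BROADCAST_THRESHOLD, "-1"));
    }

    /** The adaptive strategy is active only if the switch is on and broadcast join is not off. */
    static boolean adaptiveBroadcastEnabled(Map<String, String> conf) {
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault(ADAPTIVE_ENABLED, "false"));
        return enabled && threshold(conf) >= 0;
    }

    /** Runtime decision: broadcast only if the actual input size fits the shared threshold. */
    static boolean shouldBroadcast(Map<String, String> conf, long actualInputBytes) {
        return adaptiveBroadcastEnabled(conf) && actualInputBytes <= threshold(conf);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(ADAPTIVE_ENABLED, "true");
        conf.put(BROADCAST_THRESHOLD, "1048576"); // 1 MiB
        System.out.println(shouldBroadcast(conf, 512L * 1024));        // small side fits
        System.out.println(shouldBroadcast(conf, 8L * 1024 * 1024));   // too large, fall back
    }
}
```

With this shape, users tune one threshold and flip one switch; setting the
threshold to -1 turns off both the compile-time and the adaptive broadcast
join.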


Best,
Lincoln Lee


Ron Liu <ron9....@gmail.com> wrote on Fri, Jul 26, 2024 at 11:59:

> Hi, Xia
>
> Thanks for your reply. It looks good to me.
>
>
> Best,
> Ron
>
> Xia Sun <xingbe...@gmail.com> wrote on Fri, Jul 26, 2024 at 10:49:
>
> > Hi Ron,
> >
> > Thanks for your feedback!
> >
> > -> creation of the join operators until runtime
> >
> >
> > That means when creating the AdaptiveJoinOperatorFactory, we will not
> > immediately create the JoinOperator. Instead, we only pass in the
> > necessary parameters for creating the JoinOperator. The appropriate
> > JoinOperator will be created during the
> > StreamGraphOptimizationStrategy optimization phase.
> >
> > The runtime's visibility into the table planner, which you mentioned,
> > is indeed an issue. It has two aspects:
> > (1) We plan to place the implementations of both
> > AdaptiveBroadcastJoinOptimizationStrategy and
> > AdaptiveJoinOperatorFactory in the table layer. During the runtime
> > phase, we will obtain the AdaptiveBroadcastJoinOptimizationStrategy
> > through class loading. Therefore, flink-runtime does not need to be
> > aware of the table layer's implementation.
> > (2) Since the dynamic codegen in the AdaptiveJoinOperatorFactory needs
> > to be aware of the table planner, we will consider placing the
> > AdaptiveJoinOperatorFactory in the table planner module as well.
> >
> >
> > -> When did you configure these optimization strategies uniformly into
> > `execution.batch.adaptive.stream-graph-optimization.strategies`
> >
> >
> > Thank you for pointing out this issue. When there are multiple
> > StreamGraphOptimizationStrategies, the optimization order at the
> > runtime phase will strictly follow the order specified in the
> > configuration option
> > `execution.batch.adaptive.stream-graph-optimization.strategies`.
> > Therefore, it is necessary to have a unified configuration during the
> > SQL planner phase to ensure the correct optimization order. Currently,
> > we are considering performing this unified configuration in
> > BatchPlanner#afterTranslation().
> >
> > For simplicity, as long as the adaptive broadcast join/skewed join
> > optimization features are enabled (e.g.,
> > `table.optimizer.adaptive.join.broadcast-threshold` is not -1), the
> > corresponding strategy will be configured. The strategy is configured
> > independently of the specific SQL query, even though it might not
> > have any actual effect for a given query.
> >
> > Best,
> > Xia
> >
> > Ron Liu <ron9....@gmail.com> wrote on Wed, Jul 24, 2024 at 14:10:
> >
> > > Hi, Xia
> > >
> > > This FLIP looks good to me, +1.
> > >
> > > I have two questions:
> > >
> > > 1.
> > > >> Accordingly, in terms of implementation, we will delay the codegen
> > > >> and creation of the join operators until runtime.
> > >
> > > How are you delaying codegen to runtime? The current runtime is not
> > > SQL planner aware. In other words, how should I understand this
> > > sentence?
> > >
> > > 2. FLIP-469 mentions passing StreamGraphOptimizationStrategy to the
> > > runtime via the option
> > > `execution.batch.adaptive.stream-graph-optimization.strategies`. In
> > > the SQL planner, if you have multiple different optimization
> > > strategies like broadcast join, skew join, etc., when did you
> > > configure these optimization strategies uniformly into
> > > `execution.batch.adaptive.stream-graph-optimization.strategies`?
> > >
> > >
> > >
> > > Zhu Zhu <reed...@gmail.com> wrote on Fri, Jul 19, 2024 at 17:41:
> > >
> > > > +1 for the FLIP
> > > >
> > > > It's a good start to adaptively optimize the logical execution plan
> > > > with runtime information.
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > Xia Sun <xingbe...@gmail.com> wrote on Thu, Jul 18, 2024 at 18:23:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > Junrui Lee, Lei Yang, and I would like to initiate a discussion
> > > > > about FLIP-470: Support Adaptive Broadcast Join[1].
> > > > >
> > > > > In general, Broadcast Hash Join is currently the most efficient
> > > > > join strategy available in Flink. However, its prerequisite is
> > > > > that the input data on one side must be sufficiently small;
> > > > > otherwise, it may lead to memory overuse or other issues.
> > > > > Currently, due to the lack of precise statistics, it is difficult
> > > > > to make accurate estimations during the Flink SQL planning phase.
> > > > > For example, when an upstream Filter operator is present, it is
> > > > > easy to overestimate the size of the table, whereas with an
> > > > > expansion operator, the table size tends to be underestimated.
> > > > > Moreover, once the join operator is determined, it cannot be
> > > > > modified at runtime.
> > > > >
> > > > > To address this issue, we plan to introduce Adaptive Broadcast Join
> > > > > capability based on FLIP-468: Introducing StreamGraph-Based Job
> > > > > Submission[2] and FLIP-469: Supports Adaptive Optimization of
> > > > > StreamGraph[3]. This will allow the join operator to be dynamically
> > > > > optimized to Broadcast Join based on the actual input data volume
> > > > > at runtime, and to fall back when the optimization conditions are
> > > > > not met.
> > > > >
> > > > > For more details, please refer to FLIP-470[1]. We look forward to
> > > > > your feedback.
> > > > >
> > > > > Best,
> > > > > Junrui Lee, Lei Yang and Xia Sun
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
> > > > > [2]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission
> > > > > [3]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
