Hi, Xia

Thanks for your reply. It looks good to me.


Best,
Ron

Xia Sun <xingbe...@gmail.com> wrote on Fri, Jul 26, 2024 at 10:49:

> Hi Ron,
>
> Thanks for your feedback!
>
> -> creation of the join operators until runtime
>
>
> That means when creating the AdaptiveJoinOperatorFactory, we will not
> immediately create the JoinOperator. Instead, we only pass in the necessary
> parameters for creating the JoinOperator. The appropriate JoinOperator will
> be created during the StreamGraphOptimizationStrategy optimization phase.
>
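
A minimal sketch of the deferred-creation idea described above, not the FLIP's actual implementation. All class and method names here (`JoinOperatorSketch`, `AdaptiveJoinFactorySketch`, `createOperator`) are hypothetical, and the `<=` comparison against the threshold is an assumption. The factory only stores the parameters at construction time and picks the concrete join once an observed input size is available.

```java
// Hypothetical stand-in for a join operator; this is not a Flink class.
final class JoinOperatorSketch {
    final String kind;

    JoinOperatorSketch(String kind) {
        this.kind = kind;
    }
}

// Hypothetical factory: holds only the parameters needed to build a join
// operator. No operator is created in the constructor.
final class AdaptiveJoinFactorySketch {
    private final long broadcastThresholdBytes; // -1 means the feature is disabled
    private final String plannedJoinKind;       // join chosen by the planner

    AdaptiveJoinFactorySketch(long broadcastThresholdBytes, String plannedJoinKind) {
        this.broadcastThresholdBytes = broadcastThresholdBytes;
        this.plannedJoinKind = plannedJoinKind;
    }

    // Called later, once the size of the smaller input is known.
    // Assumption: broadcast is chosen when the size is at or below the threshold;
    // otherwise the originally planned join is kept.
    JoinOperatorSketch createOperator(long smallerInputBytes) {
        boolean canBroadcast =
                broadcastThresholdBytes != -1 && smallerInputBytes <= broadcastThresholdBytes;
        return new JoinOperatorSketch(canBroadcast ? "BroadcastHashJoin" : plannedJoinKind);
    }
}
```

With a threshold of 1024 bytes, `createOperator(100L)` yields a broadcast join in this sketch, while `createOperator(4096L)` keeps the planned join.
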
> As you mentioned, the runtime's visibility into the table planner is
> indeed an issue. It has two aspects:
> (1) we plan to place both implementations of the
> AdaptiveBroadcastJoinOptimizationStrategy and AdaptiveJoinOperatorFactory
> in the table layer. During the runtime phase, we will obtain the
> AdaptiveBroadcastJoinOptimizationStrategy through class loading. Therefore,
> the flink-runtime does not need to be aware of the table layer's
> implementation.
> (2) Since the dynamic codegen in the AdaptiveJoinOperatorFactory needs to
> be aware of the table planner, we will consider placing the
> AdaptiveJoinOperatorFactory in the table planner module as well.
>
>
> -> When did you configure these optimization strategies uniformly into
> `execution.batch.adaptive.stream-graph-optimization.strategies`
>
>
> Thank you for pointing out this issue. When there are multiple
> StreamGraphOptimizationStrategies, the optimization order at the runtime
> phase will strictly follow the order specified in the configuration option
> `execution.batch.adaptive.stream-graph-optimization.strategies`. Therefore,
> it is necessary to have a unified configuration during the SQL planner
> phase to ensure the correct optimization order. Currently, we are
> considering performing this unified configuration in
> BatchPlanner#afterTranslation().
>
> For simplicity, as long as the adaptive broadcast join/skewed join
> optimization features are enabled (e.g.,
> `table.optimizer.adaptive.join.broadcast-threshold` is not -1), the
> corresponding strategy will be configured. This optimization is independent
> of the specific SQL query, although it might not produce any actual effect.
>
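
A minimal sketch of the two points above, under stated assumptions: a strategy is registered whenever its feature is enabled, and the resulting list order is the order in which strategies would run. The option name and `AdaptiveBroadcastJoinOptimizationStrategy` come from this thread; `StrategyConfigSketch`, `buildStrategies`, the `skewedJoinEnabled` flag, the skewed-join strategy name, and the relative order of the two strategies are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class StrategyConfigSketch {
    // Option name taken from the thread; the list built below is the value
    // that would be written into it.
    static final String STRATEGIES_KEY =
            "execution.batch.adaptive.stream-graph-optimization.strategies";

    // Builds the ordered strategy list once, independent of the SQL query.
    // broadcastThreshold mirrors table.optimizer.adaptive.join.broadcast-threshold,
    // where -1 means the broadcast-join optimization is disabled.
    static List<String> buildStrategies(long broadcastThreshold, boolean skewedJoinEnabled) {
        List<String> ordered = new ArrayList<>();
        if (broadcastThreshold != -1) {
            ordered.add("AdaptiveBroadcastJoinOptimizationStrategy");
        }
        if (skewedJoinEnabled) {
            // Hypothetical name and position; the thread does not specify them.
            ordered.add("HypotheticalSkewedJoinOptimizationStrategy");
        }
        return ordered;
    }
}
```

Doing this in one place means the order is decided once, rather than depending on which planner rule happened to register its strategy first.
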
> Best,
> Xia
>
> Ron Liu <ron9....@gmail.com> wrote on Wed, Jul 24, 2024 at 14:10:
>
> > Hi, Xia
> >
> > This FLIP looks good to me, +1.
> >
> > I have two questions:
> >
> > 1.
> > >> Accordingly, in terms of implementation, we will delay the codegen and
> > >> creation of the join operators until runtime.
> >
> > How do you delay codegen until runtime, given that the current runtime is
> > not SQL planner aware? In other words, how should I understand this
> > sentence?
> >
> > 2. FLIP-469 mentions passing StreamGraphOptimizationStrategy to runtime
> > via option `execution.batch.adaptive.stream-graph-optimization.strategies`.
> > In the SQL planner, if you have multiple different optimization strategies
> > like broadcast join, skew join, etc., when did you configure these
> > optimization strategies uniformly into
> > `execution.batch.adaptive.stream-graph-optimization.strategies`?
> >
> >
> >
> > Zhu Zhu <reed...@gmail.com> wrote on Fri, Jul 19, 2024 at 17:41:
> >
> > > +1 for the FLIP
> > >
> > > It's a good start to adaptively optimize the logical execution plan
> > > with runtime information.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Xia Sun <xingbe...@gmail.com> wrote on Thu, Jul 18, 2024 at 18:23:
> > >
> > > > Hi devs,
> > > >
> > > > Junrui Lee, Lei Yang, and I would like to initiate a discussion about
> > > > FLIP-470: Support Adaptive Broadcast Join[1].
> > > >
> > > > In general, Broadcast Hash Join is currently the most efficient join
> > > > strategy available in Flink. However, its prerequisite is that the
> > > > input data on one side must be sufficiently small; otherwise, it may
> > > > lead to memory overuse or other issues. Currently, due to the lack of
> > > > precise statistics, it is difficult to make accurate estimations
> > > > during the Flink SQL Planning phase. For example, when an upstream
> > > > Filter operator is present, it is easy to overestimate the size of
> > > > the table, whereas with an expansion operator, the table size tends
> > > > to be underestimated. Moreover, once the join operator is determined,
> > > > it cannot be modified at runtime.
> > > >
> > > > To address this issue, we plan to introduce Adaptive Broadcast Join
> > > > capability based on FLIP-468: Introducing StreamGraph-Based Job
> > > > Submission[2] and FLIP-469: Supports Adaptive Optimization of
> > > > StreamGraph[3]. This will allow the join operator to be dynamically
> > > > optimized to Broadcast Join based on the actual input data volume at
> > > > runtime, and to fall back when the optimization conditions are not met.
> > > >
> > > > For more details, please refer to FLIP-470[1]. We look forward to
> > > > your feedback.
> > > >
> > > > Best,
> > > > Junrui Lee, Lei Yang and Xia Sun
> > > >
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
> > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission
> > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
> > > >
> > >
> >
>
