Hi Lincoln Lee,
Thanks for your feedback!
For the 1st question, thank you for the reminder. This optimization is only
available for Table jobs in batch mode, so I have moved these new options
into the table module. I also replaced the "enable" and "force" configurations
with a single enum-typed configuration to avoid confusion. The new
configuration has three values: "AUTO" means Flink will automatically apply
this optimization, "FORCED" means Flink will enforce this optimization even
if it introduces an extra hash shuffle, and "NONE" means this optimization
will not be applied.
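To make the intended semantics concrete, here is a toy sketch of the decision
logic behind the three values (the enum and function names below are my own
illustrations, not the actual option or API from the FLIP):

```python
from enum import Enum

# Hypothetical names for illustration only; the real enum-typed option
# will live in the Flink table module as described above.
class SkewedJoinStrategy(Enum):
    NONE = "none"      # never apply the optimization
    AUTO = "auto"      # apply only when no extra hash shuffle is introduced
    FORCED = "forced"  # apply even at the cost of an extra hash shuffle

def should_apply(strategy: SkewedJoinStrategy, adds_extra_shuffle: bool) -> bool:
    if strategy is SkewedJoinStrategy.NONE:
        return False
    if strategy is SkewedJoinStrategy.FORCED:
        return True
    # AUTO: only worthwhile when the plan needs no extra hash shuffle
    return not adds_extra_shuffle
```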
For the 2nd question, the key group size (or median size) is calculated from
statistical information about the upstream output and is used to detect data
skew, so users do not need to know the specific values. I mentioned "the
median key group size" only because I chose the median to represent the
central tendency of data volumes across all parallel instances.
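As a rough illustration of how a median-based skew check could work (the
skew factor and the size floor below are made-up values for the sketch, not
the FLIP's defaults):

```python
import statistics

def skewed_key_groups(sizes, skew_factor=4.0, min_bytes=256 << 20):
    """Flag key groups whose size exceeds skew_factor times the median.

    sizes: per-key-group output sizes in bytes, gathered from upstream
    statistics. The min_bytes floor (a made-up value) avoids splitting
    small groups that are only large in relative terms.
    """
    median = statistics.median(sizes)
    threshold = max(median * skew_factor, min_bytes)
    return [i for i, s in enumerate(sizes) if s > threshold]
```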
For the 3rd question, I have confirmed that this limitation also exists in
batch mode. Therefore, IntraInputKeyGroupCorrelation and
InterInputsKeyGroupCorrelation are necessary: I need them to determine
whether and how to split a skewed key group while ensuring data correctness.
Additionally, introducing these two correlations has another benefit: other
optimization strategies can also modify them to flexibly choose a data
distribution algorithm based on the operator's specific situation.
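A toy sketch of how such correlation flags might gate the splitting decision
(the function names are mine, not the FLIP's exact API):

```python
def can_split(intra_input_correlated: bool) -> bool:
    # If records within one key group of an input are correlated
    # (the operator's result depends on processing the whole group
    # together), splitting that group would break correctness.
    return not intra_input_correlated

def needs_duplication(inter_inputs_correlated: bool) -> bool:
    # If the inputs are correlated on the join key, every split of the
    # skewed side still needs all matching records from the other side,
    # so the other side's key group must be duplicated to each split.
    return inter_inputs_correlated
```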

Best, Lei Yang

Lincoln Lee <lincoln.8...@gmail.com> wrote on Thu, Aug 29, 2024 at 23:13:


> Thanks for bringing up this! It would be a useful feature for batch users.
>
> For the FLIP, I have some questions:
>
> 1st, the implementation plan is to rewrite the optimization based on the
> execnode of the table planner, but the config option for the optimization
> is under flink-core module, does it mean this optimization is available for
> datastream jobs as well? (I didn't see the details in the FLIP)
> If not, my suggestion is to put these new options into the table module.
>
> 2nd, the FLIP performs parameter control and optimization based on the
> size of the key group, how can users perceive the specific key group size
> (or the median key group size) from the job information provided by flink?
>
> 3rd, IIUC, the following limitation in the FLIP exists only for streaming
> executions. So, is the new IntraInputKeyGroupCorrelation /
> InterInputsKeyGroupCorrelation mentioned in FLIP still necessary?
> > “The existing data distribution algorithms in Flink impose strict
> limitations on joins, requiring that data within the same key group
> must be sent to the same downstream for processing. This restricts
> the adaptability of data distribution.”
>
>
> Best,
> Lincoln Lee
>
>
> Zhu Zhu <reed...@gmail.com> wrote on Mon, Aug 19, 2024 at 16:50:
>
> > +1 for the FLIP
> >
> > Long-tail tasks caused by skewed data usually pose significant
> > challenges for users. It's great that Flink can mitigate such
> > issues automatically.
> >
> > Thanks,
> > Zhu
> >
> > Lei Yang <leya5...@gmail.com> wrote on Fri, Aug 16, 2024 at 11:18:
> >
> > > Hi devs,
> > >
> > >
> > > Junrui Lee, Xia Sun and I would like to initiate a discussion about
> > > FLIP-475: Support Adaptive Skewed Join Optimization [1].
> > >
> > >
> > > In a Join query, when certain keys occur frequently, it can lead to an
> > > uneven distribution of data across partitions. This may affect the
> > > execution performance of Flink jobs, as a single partition with skewed
> > > data can severely downgrade the performance of the entire job. To ensure
> > > data is evenly distributed to downstream tasks, we can use the
> > > statistics of the input to split (and duplicate if needed) skewed and
> > > splittable partitions into balanced partitions at runtime. However,
> > > currently, Flink is unable to accurately determine which partitions are
> > > skewed and eligible for splitting at runtime, and it also lacks the
> > > capability to split data within the same key group.
> > >
> > >
> > > To address this issue, we plan to introduce Adaptive Skewed Join
> > > Optimization capability. This will allow the Join operator to
> > > dynamically split partitions that are skewed and splittable based on
> > > the statistics of the input at runtime, reducing the long-tail problem
> > > caused by skewed data. This FLIP is based on FLIP-469 [2] and also
> > > leverages capabilities introduced in FLIP-470 [3].
> > >
> > >
> > > For more details, please refer to FLIP-475 [1]. We look forward to your
> > > feedback.
> > >
> > >
> > > Best,
> > >
> > >
> > > Junrui Lee, Xia Sun and Lei Yang
> > >
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-475%3A+Support+Adaptive+Skewed+Join+Optimization
> > >
> > > [2]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
> > >
> > > [3]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
> > >
> > >
> >
>
