[ 
https://issues.apache.org/jira/browse/FLINK-37161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junrui Lee reassigned FLINK-37161:
----------------------------------

    Assignee:     (was: Lei Yang)

> Cross-team verification for "Adaptive skewed join optimization for batch jobs"
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-37161
>                 URL: https://issues.apache.org/jira/browse/FLINK-37161
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Junrui Lee
>            Priority: Blocker
>             Fix For: 2.0.0
>
>
> Flink 2.0 supports adaptive skewed join optimization for batch jobs, which 
> allows the Join operator to dynamically split skewed and splittable 
> partitions based on runtime input statistics, thereby mitigating the 
> long-tail problem caused by skewed data.
> We may need the following tests:
>  # Test the case where {{table.optimizer.skewed-join-optimization.strategy}} 
> is set to {{auto}}. Construct a simple join case with data skewed on a 
> single key (e.g., make the data volume of a specified join key N times 
> larger than that of the other join keys, where N is defined by 
> {{table.optimizer.skewed-join-optimization.skewed-factor}}), and ensure that 
> the data volume for the skewed join key exceeds the skewed threshold (defined 
> by {{table.optimizer.skewed-join-optimization.skewed-threshold}}). 
> Finally, observe whether the ratio of the maximum data volume to the median 
> data volume processed by concurrent join tasks is less than the skew factor.
>  # Test the case where {{table.optimizer.skewed-join-optimization.strategy}} 
> is set to {{forced}}. Construct a skewed join instance similar to Test 1, 
> but with the following difference: the join case should be connected to a 
> downstream operator that performs hashing on the same field (e.g., hash 
> aggregation or group by). It is recommended to set different parallelisms for 
> the join operator and the downstream operator to prevent the out edge from 
> being optimized to a forward edge. Finally, observe whether the ratio of the 
> maximum data volume to the median data volume processed by concurrent join 
> tasks is less than the skew factor.
>  # Test the case where {{table.optimizer.skewed-join-optimization.strategy}} 
> is set to {{none}}, and verify that the join operator is not optimized into 
> an adaptive join operator under any circumstances.
>  # Test the case with customized 
> {{table.optimizer.skewed-join-optimization.skewed-factor}}. Construct a 
> skewed join instance similar to Test 1, set different skewed factors, and 
> observe whether the ratio of the maximum data volume to the median data 
> volume processed by concurrent join tasks is less than the skew factor. Note 
> that Flink can currently reduce the ratio only to 2.0, so please ensure that 
> the skewed-factor is greater than 2.0 during testing.
>  # Test the case with customized 
> {{table.optimizer.skewed-join-optimization.skewed-threshold}}. Construct a 
> skewed join instance similar to Test 1, set different skewed-threshold 
> values, and observe whether the optimization takes effect only when the data 
> volume processed by the skewed join instance is greater than the skewed 
> threshold.
>  
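
The pass criterion shared by Tests 1, 2 and 4 (maximum-to-median ratio below the skew factor), together with the skewed input described in Test 1, could be sketched roughly as follows. This is only an illustrative helper, not part of Flink: the function names are hypothetical, and the per-subtask data volumes are assumed to be collected by the tester (for example, read by hand from the job's per-subtask input metrics).

```python
from statistics import median


def make_skewed_keys(num_keys, rows_per_key, skewed_key, skewed_factor):
    """Build a list of join keys in which `skewed_key` occurs
    `skewed_factor` times more often than every other key.

    Hypothetical helper for preparing test input; all keys are the
    integers 0..num_keys-1.
    """
    keys = []
    for key in range(num_keys):
        count = rows_per_key * skewed_factor if key == skewed_key else rows_per_key
        keys.extend([key] * count)
    return keys


def skew_ratio(volume_per_subtask):
    """Ratio of the maximum to the median data volume across the
    concurrent join subtasks."""
    med = median(volume_per_subtask)
    if med <= 0:
        raise ValueError("median data volume must be positive")
    return max(volume_per_subtask) / med


def passes_skew_check(volume_per_subtask, skewed_factor):
    """True if the observed max/median ratio is below the configured
    skewed factor, which is the criterion stated in the tests above."""
    return skew_ratio(volume_per_subtask) < skewed_factor


if __name__ == "__main__":
    # Assumed example numbers: data volume seen by four join subtasks.
    observed = [100, 110, 120, 180]
    print(skew_ratio(observed), passes_skew_check(observed, 4.0))
```

The same check can be reused for Test 4 by calling `passes_skew_check` with each skewed-factor value under test, keeping the value above 2.0 as noted in the description.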



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
