[ 
https://issues.apache.org/jira/browse/FLINK-37161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junrui Lee updated FLINK-37161:
-------------------------------
    Description: 
In Flink 2.0, we support adaptive skewed join optimization for batch jobs. It 
allows the Join operator to dynamically split skewed and splittable partitions 
based on runtime input statistics, thereby mitigating the long-tail problem 
caused by skewed data.
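To make "skewed" concrete, here is a minimal Java sketch of how a join task's input might be classified, based on our reading of the {{skewed-factor}} and {{skewed-threshold}} options. It assumes an input counts as skewed only if it exceeds both the skewed factor times the median task input and the absolute threshold. The names {{SkewCheck}} and {{isSkewed}} are hypothetical, not Flink APIs. Computing the median over all tasks, including the one being checked, is also an assumption.

```java
import java.util.Arrays;

// Hypothetical model of the skew condition implied by the skewed-factor and
// skewed-threshold options. This is NOT Flink's implementation.
class SkewCheck {

    // Median of the per-task input sizes (mean of the two middle values for even counts).
    static double median(long[] sizes) {
        if (sizes.length == 0) {
            throw new IllegalArgumentException("no tasks");
        }
        long[] sorted = sizes.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        if (n % 2 == 1) {
            return sorted[n / 2];
        }
        return (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    // Assumption: an input is skewed only if it exceeds BOTH
    // skewedFactor * median and the absolute skewedThreshold.
    static boolean isSkewed(long size, long[] allSizes, double skewedFactor, long skewedThreshold) {
        return size > skewedFactor * median(allSizes) && size > skewedThreshold;
    }

    public static void main(String[] args) {
        // Hypothetical input bytes per join task; the last task holds the hot key.
        long[] sizes = {100, 110, 90, 105, 2000};
        for (long s : sizes) {
            // With factor 4.0 and threshold 1000, only the 2000-byte task is flagged.
            System.out.println(s + " skewed=" + isSkewed(s, sizes, 4.0, 1000));
        }
    }
}
```

In this model, raising the threshold above 2000 means the same task no longer counts as skewed. That is the behavior the threshold test is meant to confirm on a real job.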

We may need the following tests:
 # Test the case where {{table.optimizer.skewed-join-optimization.strategy}} is 
set to {{auto}}. Construct a simple join with data skewed on a single key 
(e.g., make the data of a specified join key more than N times larger than that 
of the other join keys, where N is the value of 
{{table.optimizer.skewed-join-optimization.skewed-factor}}), and ensure that the 
data volume for the skewed join key exceeds the skewed threshold (defined by 
{{table.optimizer.skewed-join-optimization.skewed-threshold}}). Finally, check 
whether the ratio of the maximum data volume to the median data volume 
processed by the concurrent join tasks is less than the skewed factor.
 # Test the case where {{table.optimizer.skewed-join-optimization.strategy}} is 
set to {{forced}}. Construct a skewed join similar to Test 1, with one 
difference: the join should feed a downstream operator that hashes on the same 
field (e.g., a hash aggregation or GROUP BY). In {{auto}} mode the optimization 
is not applied when it would introduce an extra hash shuffle, whereas 
{{forced}} applies it anyway. It is recommended to set different parallelisms 
for the join operator and the downstream operator to prevent the output edge 
from being optimized into a forward edge. Finally, check whether the ratio of 
the maximum data volume to the median data volume processed by the concurrent 
join tasks is less than the skewed factor.
 # Test the case where {{table.optimizer.skewed-join-optimization.strategy}} is 
set to {{none}}, and verify that the join operator is never optimized into an 
adaptive join operator, regardless of the input data.
 # Test the case with a customized 
{{table.optimizer.skewed-join-optimization.skewed-factor}}. Construct a skewed 
join similar to Test 1, set different skewed factors, and check whether the 
ratio of the maximum data volume to the median data volume processed by the 
concurrent join tasks is less than the skewed factor. Note that Flink can 
currently reduce this ratio only down to 2.0, so make sure the skewed factor is 
greater than 2.0 during testing.
 # Test the case with a customized 
{{table.optimizer.skewed-join-optimization.skewed-threshold}}. Construct a 
skewed join similar to Test 1, set different skewed thresholds, and check that 
the optimization takes effect only when the data volume processed by the skewed 
join operator instance is greater than the skewed threshold.
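The pass criterion shared by Tests 1, 2 and 4 is that the max/median per-task volume ends up below the skewed factor. A small helper like the one below can check it. This is a hypothetical sketch, not Flink code. It builds per-key row counts with one hot key, spreads the keys over join tasks with a simplified modulo partitioner (Flink's real hash partitioning differs), and computes the max/median ratio. In a real test, the per-task volumes would come from the job's per-subtask input statistics, for example as shown in the Web UI.

```java
import java.util.Arrays;

// Hypothetical helpers for checking the max/median criterion; not part of Flink.
class SkewedJoinCheck {

    // Rows per join key: every key gets baseRows, except key 0 (the hot key),
    // which gets hotMultiplier times as many.
    static long[] skewedKeyCounts(int numKeys, long baseRows, long hotMultiplier) {
        long[] counts = new long[numKeys];
        Arrays.fill(counts, baseRows);
        counts[0] = baseRows * hotMultiplier;
        return counts;
    }

    // Simplified partitioner: key k goes to task k % parallelism.
    // Flink's real hash partitioning differs; this only models the unsplit layout.
    static long[] perTaskVolumes(long[] keyCounts, int parallelism) {
        long[] volumes = new long[parallelism];
        for (int k = 0; k < keyCounts.length; k++) {
            volumes[k % parallelism] += keyCounts[k];
        }
        return volumes;
    }

    // Median of the per-task volumes (mean of the two middle values for even counts).
    static double median(long[] values) {
        if (values.length == 0) {
            throw new IllegalArgumentException("no tasks");
        }
        long[] sorted = values.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        return n % 2 == 1 ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    // Ratio of the largest per-task volume to the median per-task volume.
    static double maxToMedianRatio(long[] volumes) {
        long max = Arrays.stream(volumes).max().getAsLong();
        return max / median(volumes);
    }

    // Pass criterion from the tests: the ratio must be below the skewed factor.
    static boolean skewMitigated(long[] volumes, double skewedFactor) {
        return maxToMedianRatio(volumes) < skewedFactor;
    }

    public static void main(String[] args) {
        // 8 keys with 1000 rows each, key 0 ten times larger, spread over 4 join tasks.
        long[] baseline = perTaskVolumes(skewedKeyCounts(8, 1_000, 10), 4);
        System.out.println(Arrays.toString(baseline)); // prints [11000, 2000, 2000, 2000]
        System.out.println("ratio=" + maxToMedianRatio(baseline)); // prints ratio=5.5
        System.out.println("mitigated=" + skewMitigated(baseline, 4.0)); // prints mitigated=false
    }
}
```

When several keys share a task, the hot key's task in this model is only 5.5 times the median, even though the hot key itself is 10 times larger than the others. The generated skew may therefore need to be well above N times for the unoptimized baseline to exceed the skewed factor.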

 


> Cross-team verification for "Adaptive skewed join optimization for batch jobs"
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-37161
>                 URL: https://issues.apache.org/jira/browse/FLINK-37161
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Junrui Lee
>            Assignee: Lei Yang
>            Priority: Blocker
>             Fix For: 2.0.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
