[ https://issues.apache.org/jira/browse/FLINK-37161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Junrui Lee reassigned FLINK-37161:
----------------------------------

    Assignee:     (was: Lei Yang)

> Cross-team verification for "Adaptive skewed join optimization for batch jobs"
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-37161
>                 URL: https://issues.apache.org/jira/browse/FLINK-37161
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Junrui Lee
>            Priority: Blocker
>             Fix For: 2.0.0
>
>
> Flink 2.0 supports adaptive skewed join optimization for batch jobs, which
> allows the Join operator to dynamically split skewed and splittable
> partitions based on runtime input statistics, thereby mitigating the
> long-tail problem caused by skewed data.
>
> We may need the following tests:
>
> # Test the case where {{table.optimizer.skewed-join-optimization.strategy}}
> is set to {{auto}}. Construct a simple join case with data skewed on a
> single key (e.g., making the data of a specified join key N times larger
> than that of the other join keys, where N is defined by
> {{table.optimizer.skewed-join-optimization.skewed-factor}}). Ensure that
> the data volume for the skewed join key exceeds the skewed-threshold
> (defined by {{table.optimizer.skewed-join-optimization.skewed-threshold}}).
> Finally, observe whether the ratio of the maximum data volume to the median
> data volume processed by concurrent join tasks is less than the skewed
> factor.
>
> # Test the case where {{table.optimizer.skewed-join-optimization.strategy}}
> is set to {{forced}}. Construct a skewed join instance similar to Test 1,
> with one difference: the join should be connected to a downstream operator
> that hashes on the same field (e.g., hash aggregation or group by). It is
> recommended to set different parallelisms for the join operator and the
> downstream operator to prevent the out edge from being optimized to a
> forward edge. Finally, observe whether the ratio of the maximum data volume
> to the median data volume processed by concurrent join tasks is less than
> the skewed factor.
>
> # Test the case where {{table.optimizer.skewed-join-optimization.strategy}}
> is set to {{none}}, and verify that the join operator is not optimized into
> an adaptive join operator under any circumstances.
>
> # Test the case with a customized
> {{table.optimizer.skewed-join-optimization.skewed-factor}}. Construct a
> skewed join instance similar to Test 1, set different skewed factors, and
> observe whether the ratio of the maximum data volume to the median data
> volume processed by concurrent join tasks is less than the skewed factor.
> Note that Flink can currently only reduce the ratio to 2.0, so please
> ensure that the skewed-factor is greater than 2.0 during testing.
>
> # Test the case with a customized
> {{table.optimizer.skewed-join-optimization.skewed-threshold}}. Construct a
> skewed join instance similar to Test 1, set different skewed-threshold
> values, and observe whether the optimization takes effect only when the
> data volume processed by the skewed join instance is greater than the
> skewed threshold.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
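
The pass criterion shared by Tests 1, 2 and 4 (maximum-to-median ratio below the skewed factor) can be checked with a few lines of Python once the per-subtask input volumes of the join have been collected. This is only a sketch: the function names and the sample readings are hypothetical, and how the volumes are gathered is left to the tester.

```python
from statistics import median


def max_to_median_ratio(volume_per_task):
    """Return max / median of the input volumes seen by concurrent join tasks."""
    if not volume_per_task:
        raise ValueError("need at least one subtask measurement")
    mid = median(volume_per_task)
    if mid <= 0:
        raise ValueError("median volume must be positive")
    return max(volume_per_task) / mid


def skew_is_mitigated(volume_per_task, skewed_factor):
    """True when the observed ratio is below the configured skewed-factor."""
    return max_to_median_ratio(volume_per_task) < skewed_factor


# Hypothetical readings (e.g. in MB) for four concurrent join subtasks.
balanced = [260, 250, 240, 255]   # what a successful split might look like
skewed = [1200, 100, 110, 90]     # one subtask still carries the hot key

print(skew_is_mitigated(balanced, skewed_factor=4.0))  # True
print(skew_is_mitigated(skewed, skewed_factor=4.0))    # False
```

Because the description notes that the ratio can currently only be reduced to 2.0, a `skewed_factor` of 2.0 or lower would not be a meaningful threshold for this check.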