[ https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guibo Pan updated FLINK-8532:
-----------------------------
    Priority: Major  (was: Minor)

> RebalancePartitioner should use Random value for its first partition
> --------------------------------------------------------------------
>
>                 Key: FLINK-8532
>                 URL: https://issues.apache.org/jira/browse/FLINK-8532
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Yuta Morisawa
>            Assignee: Guibo Pan
>            Priority: Major
>              Labels: pull-request-available
>
> Under some conditions, RebalancePartitioner does not balance data evenly
> because every sender uses the same value to select the next operator.
> RebalancePartitioner initializes its partition id to the same value in
> every thread, so it does balance data over time, but at any given moment the
> amount of data in each operator is skewed.
> In particular, when the upstream operators emit data at the same rate, the
> skew becomes severe.
>
>
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator (map1, map2) contains three subtasks (subtasks 1, 2, 3 in
> map1 and subtasks 4, 5, 6 in map2).
> map1    map2
>  st1     st4
>  st2     st5
>  st3     st6
>
> At the beginning, every subtask in map1 sends data to st4 in map2 because
> they all use the same initial partition id.
> The next time map1 receives data, st1, st2, and st3 all send to st5 because
> each one incremented its partition id after processing the previous record.
> In my environment, processing data takes twice as long with
> RebalancePartitioner as with other partitioners (rescale, keyby).
>
> To solve this problem, in my opinion, RebalancePartitioner should use its own
> operator id for the initial value.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
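
The skew described in the issue can be reproduced with a small stand-alone simulation. This is only a sketch and not Flink code: the class RebalanceSkewSketch and all of its method names are hypothetical, and it assumes each sender emits exactly one record per round and advances its channel index round-robin. It compares three ways of choosing the initial channel: the same value for every sender (the reported behavior), the sender's own index (the fix proposed in the description), and a random value (the fix named in the title).

```java
import java.util.Random;

// Hypothetical simulation, not part of Flink: senders pick channels round-robin.
class RebalanceSkewSketch {

    /** Largest number of records any single channel receives within one round. */
    static int maxLoadPerRound(int[] startChannels, int numChannels, int rounds) {
        int[] next = startChannels.clone();
        int worst = 0;
        for (int r = 0; r < rounds; r++) {
            int[] load = new int[numChannels];
            for (int s = 0; s < next.length; s++) {
                load[next[s]]++;                        // sender s emits one record
                next[s] = (next[s] + 1) % numChannels;  // then moves to the next channel
            }
            for (int l : load) {
                worst = Math.max(worst, l);
            }
        }
        return worst;
    }

    /** Every sender starts at channel 0, as in the reported behavior. */
    static int[] sameStart(int numSenders) {
        return new int[numSenders];
    }

    /** Each sender starts at its own index, modelling the proposal in the description. */
    static int[] staggeredStart(int numSenders, int numChannels) {
        int[] start = new int[numSenders];
        for (int i = 0; i < numSenders; i++) {
            start[i] = i % numChannels;
        }
        return start;
    }

    /** Each sender starts at a random channel, modelling the proposal in the title. */
    static int[] randomStart(int numSenders, int numChannels, long seed) {
        Random rnd = new Random(seed);
        int[] start = new int[numSenders];
        for (int i = 0; i < numSenders; i++) {
            start[i] = rnd.nextInt(numChannels);
        }
        return start;
    }

    public static void main(String[] args) {
        int senders = 3, channels = 3, rounds = 6;
        System.out.println("same start:      " + maxLoadPerRound(sameStart(senders), channels, rounds));
        System.out.println("staggered start: " + maxLoadPerRound(staggeredStart(senders, channels), channels, rounds));
        System.out.println("random start:    " + maxLoadPerRound(randomStart(senders, channels, 42L), channels, rounds));
    }
}
```

With three senders and three channels, the shared start sends all three records of a round to one channel, while the staggered start sends one record to each channel. A random start falls somewhere in between, depending on the values drawn.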