[ 
https://issues.apache.org/jira/browse/FLINK-10820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681482#comment-16681482
 ] 

ASF GitHub Bot commented on FLINK-10820:
----------------------------------------

pnowojski commented on a change in pull request #7051: [FLINK-10820][network] 
Simplify the RebalancePartitioner implementation
URL: https://github.com/apache/flink/pull/7051#discussion_r232263469
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
 ##########
 @@ -33,29 +31,14 @@
 public class RebalancePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
 
-       private final int[] returnArray = {Integer.MAX_VALUE - 1};
+       private final int[] returnArray = new int[] { -1 };
 
        @Override
-       public int[] selectChannels(
-                       SerializationDelegate<StreamRecord<T>> record,
-                       int numChannels) {
-               int newChannel = ++returnArray[0];
-               if (newChannel >= numChannels) {
-                       returnArray[0] = resetValue(numChannels, newChannel);
-               }
+       public int[] selectChannels(SerializationDelegate<StreamRecord<T>> 
record, int numChannels) {
+               returnArray[0] = (returnArray[0] + 1) % numChannels;
                return returnArray;
        }
 
-       private static int resetValue(
-                       int numChannels,
-                       int newChannel) {
-               if (newChannel == Integer.MAX_VALUE) {
-                       // Initializes the first partition, this branch is only 
entered when initializing.
-                       return ThreadLocalRandom.current().nextInt(numChannels);
 
 Review comment:
   Lack of this random changes the actual semantic of the code. With your 
change the round robin will always start with the same partition. If you follow 
the annotations when it was introduced, it leads to this ticket:
   https://issues.apache.org/jira/browse/FLINK-8532
   
   I don't think that this if condition should effect performance in a 
meaningful way anyway (but who knows). Nevertheless this random was introduced 
for some reason and I would be inclined to keep it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Simplify the RebalancePartitioner implementation
> ------------------------------------------------
>
>                 Key: FLINK-10820
>                 URL: https://issues.apache.org/jira/browse/FLINK-10820
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.8.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>              Labels: pull-request-available
>
> The current {{RebalancePartitioner}} implementations seems a little hacky for 
> selecting a random number as the first channel index, and the following 
> selections based on this random index in round-robin fashion.
> We can define a constant as the first channel index to make the 
> implementation simple and readable. To do so, it will not change the 
> rebalance semantics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to