pnowojski closed pull request #7069: [FLINK-10835][network] Remove duplicated round-robin ChannelSelector implementation
URL: https://github.com/apache/flink/pull/7069
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub would not otherwise
show its diff after the merge, so the diff is supplied below:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
index c707d47d9c6..96a4e1a081f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
@@ -29,26 +29,12 @@
  */
 public class RoundRobinChannelSelector<T extends IOReadableWritable> 
implements ChannelSelector<T> {
 
-       /**
-        * Stores the index of the channel to send the next record to.
-        */
-       private final int[] nextChannelToSendTo = new int[1];
-
-       /**
-        * Constructs a new default channel selector.
-        */
-       public RoundRobinChannelSelector() {
-               this.nextChannelToSendTo[0] = 0;
-       }
+       /** Stores the index of the channel to send the next record to. */
+       private final int[] nextChannelToSendTo = new int[] { -1 };
 
        @Override
        public int[] selectChannels(final T record, final int 
numberOfOutputChannels) {
-
-               int newChannel = ++this.nextChannelToSendTo[0];
-               if (newChannel >= numberOfOutputChannels) {
-                       this.nextChannelToSendTo[0] = 0;
-               }
-
-               return this.nextChannelToSendTo;
+               nextChannelToSendTo[0] = (nextChannelToSendTo[0] + 1) % 
numberOfOutputChannels;
+               return nextChannelToSendTo;
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
index e090c7fd27f..61bcde9e6cb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network;
 
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import 
org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
 import org.apache.flink.types.StringValue;
 
@@ -35,17 +36,22 @@
         */
        @Test
        public void channelSelect() {
-
                final StringValue dummyRecord = new StringValue("abc");
-               final RoundRobinChannelSelector<StringValue> selector = new 
RoundRobinChannelSelector<StringValue>();
-               // Test with two channels
-               final int numberOfOutputChannels = 2;
-               int[] selectedChannels = selector.selectChannels(dummyRecord, 
numberOfOutputChannels);
-               assertEquals(1, selectedChannels.length);
-               assertEquals(1, selectedChannels[0]);
-               selectedChannels = selector.selectChannels(dummyRecord, 
numberOfOutputChannels);
-               assertEquals(1, selectedChannels.length);
-               assertEquals(0, selectedChannels[0]);
+               final RoundRobinChannelSelector<StringValue> selector = new 
RoundRobinChannelSelector<>();
+               final int numberOfChannels = 2;
+
+               assertSelectedChannel(selector, dummyRecord, numberOfChannels, 
0);
+               assertSelectedChannel(selector, dummyRecord, numberOfChannels, 
1);
        }
 
+       private void assertSelectedChannel(
+               ChannelSelector<StringValue> selector,
+               StringValue record,
+               int numberOfChannels,
+               int expectedChannel) {
+
+               int[] actualResult = selector.selectChannels(record, 
numberOfChannels);
+               assertEquals(1, actualResult.length);
+               assertEquals(expectedChannel, actualResult[0]);
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ed9f4cc3026..52796024ac9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -210,7 +210,7 @@ public void testBroadcastEventNoRecords() throws Exception {
                TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
 
                ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
-               RecordWriter<ByteArrayIO> writer = new 
RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
+               RecordWriter<ByteArrayIO> writer = new 
RecordWriter<>(partitionWriter, new RoundRobinChannelSelector<>());
                CheckpointBarrier barrier = new 
CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, 
CheckpointOptions.forCheckpointWithDefaultLocation());
 
                // No records emitted yet, broadcast should not request a buffer
@@ -247,7 +247,7 @@ public void testBroadcastEventMixedRecords() throws 
Exception {
                TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
 
                ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
-               RecordWriter<ByteArrayIO> writer = new 
RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
+               RecordWriter<ByteArrayIO> writer = new 
RecordWriter<>(partitionWriter, new RoundRobinChannelSelector<>());
                CheckpointBarrier barrier = new 
CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, 
CheckpointOptions.forCheckpointWithDefaultLocation());
 
                // Emit records on some channels first (requesting buffers), 
then
@@ -593,20 +593,6 @@ public void read(DataInputView in) throws IOException {
                }
        }
 
-       /**
-        * RoundRobin channel selector starting at 0 ({@link 
RoundRobinChannelSelector} starts at 1).
-        */
-       private static class RoundRobin<T extends IOReadableWritable> 
implements ChannelSelector<T> {
-
-               private int[] nextChannel = new int[] { -1 };
-
-               @Override
-               public int[] selectChannels(final T record, final int 
numberOfOutputChannels) {
-                       nextChannel[0] = (nextChannel[0] + 1) % 
numberOfOutputChannels;
-                       return nextChannel;
-               }
-       }
-
        /**
         * Broadcast channel selector that selects all the output channels.
         */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to