pnowojski closed pull request #7069: [FLINK-10835][network] Remove duplicated round-robin ChannelSelector implementation
URL: https://github.com/apache/flink/pull/7069
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java index c707d47d9c6..96a4e1a081f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java @@ -29,26 +29,12 @@ */ public class RoundRobinChannelSelector<T extends IOReadableWritable> implements ChannelSelector<T> { - /** - * Stores the index of the channel to send the next record to. - */ - private final int[] nextChannelToSendTo = new int[1]; - - /** - * Constructs a new default channel selector. - */ - public RoundRobinChannelSelector() { - this.nextChannelToSendTo[0] = 0; - } + /** Stores the index of the channel to send the next record to. 
*/ + private final int[] nextChannelToSendTo = new int[] { -1 }; @Override public int[] selectChannels(final T record, final int numberOfOutputChannels) { - - int newChannel = ++this.nextChannelToSendTo[0]; - if (newChannel >= numberOfOutputChannels) { - this.nextChannelToSendTo[0] = 0; - } - - return this.nextChannelToSendTo; + nextChannelToSendTo[0] = (nextChannelToSendTo[0] + 1) % numberOfOutputChannels; + return nextChannelToSendTo; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java index e090c7fd27f..61bcde9e6cb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network; +import org.apache.flink.runtime.io.network.api.writer.ChannelSelector; import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector; import org.apache.flink.types.StringValue; @@ -35,17 +36,22 @@ */ @Test public void channelSelect() { - final StringValue dummyRecord = new StringValue("abc"); - final RoundRobinChannelSelector<StringValue> selector = new RoundRobinChannelSelector<StringValue>(); - // Test with two channels - final int numberOfOutputChannels = 2; - int[] selectedChannels = selector.selectChannels(dummyRecord, numberOfOutputChannels); - assertEquals(1, selectedChannels.length); - assertEquals(1, selectedChannels[0]); - selectedChannels = selector.selectChannels(dummyRecord, numberOfOutputChannels); - assertEquals(1, selectedChannels.length); - assertEquals(0, selectedChannels[0]); + final RoundRobinChannelSelector<StringValue> selector = new RoundRobinChannelSelector<>(); + final int numberOfChannels = 2; + + assertSelectedChannel(selector, dummyRecord, numberOfChannels, 0); + 
assertSelectedChannel(selector, dummyRecord, numberOfChannels, 1); } + private void assertSelectedChannel( + ChannelSelector<StringValue> selector, + StringValue record, + int numberOfChannels, + int expectedChannel) { + + int[] actualResult = selector.selectChannels(record, numberOfChannels); + assertEquals(1, actualResult.length); + assertEquals(expectedChannel, actualResult[0]); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index ed9f4cc3026..52796024ac9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -210,7 +210,7 @@ public void testBroadcastEventNoRecords() throws Exception { TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); - RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>()); + RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobinChannelSelector<>()); CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forCheckpointWithDefaultLocation()); // No records emitted yet, broadcast should not request a buffer @@ -247,7 +247,7 @@ public void testBroadcastEventMixedRecords() throws Exception { TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); - RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>()); + RecordWriter<ByteArrayIO> writer = new 
RecordWriter<>(partitionWriter, new RoundRobinChannelSelector<>()); CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forCheckpointWithDefaultLocation()); // Emit records on some channels first (requesting buffers), then @@ -593,20 +593,6 @@ public void read(DataInputView in) throws IOException { } } - /** - * RoundRobin channel selector starting at 0 ({@link RoundRobinChannelSelector} starts at 1). - */ - private static class RoundRobin<T extends IOReadableWritable> implements ChannelSelector<T> { - - private int[] nextChannel = new int[] { -1 }; - - @Override - public int[] selectChannels(final T record, final int numberOfOutputChannels) { - nextChannel[0] = (nextChannel[0] + 1) % numberOfOutputChannels; - return nextChannel; - } - } - /** * Broadcast channel selector that selects all the output channels. */ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services