pnowojski closed pull request #6697: [hotfix][benchmarks] Add network broadcast benchmark URL: https://github.com/apache/flink/pull/6697
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java index 05ae276973a..ba3294b3571 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java @@ -35,6 +35,7 @@ */ public class LongRecordWriterThread extends CheckedThread { private final StreamRecordWriter<LongValue> recordWriter; + private final boolean broadcastMode; /** * Future to wait on a definition of the number of records to send. 
@@ -43,8 +44,11 @@ private volatile boolean running = true; - public LongRecordWriterThread(StreamRecordWriter<LongValue> recordWriter) { + public LongRecordWriterThread( + StreamRecordWriter<LongValue> recordWriter, + boolean broadcastMode) { this.recordWriter = checkNotNull(recordWriter); + this.broadcastMode = broadcastMode; } public synchronized void shutdown() { @@ -89,7 +93,12 @@ private void sendRecords(long records) throws IOException, InterruptedException LongValue value = new LongValue(0); for (int i = 1; i < records; i++) { - recordWriter.emit(value); + if (broadcastMode) { + recordWriter.broadcastEmit(value); + } + else { + recordWriter.emit(value); + } } value.setValue(records); recordWriter.broadcastEmit(value); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index bfaed437797..1b769c80f0a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -87,6 +87,7 @@ protected IOManager ioManager; protected int channels; + protected boolean broadcastMode = false; protected boolean localMode = false; protected ResultPartitionID[] partitionIds; @@ -100,6 +101,7 @@ public void setUp( setUp( writers, channels, + false, localMode, senderBufferPoolSize, receiverBufferPoolSize, @@ -123,10 +125,12 @@ public void setUp( public void setUp( int writers, int channels, + boolean broadcastMode, boolean localMode, int senderBufferPoolSize, int receiverBufferPoolSize, Configuration config) throws Exception { + this.broadcastMode = broadcastMode; this.localMode = localMode; this.channels = channels; this.partitionIds = new ResultPartitionID[writers]; diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBroadcastThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBroadcastThroughputBenchmark.java new file mode 100644 index 00000000000..f4854d6b3e8 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBroadcastThroughputBenchmark.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.apache.flink.configuration.Configuration; + +/** + * Network throughput benchmarks executed by the external + * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project. + */ +public class StreamNetworkBroadcastThroughputBenchmark extends StreamNetworkThroughputBenchmark { + + /** + * Same as {@link StreamNetworkThroughputBenchmark#setUp(int, int, int, boolean, int, int)} + * but also setups broadcast mode. 
+ */ + @Override + public void setUp( + int recordWriters, + int channels, + int flushTimeout, + boolean localMode, + int senderBufferPoolSize, + int receiverBufferPoolSize) throws Exception { + setUp( + recordWriters, + channels, + flushTimeout, + true, + localMode, + senderBufferPoolSize, + receiverBufferPoolSize, + new Configuration() + ); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBroadcastThroughputBenchmarkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBroadcastThroughputBenchmarkTest.java new file mode 100644 index 00000000000..acee9fe6ef3 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBroadcastThroughputBenchmarkTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +/** + * Tests for various network benchmarks based on {@link StreamNetworkBroadcastThroughputBenchmark}. 
+ */ +public class StreamNetworkBroadcastThroughputBenchmarkTest extends StreamNetworkThroughputBenchmarkTest { + @Override + protected StreamNetworkThroughputBenchmark createBenchmark() { + return new StreamNetworkBroadcastThroughputBenchmark(); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java index a8d18e4ef10..bf6fdc4fb60 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java @@ -75,7 +75,7 @@ public void setUp(long flushTimeout) throws Exception { */ public void setUp(long flushTimeout, Configuration config) throws Exception { environment = new StreamNetworkBenchmarkEnvironment<>(); - environment.setUp(1, 1, false, -1, -1, config); + environment.setUp(1, 1, false, false, -1, -1, config); receiver = environment.createReceiver(); recordWriter = environment.createRecordWriter(0, flushTimeout); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java index 28d7f3556a0..0586f54eeda 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java @@ -29,9 +29,9 @@ * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project. 
*/ public class StreamNetworkThroughputBenchmark { - private StreamNetworkBenchmarkEnvironment<LongValue> environment; - private ReceiverThread receiver; - private LongRecordWriterThread[] writerThreads; + protected StreamNetworkBenchmarkEnvironment<LongValue> environment; + protected ReceiverThread receiver; + protected LongRecordWriterThread[] writerThreads; public void executeBenchmark(long records) throws Exception { executeBenchmark(records, Long.MAX_VALUE); @@ -75,6 +75,7 @@ public void setUp( recordWriters, channels, flushTimeout, + false, localMode, senderBufferPoolSize, receiverBufferPoolSize, @@ -95,16 +96,26 @@ public void setUp( int recordWriters, int channels, int flushTimeout, + boolean broadcastMode, boolean localMode, int senderBufferPoolSize, int receiverBufferPoolSize, Configuration config) throws Exception { environment = new StreamNetworkBenchmarkEnvironment<>(); - environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize, config); + environment.setUp( + recordWriters, + channels, + broadcastMode, + localMode, + senderBufferPoolSize, + receiverBufferPoolSize, + config); receiver = environment.createReceiver(); writerThreads = new LongRecordWriterThread[recordWriters]; for (int writer = 0; writer < recordWriters; writer++) { - writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer, flushTimeout)); + writerThreads[writer] = new LongRecordWriterThread( + environment.createRecordWriter(writer, flushTimeout), + broadcastMode); writerThreads[writer].start(); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java index dac8ee24eba..97675dbfeb6 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java @@ -33,9 +33,13 @@ @Rule public ExpectedException expectedException = ExpectedException.none(); + protected StreamNetworkThroughputBenchmark createBenchmark() { + return new StreamNetworkThroughputBenchmark(); + } + @Test public void pointToPointBenchmark() throws Exception { - StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + StreamNetworkThroughputBenchmark benchmark = createBenchmark(); benchmark.setUp(1, 1, 100); try { benchmark.executeBenchmark(1_000); @@ -107,7 +111,7 @@ public void remoteModeMinimumBuffers() throws Exception { @Test public void pointToMultiPointBenchmark() throws Exception { - StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + StreamNetworkThroughputBenchmark benchmark = createBenchmark(); benchmark.setUp(1, 100, 100); try { benchmark.executeBenchmark(1_000); @@ -119,7 +123,7 @@ public void pointToMultiPointBenchmark() throws Exception { @Test public void multiPointToPointBenchmark() throws Exception { - StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + StreamNetworkThroughputBenchmark benchmark = createBenchmark(); benchmark.setUp(4, 1, 100); try { benchmark.executeBenchmark(1_000); @@ -131,7 +135,7 @@ public void multiPointToPointBenchmark() throws Exception { @Test public void multiPointToMultiPointBenchmark() throws Exception { - StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + StreamNetworkThroughputBenchmark benchmark = createBenchmark(); benchmark.setUp(4, 100, 100); try { benchmark.executeBenchmark(1_000); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services