pnowojski closed pull request #6697: [hotfix][benchmarks] Add network broadcast benchmark
URL: https://github.com/apache/flink/pull/6697
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
index 05ae276973a..ba3294b3571 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
@@ -35,6 +35,7 @@
  */
 public class LongRecordWriterThread extends CheckedThread {
        private final StreamRecordWriter<LongValue> recordWriter;
+       private final boolean broadcastMode;
 
        /**
         * Future to wait on a definition of the number of records to send.
@@ -43,8 +44,11 @@
 
        private volatile boolean running = true;
 
-       public LongRecordWriterThread(StreamRecordWriter<LongValue> 
recordWriter) {
+       public LongRecordWriterThread(
+                       StreamRecordWriter<LongValue> recordWriter,
+                       boolean broadcastMode) {
                this.recordWriter = checkNotNull(recordWriter);
+               this.broadcastMode = broadcastMode;
        }
 
        public synchronized void shutdown() {
@@ -89,7 +93,12 @@ private void sendRecords(long records) throws IOException, 
InterruptedException
                LongValue value = new LongValue(0);
 
                for (int i = 1; i < records; i++) {
-                       recordWriter.emit(value);
+                       if (broadcastMode) {
+                               recordWriter.broadcastEmit(value);
+                       }
+                       else {
+                               recordWriter.emit(value);
+                       }
                }
                value.setValue(records);
                recordWriter.broadcastEmit(value);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index bfaed437797..1b769c80f0a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -87,6 +87,7 @@
        protected IOManager ioManager;
 
        protected int channels;
+       protected boolean broadcastMode = false;
        protected boolean localMode = false;
 
        protected ResultPartitionID[] partitionIds;
@@ -100,6 +101,7 @@ public void setUp(
                setUp(
                        writers,
                        channels,
+                       false,
                        localMode,
                        senderBufferPoolSize,
                        receiverBufferPoolSize,
@@ -123,10 +125,12 @@ public void setUp(
        public void setUp(
                        int writers,
                        int channels,
+                       boolean broadcastMode,
                        boolean localMode,
                        int senderBufferPoolSize,
                        int receiverBufferPoolSize,
                        Configuration config) throws Exception {
+               this.broadcastMode = broadcastMode;
                this.localMode = localMode;
                this.channels = channels;
                this.partitionIds = new ResultPartitionID[writers];
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBroadcastThroughputBenchmark.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBroadcastThroughputBenchmark.java
new file mode 100644
index 00000000000..f4854d6b3e8
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBroadcastThroughputBenchmark.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Network throughput benchmarks executed by the external
+ * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
+ */
+public class StreamNetworkBroadcastThroughputBenchmark extends 
StreamNetworkThroughputBenchmark {
+
+       /**
+        * Same as {@link StreamNetworkThroughputBenchmark#setUp(int, int, int, 
boolean, int, int)}
+        * but also setups broadcast mode.
+        */
+       @Override
+       public void setUp(
+                       int recordWriters,
+                       int channels,
+                       int flushTimeout,
+                       boolean localMode,
+                       int senderBufferPoolSize,
+                       int receiverBufferPoolSize) throws Exception {
+               setUp(
+                       recordWriters,
+                       channels,
+                       flushTimeout,
+                       true,
+                       localMode,
+                       senderBufferPoolSize,
+                       receiverBufferPoolSize,
+                       new Configuration()
+               );
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBroadcastThroughputBenchmarkTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBroadcastThroughputBenchmarkTest.java
new file mode 100644
index 00000000000..acee9fe6ef3
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBroadcastThroughputBenchmarkTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+/**
+ * Tests for various network benchmarks based on {@link 
StreamNetworkBroadcastThroughputBenchmark}.
+ */
+public class StreamNetworkBroadcastThroughputBenchmarkTest extends 
StreamNetworkThroughputBenchmarkTest {
+       @Override
+       protected StreamNetworkThroughputBenchmark createBenchmark() {
+               return new StreamNetworkBroadcastThroughputBenchmark();
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index a8d18e4ef10..bf6fdc4fb60 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -75,7 +75,7 @@ public void setUp(long flushTimeout) throws Exception {
         */
        public void setUp(long flushTimeout, Configuration config) throws 
Exception {
                environment = new StreamNetworkBenchmarkEnvironment<>();
-               environment.setUp(1, 1, false, -1, -1, config);
+               environment.setUp(1, 1, false, false, -1, -1, config);
 
                receiver = environment.createReceiver();
                recordWriter = environment.createRecordWriter(0, flushTimeout);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index 28d7f3556a0..0586f54eeda 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -29,9 +29,9 @@
  * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
  */
 public class StreamNetworkThroughputBenchmark {
-       private StreamNetworkBenchmarkEnvironment<LongValue> environment;
-       private ReceiverThread receiver;
-       private LongRecordWriterThread[] writerThreads;
+       protected StreamNetworkBenchmarkEnvironment<LongValue> environment;
+       protected ReceiverThread receiver;
+       protected LongRecordWriterThread[] writerThreads;
 
        public void executeBenchmark(long records) throws Exception {
                executeBenchmark(records, Long.MAX_VALUE);
@@ -75,6 +75,7 @@ public void setUp(
                        recordWriters,
                        channels,
                        flushTimeout,
+                       false,
                        localMode,
                        senderBufferPoolSize,
                        receiverBufferPoolSize,
@@ -95,16 +96,26 @@ public void setUp(
                        int recordWriters,
                        int channels,
                        int flushTimeout,
+                       boolean broadcastMode,
                        boolean localMode,
                        int senderBufferPoolSize,
                        int receiverBufferPoolSize,
                        Configuration config) throws Exception {
                environment = new StreamNetworkBenchmarkEnvironment<>();
-               environment.setUp(recordWriters, channels, localMode, 
senderBufferPoolSize, receiverBufferPoolSize, config);
+               environment.setUp(
+                       recordWriters,
+                       channels,
+                       broadcastMode,
+                       localMode,
+                       senderBufferPoolSize,
+                       receiverBufferPoolSize,
+                       config);
                receiver = environment.createReceiver();
                writerThreads = new LongRecordWriterThread[recordWriters];
                for (int writer = 0; writer < recordWriters; writer++) {
-                       writerThreads[writer] = new 
LongRecordWriterThread(environment.createRecordWriter(writer, flushTimeout));
+                       writerThreads[writer] = new LongRecordWriterThread(
+                               environment.createRecordWriter(writer, 
flushTimeout),
+                               broadcastMode);
                        writerThreads[writer].start();
                }
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
index dac8ee24eba..97675dbfeb6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
@@ -33,9 +33,13 @@
        @Rule
        public ExpectedException expectedException = ExpectedException.none();
 
+       protected StreamNetworkThroughputBenchmark createBenchmark() {
+               return new StreamNetworkThroughputBenchmark();
+       }
+
        @Test
        public void pointToPointBenchmark() throws Exception {
-               StreamNetworkThroughputBenchmark benchmark = new 
StreamNetworkThroughputBenchmark();
+               StreamNetworkThroughputBenchmark benchmark = createBenchmark();
                benchmark.setUp(1, 1, 100);
                try {
                        benchmark.executeBenchmark(1_000);
@@ -107,7 +111,7 @@ public void remoteModeMinimumBuffers() throws Exception {
 
        @Test
        public void pointToMultiPointBenchmark() throws Exception {
-               StreamNetworkThroughputBenchmark benchmark = new 
StreamNetworkThroughputBenchmark();
+               StreamNetworkThroughputBenchmark benchmark = createBenchmark();
                benchmark.setUp(1, 100, 100);
                try {
                        benchmark.executeBenchmark(1_000);
@@ -119,7 +123,7 @@ public void pointToMultiPointBenchmark() throws Exception {
 
        @Test
        public void multiPointToPointBenchmark() throws Exception {
-               StreamNetworkThroughputBenchmark benchmark = new 
StreamNetworkThroughputBenchmark();
+               StreamNetworkThroughputBenchmark benchmark = createBenchmark();
                benchmark.setUp(4, 1, 100);
                try {
                        benchmark.executeBenchmark(1_000);
@@ -131,7 +135,7 @@ public void multiPointToPointBenchmark() throws Exception {
 
        @Test
        public void multiPointToMultiPointBenchmark() throws Exception {
-               StreamNetworkThroughputBenchmark benchmark = new 
StreamNetworkThroughputBenchmark();
+               StreamNetworkThroughputBenchmark benchmark = createBenchmark();
                benchmark.setUp(4, 100, 100);
                try {
                        benchmark.executeBenchmark(1_000);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to