Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3721#discussion_r112003041 --- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java --- @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.dist; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLogger; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Random; + +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Unit test that verifies that the task manager heap size calculation used by the bash script + * <tt>taskmanager.sh</tt> returns the same values as the heap size calculation of + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}. + * + * NOTE: the shell script uses <tt>awk</tt> to perform floating-point arithmetic which uses + * <tt>double</tt> precision but our Java code restrains to <tt>float</tt> because we actually do + * not need high precision. + */ +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger { + + /** Key that is used by <tt>config.sh</tt>. */ + private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb"; + + /** + * Number of tests with random values. + * + * NOTE: calling the external test script is slow and thus low numbers are preferred for general + * testing. + */ + private static final int NUM_RANDOM_TESTS = 20; + + @Before + public void checkOperatingSystem() { + Assume.assumeTrue("This test checks shell scripts not available on Windows.", + !OperatingSystem.isWindows()); + } + + /** + * Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same + * result as the shell script. 
+ */ + @Test + public void compareNetworkBufShellScriptWithJava() throws Exception { + int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue(); + float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(); + + // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB() + + compareNetworkBufJavaVsScript( + getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f); + + compareNetworkBufJavaVsScript( + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f); + + compareNetworkBufJavaVsScript( + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f); + + // some automated tests with random (but valid) values + + Random ran = new Random(); + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) { + // tolerate that values differ by 1% (due to different floating point precisions) + compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f); + } + } + + /** + * Tests that {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} has the same + * result as the shell script. 
+ */ + @Test + public void compareHeapSizeShellScriptWithJava() throws Exception { + int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue(); + float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(); + + // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB() + + compareHeapSizeJavaVsScript( + getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f); + + compareHeapSizeJavaVsScript( + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f); + + compareHeapSizeJavaVsScript( + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f); + + // some automated tests with random (but valid) values + + Random ran = new Random(); + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) { + // tolerate that values differ by 1% (due to different floating point precisions) + compareHeapSizeJavaVsScript(getRandomConfig(ran), 0.01f); + } + } + + /** + * Returns a flink configuration object with the given values. 
+ * + * @param javaMemMB + * total JVM memory to use (in megabytes) + * @param useOffHeap + * whether to use off-heap memory (<tt>true</tt>) or not (<tt>false</tt>) + * @param netBufMemFrac + * fraction of JVM memory to use for network buffers + * @param netBufMemMin + * minimum memory size for network buffers (in bytes) + * @param netBufMemMax + * maximum memory size for network buffers (in bytes) + * @param managedMemSizeMB + * amount of managed memory (in megabytes) + * @param managedMemFrac + * fraction of free memory to use for managed memory (if <tt>managedMemSizeMB</tt> is + * <tt>-1</tt>) + * + * @return flink configuration + */ + private static Configuration getConfig( + final int javaMemMB, final boolean useOffHeap, final float netBufMemFrac, + final long netBufMemMin, final long netBufMemMax, final int managedMemSizeMB, + final float managedMemFrac) { + + Configuration config = new Configuration(); + + config.setLong(KEY_TASKM_MEM_SIZE, javaMemMB); + config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, useOffHeap); + + config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac); + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, netBufMemMin); + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, netBufMemMax); + + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemSizeMB); + config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, managedMemFrac); + + return config; + } + + /** + * Returns a flink configuration object with random values (only those relevant to the tests in + * this class). + * + * @param ran random number generator + * + * @return flink configuration + */ + private static Configuration getRandomConfig(final Random ran) { + + float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE); + +// long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong()); --- End diff -- These lines can be removed, I guess.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---