[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973043#comment-15973043 ]

ASF GitHub Bot commented on FLINK-4545:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112000165
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +import org.apache.flink.core.memory.MemoryType;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.powermock.api.mockito.PowerMockito;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +
    +import java.net.InetAddress;
    +import java.util.Random;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +/**
    + * Unit test for {@link TaskManagerServices}.
    + */
    +@RunWith(PowerMockRunner.class)
    +@PrepareForTest(EnvironmentInformation.class)
    +public class TaskManagerServicesTest {
    +
    +   /**
    +    * Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using old
    +    * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
    +    */
    +   @SuppressWarnings("deprecation")
    +   @Test
    +   public void calculateNetworkBufOld() throws Exception {
    +           Configuration config = new Configuration();
    +           config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +
    +           // note: actual network buffer memory size is independent of 
the totalJavaMemorySize
    +           
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +                   TaskManagerServices.calculateNetworkBuf(10L << 20, 
config));
    +           
assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
    +                   TaskManagerServices.calculateNetworkBuf(64L << 20, 
config));
    +
    +           // test integer overflow in the memory size
    +           int numBuffers = (int) ((2L << 32) / 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
    +           config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 
numBuffers);
    +           assertEquals(2L << 32, 
TaskManagerServices.calculateNetworkBuf(2L << 33, config));
    +   }
    +
    +   /**
    +    * Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using new
    +    * configurations via {@link 
TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
    +    * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
    +    * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
    +    */
    +   @Test
    +   public void calculateNetworkBufNew() throws Exception {
    +           Configuration config = new Configuration();
    +
    +           // (1) defaults
    +           final Float defaultFrac = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
    +           final Long defaultMin = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
    +           final Long defaultMax = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
    +           assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) 
(defaultFrac * (10L << 20)))),
    +                   TaskManagerServices.calculateNetworkBuf((64L << 20 + 
1), config));
    +           assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) 
(defaultFrac * (10L << 30)))),
    +                   TaskManagerServices.calculateNetworkBuf((10L << 30), 
config));
    +
    +           calculateNetworkBufNew(config);
    +   }
    +
    +   /**
    +    * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} with the
    +    * new configuration parameters.
    +    *
    +    * @param config configuration object
    +    */
    +   private static void calculateNetworkBufNew(final Configuration config) {
    +           // (2) fixed size memory
    +           config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 
1L << 20); // 1MB
    +           config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 
1L << 20); // 1MB
    +
    +           // note: actual network buffer memory size is independent of 
the totalJavaMemorySize
    +           assertEquals(1 << 20, 
TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +           assertEquals(1 << 20, 
TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +           assertEquals(1 << 20, 
TaskManagerServices.calculateNetworkBuf(1L << 30, config));
    +
    +           // (3) random fraction, min, and max values
    +           Random ran = new Random();
    +           for (int i = 0; i < 1_000; ++i){
    +                   float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
    +                   
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
    +
    +                   long min = 
Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
    +                   
config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
    +
    +                   long max = Math.max(min, ran.nextLong());
    +                   
config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
    +
    +                   long javaMem = Math.max(max + 1, ran.nextLong());
    +
    +                   final long networkBufMem = 
TaskManagerServices.calculateNetworkBuf(javaMem, config);
    +                   assertTrue(networkBufMem >= min);
    +                   assertTrue(networkBufMem <= max);
    +                   if (networkBufMem > min && networkBufMem < max) {
    +                           assertEquals((long) (javaMem * frac), 
networkBufMem);
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using mixed
    +    * old/new configurations.
    +    */
    +   @SuppressWarnings("deprecation")
    +   @Test
    +   public void calculateNetworkBufMixed() throws Exception {
    +           Configuration config = new Configuration();
    +           config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +
    +           final Float defaultFrac = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
    +           final Long defaultMin = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
    +           final Long defaultMax = 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
    +
    +           // old + 1 new parameter = new:
    +           Configuration config1 = config.clone();
    +           
config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
    +           assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) 
(0.1f * (10L << 20)))),
    +                   TaskManagerServices.calculateNetworkBuf((64L << 20 + 
1), config1));
    +           assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) 
(0.1f * (10L << 30)))),
    +                   TaskManagerServices.calculateNetworkBuf((10L << 30), 
config1));
    +
    +           config1 = config.clone();
    +           long newMin = 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); // smallest value 
possible
    +           config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 
newMin);
    +           assertEquals(Math.min(defaultMax, Math.max(newMin, (long) 
(defaultFrac * (10L << 20)))),
    +                   TaskManagerServices.calculateNetworkBuf((10L << 20), 
config1));
    +           assertEquals(Math.min(defaultMax, Math.max(newMin, (long) 
(defaultFrac * (10L << 30)))),
    +                   TaskManagerServices.calculateNetworkBuf((10L << 30), 
config1));
    +
    +           config1 = config.clone();
    +           long newMax = Math.max(64L << 20 + 1, 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue());
    +           config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 
newMax);
    +           assertEquals(Math.min(newMax, Math.max(defaultMin, (long) 
(defaultFrac * (10L << 20)))),
    +                   TaskManagerServices.calculateNetworkBuf((64L << 20 + 
1), config1));
    +           assertEquals(Math.min(newMax, Math.max(defaultMin, (long) 
(defaultFrac * (10L << 30)))),
    +                   TaskManagerServices.calculateNetworkBuf((10L << 30), 
config1));
    +           
assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
    +
    +           // old + any new parameter = new:
    +           calculateNetworkBufNew(config);
    +   }
    +
    +   /**
    +    * Test for {@link 
TaskManagerServices#calculateNetworkBuf(TaskManagerServicesConfiguration)}
    +    * using the same (manual) test cases as in {@link 
#calculateHeapSizeMB()}.
    +    */
    +   @Test
    +   public void calculateNetworkBufFromHeapSize() throws Exception {
    +           PowerMockito.mockStatic(EnvironmentInformation.class);
    +           // some defaults:
    +           
when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L
 << 20); // 1000MB
    +           
when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(1000L << 20); // 
1000MB
    +
    +           TaskManagerServicesConfiguration tmConfig;
    +
    +           tmConfig = 
getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
    +                   
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
    +                   0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
    +           
when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L
 << 20); // 1000MB
    +           assertEquals(100L << 20, 
TaskManagerServices.calculateNetworkBuf(tmConfig));
    +
    +           tmConfig = getTmConfig(10, 
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
    +                   0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
    +           
when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(890L << 20); // 
890MB
    +           assertEquals((100L << 20) + 1 /* one too many due to floating 
point imprecision */,
    +                   TaskManagerServices.calculateNetworkBuf(tmConfig));
    +
    +           tmConfig = getTmConfig(-1, 0.1f,
    +                   0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
    +           
when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(810L << 20); // 
810MB
    +           assertEquals((100L << 20) + 1 /* one too many due to floating 
point imprecision */,
    +                   TaskManagerServices.calculateNetworkBuf(tmConfig));
    +   }
    +
    +   /**
    +    * Returns a task manager services configuration for the tests
    +    *
    +    * @param managedMemory         see {@link 
TaskManagerOptions#MANAGED_MEMORY_SIZE}
    +    * @param managedMemoryFraction see {@link 
TaskManagerOptions#MANAGED_MEMORY_FRACTION}
    +    * @param networkBufFraction    see {@link 
TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
    +    * @param networkBufMin                 see {@link 
TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}
    +    * @param networkBufMax                 see {@link 
TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}
    +    * @param memType                               on-heap or off-heap
    +    *
    +    * @return configuration object
    +    */
    +   private static TaskManagerServicesConfiguration getTmConfig(
    +           final long managedMemory, final float managedMemoryFraction, 
float networkBufFraction,
    +           long networkBufMin, long networkBufMax,
    +           final MemoryType memType) {
    +
    +           final NetworkEnvironmentConfiguration networkConfig = new 
NetworkEnvironmentConfiguration(
    +                   networkBufFraction,
    +                   networkBufMin,
    +                   networkBufMax,
    +                   TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
    +                   memType,
    +                   null,
    +                   
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
    +                   
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
    +                   
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
    +                   
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(),
    +                   null);
    +
    +           return new TaskManagerServicesConfiguration(
    +                   mock(InetAddress.class),
    +                   new String[] {},
    +                   networkConfig,
    +                   QueryableStateConfiguration.disabled(),
    +                   1,
    +                   managedMemory,
    +                   false,
    +                   managedMemoryFraction,
    +                   mock(MetricRegistryConfiguration.class),
    +                   0);
    +   }
    +
    +   /**
    +    * Test for {@link TaskManagerServices#calculateHeapSizeMB(long, 
Configuration)} with some
    +    * manually calculated scenarios.
    +    */
    +   @Test
    +   public void calculateHeapSizeMB() throws Exception {
    +           Configuration config = new Configuration();
    +           
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
    +           config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 
64L << 20); // 64MB
    +           config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 
1L << 30); // 1GB
    +
    +           config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
    +           assertEquals(1000, 
TaskManagerServices.calculateHeapSizeMB(1000, config));
    +
    +           config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
    +           config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10); // 
10MB
    +           assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, 
config));
    +
    +           config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1); // 
use fraction of given memory
    +           config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 
0.1f); // 10%
    +           assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, 
config));
    --- End diff --
    
    Now that I see this test, it got me thinking: the managedMemory and 
networkBuffersMemory fractions do not work on the same base value; i.e., if 
both are set to 0.5, then one (the network, I think) gets 0.5 of the total 
memory, while the managedMemory gets 0.25. I'm wondering how intuitive this is; 
they behave similarly when used alone, but when both are used, 0.5 doesn't 
equal 0.5 in a way.
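
To make the concern concrete, here is a minimal sketch of the arithmetic that the expected values in `calculateHeapSizeMB()` above seem to imply (1000 - 100 - 10 = 890 and 1000 - 100 - 90 = 810). It assumes the network fraction is applied to the total memory and the managed fraction to what remains afterwards. The class and method names are hypothetical and are not Flink's actual implementation.

```java
public class FractionBaseSketch {

    // Hypothetical helper: network buffers take their fraction of the total memory.
    static long networkMb(long totalMb, double networkFraction) {
        return (long) (totalMb * networkFraction);
    }

    // Hypothetical helper: managed memory takes its fraction of what is left
    // after the network buffers have been subtracted (assumed base value).
    static long managedMb(long totalMb, double networkFraction, double managedFraction) {
        long remainingMb = totalMb - networkMb(totalMb, networkFraction);
        return (long) (remainingMb * managedFraction);
    }

    // Hypothetical helper: heap is whatever remains when both pools are off-heap.
    static long heapMb(long totalMb, double networkFraction, double managedFraction) {
        return totalMb
            - networkMb(totalMb, networkFraction)
            - managedMb(totalMb, networkFraction, managedFraction);
    }

    public static void main(String[] args) {
        long totalMb = 1000;

        // Both fractions set to 0.5: the two pools end up with different shares.
        System.out.println("network MB: " + networkMb(totalMb, 0.5));
        System.out.println("managed MB: " + managedMb(totalMb, 0.5, 0.5));

        // The off-heap scenario from the test: both fractions at 0.1.
        System.out.println("heap MB:    " + heapMb(totalMb, 0.1, 0.1));
    }
}
```

Under these assumptions, a fraction of 0.5 yields 500 MB for the network buffers but only 250 MB for managed memory, which is the asymmetry described above.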


> Flink automatically manages TM network buffer
> ---------------------------------------------
>
>                 Key: FLINK-4545
>                 URL: https://issues.apache.org/jira/browse/FLINK-4545
>             Project: Flink
>          Issue Type: Wish
>          Components: Network
>            Reporter: Zhenzhong Xu
>
> Currently, the number of network buffers per task manager is preconfigured and 
> the memory is pre-allocated through the taskmanager.network.numberOfBuffers 
> config. In a job DAG with a shuffle phase, this number can grow very high, 
> depending on the TM cluster size. The formula for calculating the buffer count 
> is documented here 
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers).
>   
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster 
> size dynamically and then leverage the upcoming Flink feature to support 
> scaling job parallelism/rescaling at runtime. 
> If the buffer count config is static at runtime and cannot be changed without 
> restarting the task manager process, this may add latency and complexity to 
> the scaling process. I am wondering if there is already any discussion around 
> whether the network buffers should be automatically managed by Flink, or 
> whether Flink could at least expose some API to allow them to be 
> reconfigured. Let me know if there is any existing JIRA that I should follow.
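
As a rough illustration of how quickly the quoted formula grows, the following sketch evaluates `#slots-per-TM^2 * #TMs * 4` for a sample cluster. The class name, method names, and the sample cluster size are hypothetical. The segment size of 32768 bytes is an assumption used only to turn the buffer count into a memory figure.

```java
public class NetworkBufferEstimate {

    // Buffer count per the formula quoted in the issue: #slots-per-TM^2 * #TMs * 4.
    static long estimateBufferCount(int slotsPerTm, int numTms) {
        return (long) slotsPerTm * slotsPerTm * numTms * 4;
    }

    // Memory needed for that many buffers; segmentSizeBytes is an assumed parameter.
    static long estimateBufferBytes(int slotsPerTm, int numTms, long segmentSizeBytes) {
        return estimateBufferCount(slotsPerTm, numTms) * segmentSizeBytes;
    }

    public static void main(String[] args) {
        int slotsPerTm = 8;   // hypothetical cluster: 8 slots per task manager
        int numTms = 10;      // hypothetical cluster: 10 task managers
        long segmentSizeBytes = 32768L; // assumed segment size of 32 KiB

        long buffers = estimateBufferCount(slotsPerTm, numTms);
        long bytes = estimateBufferBytes(slotsPerTm, numTms, segmentSizeBytes);

        System.out.println("buffers: " + buffers);
        System.out.println("MiB:     " + (bytes >> 20));
    }
}
```

Because the count is quadratic in the slots per TM and linear in the number of TMs, rescaling the cluster changes the required value, which is why a static setting is awkward here.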



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
