
ASF GitHub Bot commented on FLINK-4545:

Github user NicoK commented on a diff in the pull request:

    --- Diff: 
    @@ -0,0 +1,272 @@
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.taskexecutor;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +import org.apache.flink.core.memory.MemoryType;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.powermock.api.mockito.PowerMockito;
    +import org.powermock.core.classloader.annotations.PrepareForTest;
    +import org.powermock.modules.junit4.PowerMockRunner;
    +import java.net.InetAddress;
    +import java.util.Random;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    + * Unit test for {@link TaskManagerServices}.
    + */
    +public class TaskManagerServicesTest {
    +   /**
    +    * Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using old
    +    * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
    +    */
    +   @SuppressWarnings("deprecation")
    +   @Test
    +   public void calculateNetworkBufOld() throws Exception {
    +           Configuration config = new Configuration();
    +           config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +           // note: actual network buffer memory size is independent of 
the totalJavaMemorySize
    +                   TaskManagerServices.calculateNetworkBuf(10L << 20, 
    +                   TaskManagerServices.calculateNetworkBuf(64L << 20, 
    +           // test integer overflow in the memory size
    +           int numBuffers = (int) ((2L << 32) / 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
    +           config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 
    +           assertEquals(2L << 32, 
TaskManagerServices.calculateNetworkBuf(2L << 33, config));
    +   }
    +   /**
    +    * Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using new
    +    * configurations via {@link 
    +    * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
    +    * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
    +    */
    +   @Test
    +   public void calculateNetworkBufNew() throws Exception {
    +           Configuration config = new Configuration();
    +           // (1) defaults
    +           final Float defaultFrac = 
    +           final Long defaultMin = 
    +           final Long defaultMax = 
    +           assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) 
(defaultFrac * (10L << 20)))),
    +                   TaskManagerServices.calculateNetworkBuf((64L << 20 + 
1), config));
    +           assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) 
(defaultFrac * (10L << 30)))),
    +                   TaskManagerServices.calculateNetworkBuf((10L << 30), 
    +           calculateNetworkBufNew(config);
    +   }
    +   /**
    +    * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} with the
    +    * new configuration parameters.
    +    *
    +    * @param config configuration object
    +    */
    +   private static void calculateNetworkBufNew(final Configuration config) {
    +           // (2) fixed size memory
    +           config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 
1L << 20); // 1MB
    +           config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 
1L << 20); // 1MB
    +           // note: actual network buffer memory size is independent of 
the totalJavaMemorySize
    +           assertEquals(1 << 20, 
TaskManagerServices.calculateNetworkBuf(10L << 20, config));
    +           assertEquals(1 << 20, 
TaskManagerServices.calculateNetworkBuf(64L << 20, config));
    +           assertEquals(1 << 20, 
TaskManagerServices.calculateNetworkBuf(1L << 30, config));
    +           // (3) random fraction, min, and max values
    +           Random ran = new Random();
    +           for (int i = 0; i < 1_000; ++i){
    +                   float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
    +                   long min = 
Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
    +                   long max = Math.max(min, ran.nextLong());
config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
    +                   long javaMem = Math.max(max + 1, ran.nextLong());
    +                   final long networkBufMem = 
TaskManagerServices.calculateNetworkBuf(javaMem, config);
    +                   assertTrue(networkBufMem >= min);
    +                   assertTrue(networkBufMem <= max);
    +                   if (networkBufMem > min && networkBufMem < max) {
    +                           assertEquals((long) (javaMem * frac), 
    +                   }
    +           }
    +   }
    +   /**
    +    * Test for {@link TaskManagerServices#calculateNetworkBuf(long, 
Configuration)} using mixed
    +    * old/new configurations.
    +    */
    +   @SuppressWarnings("deprecation")
    +   @Test
    +   public void calculateNetworkBufMixed() throws Exception {
    +           Configuration config = new Configuration();
    +           config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
    +           final Float defaultFrac = 
    +           final Long defaultMin = 
    +           final Long defaultMax = 
    +           // old + 1 new parameter = new:
    +           Configuration config1 = config.clone();
config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
    +           assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) 
(0.1f * (10L << 20)))),
    --- End diff --
    yes, if we define our own test-defaults, your are right - however, we would 
decouple the test from the defaults that are set for real-world applications 
and I wanted to keep them as close as possible including any future change in 
the default values

> Flink automatically manages TM network buffer
> ---------------------------------------------
>                 Key: FLINK-4545
>                 URL: https://issues.apache.org/jira/browse/FLINK-4545
>             Project: Flink
>          Issue Type: Wish
>          Components: Network
>            Reporter: Zhenzhong Xu
> Currently, the number of network buffer per task manager is preconfigured and 
> the memory is pre-allocated through taskmanager.network.numberOfBuffers 
> config. In a Job DAG with shuffle phase, this number can go up very high 
> depends on the TM cluster size. The formula for calculating the buffer count 
> is documented here 
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers).
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster 
> size dynamically and then leverage the up-coming Flink feature to support 
> scaling job parallelism/rescaling at runtime. 
> If the buffer count config is static at runtime and cannot be changed without 
> restarting task manager process, this may add latency and complexity for 
> scaling process. I am wondering if there is already any discussion around 
> whether the network buffer should be automatically managed by Flink or at 
> least expose some API to allow it to be reconfigured. Let me know if there is 
> any existing JIRA that I should follow.

This message was sent by Atlassian JIRA

Reply via email to