azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r271820791
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ########## @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // ------------------------------------------------------------------------ + + /** + * Utility method to extract network related parameters from the configuration and to + * sanity check them. + * + * @param configuration configuration object + * @param maxJvmHeapMemory the maximum JVM heap size (in bytes) + * @param localTaskManagerCommunication true, to skip initializing the network stack + * @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible + * @return NetworkEnvironmentConfiguration + */ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + // ----> hosts / ports for communication and data exchange + + final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); + ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + final int pageSize = ConfigurationParserUtils.getPageSize(configuration); + + final int numNetworkBuffers; + if (!hasNewNetworkConfig(configuration)) { + // fallback: number of network buffers + numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); + + checkOldNetworkConfig(numNetworkBuffers); + } else { + if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { + LOG.info("Ignoring old (but still present) network buffer configuration via {}.", + TaskManagerOptions.NETWORK_NUM_BUFFERS.key()); + } + + final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory); + + // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) + long numNetworkBuffersLong = networkMemorySize / pageSize; + if (numNetworkBuffersLong > Integer.MAX_VALUE) { + throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize + + ") corresponds to more than MAX_INT pages."); + } + numNetworkBuffers = (int) numNetworkBuffersLong; + } + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); + + nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(), + pageSize, ConfigurationParserUtils.getSlot(configuration), configuration); + } else { + nettyConfig = null; + } + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); + int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); + + int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); + int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + + boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL); + + return new NetworkEnvironmentConfiguration( + numNetworkBuffers, + pageSize, + initialRequestBackoff, + maxRequestBackoff, + buffersPerChannel, + extraBuffersPerGate, + isCreditBased, + nettyConfig); + } + + /** + * Calculates the amount of memory used for network buffers inside the current JVM instance + * based on the available heap or the max heap size and the according configuration parameters. 
+ * + * <p>For containers or when started via scripts, if started with a memory limit and set to use + * off-heap memory, the maximum heap size for the JVM is adjusted accordingly and we are able + * to extract the intended values from this. + * + * <p>The following configuration parameters are involved: + * <ul> + * <li>{@link TaskManagerOptions#MANAGED_MEMORY_SIZE},</li> + * <li>{@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},</li> + * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li> + * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li> + * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li> + * <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li> + * </ul>. + * + * @param config configuration object + * @param maxJvmHeapMemory the maximum JVM heap size (in bytes) + * + * @return memory to use for network buffers (in bytes) + */ + @VisibleForTesting + public static long calculateNewNetworkBufferMemory(Configuration config, long maxJvmHeapMemory) { + // The maximum heap memory has been adjusted as in TaskManagerServices#calculateHeapSizeMB + // and we need to invert these calculations. + final long jvmHeapNoNet; + final MemoryType memoryType = ConfigurationParserUtils.getMemoryType(config); + if (memoryType == MemoryType.HEAP) { + jvmHeapNoNet = maxJvmHeapMemory; + } else if (memoryType == MemoryType.OFF_HEAP) { + long configuredMemory = ConfigurationParserUtils.getManagedMemorySize(config) << 20; // megabytes to bytes + if (configuredMemory > 0) { + // The maximum heap memory has been adjusted according to configuredMemory, i.e. + // maxJvmHeap = jvmHeapNoNet - configuredMemory + jvmHeapNoNet = maxJvmHeapMemory + configuredMemory; + } else { + // The maximum heap memory has been adjusted according to the fraction, i.e. 
+ // maxJvmHeap = jvmHeapNoNet - jvmHeapNoNet * managedFraction = jvmHeapNoNet * (1 - managedFraction) + jvmHeapNoNet = (long) (maxJvmHeapMemory / (1.0 - ConfigurationParserUtils.getManagedMemoryFraction(config))); + } + } else { + throw new RuntimeException("No supported memory type detected."); + } + + // finally extract the network buffer memory size again from: + // jvmHeapNoNet = jvmHeap - networkBufBytes + // = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction) + // jvmHeap = jvmHeapNoNet / (1.0 - networkBufFraction) + float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION); + long networkBufSize = (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction); + return calculateNewNetworkBufferMemory(config, networkBufSize, maxJvmHeapMemory); + } + + /** + * Calculates the amount of memory used for network buffers based on the total memory to use and + * the according configuration parameters. + * + * <p>The following configuration parameters are involved: + * <ul> + * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li> + * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li> + * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li> + * <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li> + * </ul>. 
+ * + * @param totalJavaMemorySize overall available memory to use (in bytes) + * @param config configuration object + * + * @return memory to use for network buffers (in bytes) + */ + @SuppressWarnings("deprecation") + public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) { + int segmentSize = ConfigurationParserUtils.getPageSize(config); + + final long networkBufBytes; + if (hasNewNetworkConfig(config)) { + float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION); + long networkBufSize = (long) (totalJavaMemorySize * networkBufFraction); + networkBufBytes = calculateNewNetworkBufferMemory(config, networkBufSize, totalJavaMemorySize); + } else { + // use old (deprecated) network buffers parameter + int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); + networkBufBytes = (long) numNetworkBuffers * (long) segmentSize; + + checkOldNetworkConfig(numNetworkBuffers); + + ConfigurationParserUtils.checkConfigParameter(networkBufBytes < totalJavaMemorySize, + networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(), + "Network buffer memory size too large: " + networkBufBytes + " >= " + + totalJavaMemorySize + " (total JVM memory size)"); + ConfigurationParserUtils.checkConfigParameter(networkBufBytes >= segmentSize, + networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(), + "Network buffer memory size too small: " + networkBufBytes + " < " + + segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")"); + } + + return networkBufBytes; + } + + /** + * Calculates the amount of memory used for network buffers based on the total memory to use and + * the according configuration parameters. 
+ * + * <p>The following configuration parameters are involved: + * <ul> + * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li> + * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li> + * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}</li> + * </ul>. + * + * @param config configuration object + * @param networkBufSize memory of network buffers based on JVM memory size and network fraction + * @param maxJvmHeapMemory maximum memory used for checking the results of network memory + * + * @return memory to use for network buffers (in bytes) + */ + private static long calculateNewNetworkBufferMemory(Configuration config, long networkBufSize, long maxJvmHeapMemory) { + float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION); + long networkBufMin = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes(); + long networkBufMax = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes(); + + int pageSize = ConfigurationParserUtils.getPageSize(config); + + checkNewNetworkConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax); + + long networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin, networkBufSize)); + + ConfigurationParserUtils.checkConfigParameter(networkBufBytes < maxJvmHeapMemory, + "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")", + "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " + + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " + + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")", + "Network buffer memory size too large: " + networkBufBytes + " >= " + + maxJvmHeapMemory + " (maximum JVM memory size)"); + ConfigurationParserUtils.checkConfigParameter(networkBufBytes >= pageSize, Review comment: this check `networkBufBytes >= pageSize` does not appear to have been performed previously on the similar code path for
`calculateNewNetworkBufferMemory`. Not sure how important it is, but `networkBufBytes` seems to be calculated in a different way, so the check might be valid for containers but not for network. What do you think? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services