[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974988#comment-15974988 ]
ASF GitHub Bot commented on FLINK-4545:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112248789

--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -398,3 +428,106 @@ readSlaves() {
 useOffHeapMemory() {
     [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
 }
+
+HAVE_AWK=
+# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
+calculateNetworkBuf() {
+    local network_buffers_bytes
+    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
+        exit 1
+    fi
+
+    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
+        # fix memory size for network buffers
+        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
+    else
+        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
+            echo "Min must be less than or equal to max."
+            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        # Bash only performs integer arithmetic so floating point computation is performed using awk
+        if [[ -z "${HAVE_AWK}" ]] ; then
+            command -v awk >/dev/null 2>&1
+            if [[ $? -ne 0 ]]; then
+                echo "[ERROR] Program 'awk' not found."
+                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+                exit 1
+            fi
+            HAVE_AWK=true
+        fi
+
+        # We calculate the memory using a fraction of the total memory
+        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
+            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
+            echo "It must be between 0.0 and 1.0."
+            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
+            exit 1
+        fi
+
+        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+    fi
+
+    # recalculate the JVM heap memory by taking the network buffers into account
--- End diff --

No, actually: the user may set the `FLINK_TM_HEAP` environment variable or configure the "Flink heap size" via `taskmanager.heap.mb`, but this is not the real heap size. It is rather the overall memory size used by Flink, including off-heap memory. This function therefore removes the off-heap part and returns the real heap size to use with `-Xmx` and `-Xms`.

> Flink automatically manages TM network buffer
> ---------------------------------------------
>
>                 Key: FLINK-4545
>                 URL: https://issues.apache.org/jira/browse/FLINK-4545
>             Project: Flink
>          Issue Type: Wish
>          Components: Network
>            Reporter: Zhenzhong Xu
>
> Currently, the number of network buffers per task manager is preconfigured and
> the memory is pre-allocated through the taskmanager.network.numberOfBuffers
> config. In a job DAG with a shuffle phase, this number can grow very large
> depending on the TM cluster size. The formula for calculating the buffer count
> is documented here
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers).
>
> #slots-per-TM^2 * #TMs * 4
>
> In a standalone deployment, we may need to control the task manager cluster
> size dynamically and then leverage the upcoming Flink feature for rescaling
> job parallelism at runtime.
> If the buffer count config is static at runtime and cannot be changed without
> restarting the task manager process, this may add latency and complexity to
> the scaling process. I am wondering whether there is already any discussion
> about whether network buffers should be managed automatically by Flink, or
> whether Flink should at least expose an API that allows them to be
> reconfigured. Let me know if there is any existing JIRA that I should follow.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
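The reviewer's point is that `FLINK_TM_HEAP` (or `taskmanager.heap.mb`) describes the total memory of the TaskManager rather than the JVM heap, so the off-heap network buffer memory has to be subtracted before `-Xmx`/`-Xms` are set. Below is a minimal bash sketch of that arithmetic, assuming network buffers are the only off-heap part (off-heap managed memory is ignored) and skipping the input validation that `calculateNetworkBuf` performs. The function names `network_buffer_bytes` and `jvm_heap_mb` are hypothetical and not part of Flink's `config.sh`. It multiplies by 1048576 instead of calling `lshift`, since bit-shift functions are a gawk extension rather than POSIX awk.

```shell
#!/usr/bin/env bash
# Hypothetical sketch, not Flink's config.sh: derive the JVM heap size
# (for -Xmx/-Xms) from the total TaskManager memory by subtracting the
# network buffer memory, which is assumed to be the only off-heap part.

# network_buffer_bytes TOTAL_MB FRACTION MIN_BYTES MAX_BYTES
# Prints FRACTION * total memory in bytes, clamped to [MIN_BYTES, MAX_BYTES].
network_buffer_bytes() {
    local total_mb=$1 fraction=$2 min_bytes=$3 max_bytes=$4
    # Bash has no floating point, so awk does the multiplication and clamping.
    awk -v mb="$total_mb" -v f="$fraction" -v lo="$min_bytes" -v hi="$max_bytes" \
        'BEGIN {
            lo += 0; hi += 0            # force numeric comparison
            x = mb * 1048576 * f        # MB -> bytes, then apply the fraction
            if (x > hi) x = hi
            if (x < lo) x = lo
            printf "%.0f\n", x          # round to whole bytes
        }'
}

# jvm_heap_mb TOTAL_MB FRACTION MIN_BYTES MAX_BYTES
# Prints the on-heap size in MB that remains after the network buffers.
jvm_heap_mb() {
    local total_mb=$1 net_bytes
    net_bytes=$(network_buffer_bytes "$@") || return 1
    # Convert bytes to whole MB (integer division) and subtract from the total.
    echo $(( total_mb - net_bytes / 1048576 ))
}
```

For example, `jvm_heap_mb 1024 0.1 67108864 1073741824` prints `922`: 10% of 1024 MB is roughly 102 MB of network buffers, which lies inside the example bounds of 64 MB and 1 GB, leaving 922 MB for the heap. With 256 MB in total, the buffers are clamped up to the 64 MB minimum and the heap becomes 192 MB.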
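The rule of thumb quoted in the issue, #slots-per-TM^2 * #TMs * 4, grows quadratically with the number of slots per TaskManager and linearly with the number of TaskManagers. The hypothetical helpers below evaluate it and convert the count to bytes, assuming a buffer (memory segment) size of 32 KiB; pass a different size as the third argument if your setup differs.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (not part of Flink) that evaluate the rule of thumb
#   #slots-per-TM^2 * #TMs * 4
# and convert the resulting buffer count to bytes.

# suggested_network_buffers SLOTS_PER_TM NUM_TMS
suggested_network_buffers() {
    local slots=$1 tms=$2
    echo $(( slots * slots * tms * 4 ))
}

# network_buffer_memory_bytes SLOTS_PER_TM NUM_TMS [SEGMENT_BYTES]
# SEGMENT_BYTES defaults to 32768 (32 KiB), an assumed buffer size.
network_buffer_memory_bytes() {
    local buffers
    buffers=$(suggested_network_buffers "$1" "$2")
    echo $(( buffers * ${3:-32768} ))
}
```

With 8 slots per TaskManager, going from 20 to 40 TaskManagers doubles the suggested count from 5120 to 10240 buffers, i.e. from 160 MiB to 320 MiB at 32 KiB per buffer. A value fixed in `taskmanager.network.numberOfBuffers` therefore has to be revised, and the TaskManagers restarted, whenever the cluster is rescaled, which is the situation the issue describes.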