[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974951#comment-15974951 ]

ASF GitHub Bot commented on FLINK-4545:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3721#discussion_r112242996
  
    --- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
    @@ -398,3 +428,106 @@ readSlaves() {
     useOffHeapMemory() {
         [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
     }
    +
    +HAVE_AWK=
    +# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
    +calculateNetworkBuf() {
    +    local network_buffers_bytes
    +    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
    +        echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
    +        exit 1
    +    fi
    +
    +    if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
    +        # fix memory size for network buffers
    +        network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
    +    else
    +        if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
    +            echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
    +            echo "Min must be less than or equal to max."
    +            echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
    +            exit 1
    +        fi
    +
    +        # Bash only performs integer arithmetic so floating point computation is performed using awk
    +        if [[ -z "${HAVE_AWK}" ]] ; then
    +            command -v awk >/dev/null 2>&1
    +            if [[ $? -ne 0 ]]; then
    +                echo "[ERROR] Program 'awk' not found."
    +                echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
    +                exit 1
    +            fi
    +            HAVE_AWK=true
    +        fi
    +
    +        # We calculate the memory using a fraction of the total memory
    +        if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
    +            echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
    +            echo "It must be between 0.0 and 1.0."
    +            echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
    +            exit 1
    +        fi
    +
    +        network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
    +    fi
    +
    +    # recalculate the JVM heap memory by taking the network buffers into account
    --- End diff --
    
    To me, "recalculate" implied that it would change some configuration value, 
but that's not happening. It's only verifying that the memory for network 
buffers is less than the heap memory.
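
For readers following the diff, the fraction-based branch and the check described in the comment above can be sketched as two small functions. This is only an illustration under stated assumptions, not the code from the pull request: the function names `clamp_network_buf` and `network_buf_fits_heap` are hypothetical, and the sketch multiplies by 1048576 instead of calling `lshift`, which is a gawk extension rather than POSIX awk. The second function only verifies and changes no configuration value, which is the distinction the comment draws.

```shell
#!/usr/bin/env bash

# Hypothetical helper (not part of the PR): compute heap_mb * 2^20 * fraction
# and clamp the result to [min_bytes, max_bytes]. Prints whole bytes.
clamp_network_buf() {
    local heap_mb=$1 fraction=$2 min_bytes=$3 max_bytes=$4
    awk -v heap="${heap_mb}" -v frac="${fraction}" \
        -v min="${min_bytes}" -v max="${max_bytes}" 'BEGIN {
        # Force numeric interpretation of the bounds.
        min += 0; max += 0
        x = heap * 1048576 * frac
        netbuf = x > max ? max : (x < min ? min : x)
        printf "%.0f\n", netbuf
    }'
}

# Hypothetical helper (not part of the PR): succeed only if the network
# buffer memory is smaller than the heap. Nothing is modified here.
network_buf_fits_heap() {
    local heap_mb=$1 netbuf_bytes=$2
    (( netbuf_bytes < heap_mb * 1048576 ))
}
```

For example, `clamp_network_buf 1024 0.1 67108864 1073741824` takes 10% of a 1024 MB heap and keeps it between 64 MB and 1 GB, and `network_buf_fits_heap 1024 "$(clamp_network_buf 1024 0.1 67108864 1073741824)"` then succeeds because the result is below the heap size.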


> Flink automatically manages TM network buffer
> ---------------------------------------------
>
>                 Key: FLINK-4545
>                 URL: https://issues.apache.org/jira/browse/FLINK-4545
>             Project: Flink
>          Issue Type: Wish
>          Components: Network
>            Reporter: Zhenzhong Xu
>
> Currently, the number of network buffers per task manager is preconfigured, and 
> the memory is pre-allocated according to the taskmanager.network.numberOfBuffers 
> setting. In a job DAG with a shuffle phase, this number can grow very large, 
> depending on the size of the TM cluster. The formula for calculating the buffer 
> count is documented here 
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers).
>   
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster 
> size dynamically and then leverage the upcoming Flink feature that supports 
> rescaling job parallelism at runtime. 
> If the buffer count is static at runtime and cannot be changed without 
> restarting the task manager process, this may add latency and complexity to 
> the scaling process. I am wondering whether there has already been any 
> discussion about having Flink manage the network buffers automatically, or at 
> least exposing an API that allows them to be reconfigured. Let me know if there 
> is any existing JIRA issue that I should follow.
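
To make the formula quoted in the issue concrete, here is a minimal worked sketch in bash. The function name `required_network_buffers` is hypothetical and only illustrates the arithmetic `#slots-per-TM^2 * #TMs * 4`. It does not read any Flink configuration.

```shell
#!/usr/bin/env bash

# Hypothetical helper: evaluate slots-per-TM^2 * TMs * 4 from the issue text.
required_network_buffers() {
    local slots_per_tm=$1 num_tms=$2
    echo $(( slots_per_tm * slots_per_tm * num_tms * 4 ))
}

# 8 slots per TM on 20 TMs: 8^2 * 20 * 4 = 5120 buffers.
required_network_buffers 8 20
```

Because the count is linear in the number of task managers and quadratic in the slots per TM, doubling the cluster to 40 TMs in this example would double the required buffers to 10240. That is why a static value is awkward when the cluster size changes at runtime.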



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
