xintongsong commented on code in PR #21843:
URL: https://github.com/apache/flink/pull/21843#discussion_r1099806797


##########
docs/content/docs/deployment/memory/network_mem_tuning.md:
##########
@@ -105,12 +105,14 @@ The size of the buffer can be configured by setting `taskmanager.memory.segment-
 
 ### Input network buffers
 
-Buffers in the input channel are divided into exclusive and floating buffers.  Exclusive buffers can be used by only one particular channel.  A channel can request additional floating buffers from a buffer pool shared across all channels belonging to the given input gate. The remaining floating buffers are optional and are acquired only if there are enough resources available.
+The number of buffers in one pool, calculated according to the above formula, can be divided into two parts. The part up to the value configured via `taskmanager.network.memory.read-buffer.required-per-gate.max` is required; any excess buffers are optional. A task fails at runtime if the required buffers cannot be obtained. A task does not fail for lack of optional buffers, but may suffer a performance reduction. If not explicitly configured, the default value is Integer.MAX_VALUE for streaming workloads and 1000 for batch workloads.
+
+Generally, `taskmanager.network.memory.read-buffer.required-per-gate.max` does not need to be configured; the default value suits most scenarios. Setting the option to a lower value (at least 1) makes the "insufficient number of network buffers" exception less likely, but may reduce performance. Setting the option to Integer.MAX_VALUE disables the required-buffer limit; with the limit disabled, more read buffers may be required, which benefits performance but makes insufficient-network-buffers exceptions more likely.
 
 In the initialization phase:
-- Flink will try to acquire the configured amount of exclusive buffers for each channel
-- all exclusive buffers must be fulfilled or the job will fail with an exception
-- a single floating buffer has to be allocated for Flink to be able to make progress
+- Determine the effectively required buffers: the minimum of the number of buffers calculated according to the above formula and the value configured via `taskmanager.network.memory.read-buffer.required-per-gate.max`.
+- If the total number of network buffers is less than the effectively required buffers, an exception is thrown.
+- If memory is sufficient, try to allocate the configured number of exclusive buffers to each channel. If memory is insufficient, gradually reduce the number of exclusive buffers per channel until all buffers are floating.

Review Comment:
   These can be removed.
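   
   As an aside, the initialization rule in the new list boils down to a `min`. A minimal self-contained sketch (not Flink code; the method name and the numbers are made up for illustration):
   
   ```java
   public class RequiredBuffersSketch {
   
       // Effectively required buffers per gate: the minimum of the pool size
       // derived from the formula above and the configured per-gate cap.
       static int effectivelyRequired(int calculatedPoolSize, int requiredPerGateMax) {
           return Math.min(calculatedPoolSize, requiredPerGateMax);
       }
   
       public static void main(String[] args) {
           // e.g. 2 exclusive buffers x 100 channels + 8 floating buffers per gate
           int calculated = 2 * 100 + 8;
           // Streaming default (Integer.MAX_VALUE): every buffer is required.
           System.out.println(effectivelyRequired(calculated, Integer.MAX_VALUE)); // 208
           // A lower cap: 100 buffers are required, the remaining 108 are optional.
           System.out.println(effectivelyRequired(calculated, 100)); // 100
       }
   }
   ```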



##########
docs/content/docs/deployment/memory/mem_setup_tm.md:
##########
@@ -147,7 +147,7 @@ which affect the size of the respective components:
 | [Managed memory](#managed-memory) | [`taskmanager.memory.managed.size`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-managed-size) <br/> [`taskmanager.memory.managed.fraction`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-managed-fraction) | Native memory managed by Flink, reserved for sorting, hash tables, caching of intermediate results and RocksDB state backend |
 | [Framework Off-heap Memory](#framework-memory) | [`taskmanager.memory.framework.off-heap.size`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) | [Off-heap direct (or native) memory](#configure-off-heap-memory-direct-or-native) dedicated to Flink framework (advanced option) |
 | [Task Off-heap Memory](#configure-off-heap-memory-direct-or-native)| [`taskmanager.memory.task.off-heap.size`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-task-off-heap-size) | [Off-heap direct (or native) memory](#configure-off-heap-memory-direct-or-native) dedicated to Flink application to run operators |
-| Network Memory | [`taskmanager.memory.network.min`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) <br/> [`taskmanager.memory.network.max`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) <br/> [`taskmanager.memory.network.fraction`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) | Direct memory reserved for data record exchange between tasks (e.g. buffering for the transfer over the network), is a [capped fractionated component]({{< ref "docs/deployment/memory/mem_setup" >}}#capped-fractionated-components) of the [total Flink memory]({{< ref "docs/deployment/memory/mem_setup" >}}#configure-total-memory). This memory is used for allocation of [network buffers]({{< ref "docs/deployment/memory/network_mem_tuning" >}}) |
+| Network Memory | [`taskmanager.memory.network.min`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) <br/> [`taskmanager.memory.network.max`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) <br/> [`taskmanager.memory.network.fraction`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) | Direct memory reserved for data record exchange between tasks (e.g. buffering for the transfer over the network), is a [capped fractionated component]({{< ref "docs/deployment/memory/mem_setup" >}}#capped-fractionated-components) of the [total Flink memory]({{< ref "docs/deployment/memory/mem_setup" >}}#configure-total-memory). This memory is used for allocation of [network buffers]({{< ref "docs/deployment/memory/network_mem_tuning" >}}). Since 1.17, the default value of [`taskmanager.memory.network.max`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) is Long.MAX_VALUE, which means the maximum network memory size is no longer limited by default. If you still want to limit it, set the option to a smaller value, for example, 1g. |

Review Comment:
   This change is not necessary. The documentation corresponds to Flink 1.17.
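   
   (For readers of this thread: the re-capping the hunk mentions would look roughly like the sketch below. This is my own illustration, not code from the PR, and the value is only an example.)
   
   ```java
   import org.apache.flink.configuration.Configuration;
   
   public class NetworkMemoryCapSketch {
       public static void main(String[] args) {
           Configuration conf = new Configuration();
           // Restore an explicit upper bound on network memory, e.g. 1g as in the text.
           conf.setString("taskmanager.memory.network.max", "1g");
           System.out.println(conf);
       }
   }
   ```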



##########
docs/content/docs/deployment/memory/network_mem_tuning.md:
##########
@@ -105,12 +105,14 @@ The size of the buffer can be configured by setting `taskmanager.memory.segment-
 
 ### Input network buffers
 
-Buffers in the input channel are divided into exclusive and floating buffers.  Exclusive buffers can be used by only one particular channel.  A channel can request additional floating buffers from a buffer pool shared across all channels belonging to the given input gate. The remaining floating buffers are optional and are acquired only if there are enough resources available.
+The number of buffers in one pool, calculated according to the above formula, can be divided into two parts. The part up to the value configured via `taskmanager.network.memory.read-buffer.required-per-gate.max` is required; any excess buffers are optional. A task fails at runtime if the required buffers cannot be obtained. A task does not fail for lack of optional buffers, but may suffer a performance reduction. If not explicitly configured, the default value is Integer.MAX_VALUE for streaming workloads and 1000 for batch workloads.
+
+Generally, `taskmanager.network.memory.read-buffer.required-per-gate.max` does not need to be configured; the default value suits most scenarios. Setting the option to a lower value (at least 1) makes the "insufficient number of network buffers" exception less likely, but may reduce performance. Setting the option to Integer.MAX_VALUE disables the required-buffer limit; with the limit disabled, more read buffers may be required, which benefits performance but makes insufficient-network-buffers exceptions more likely.

Review Comment:
   How do users know when to tune this config?
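   
   One possible signal, sketched below under my own assumptions (the value 8 is illustrative, not a recommendation): a job that fails with the "insufficient number of network buffers" exception described in the hunk can trade performance for startup stability by lowering the cap.
   
   ```java
   import org.apache.flink.configuration.Configuration;
   
   public class ReadBufferCapSketch {
       public static void main(String[] args) {
           Configuration conf = new Configuration();
           // Require fewer read buffers per gate up front; the rest become optional.
           conf.setString("taskmanager.network.memory.read-buffer.required-per-gate.max", "8");
           System.out.println(conf);
       }
   }
   ```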



##########
docs/content/docs/deployment/memory/mem_setup_tm.md:
##########
@@ -147,7 +147,7 @@ which affect the size of the respective components:
 | [Managed memory](#managed-memory) | [`taskmanager.memory.managed.size`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-managed-size) <br/> [`taskmanager.memory.managed.fraction`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-managed-fraction) | Native memory managed by Flink, reserved for sorting, hash tables, caching of intermediate results and RocksDB state backend |
 | [Framework Off-heap Memory](#framework-memory) | [`taskmanager.memory.framework.off-heap.size`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) | [Off-heap direct (or native) memory](#configure-off-heap-memory-direct-or-native) dedicated to Flink framework (advanced option) |
 | [Task Off-heap Memory](#configure-off-heap-memory-direct-or-native)| [`taskmanager.memory.task.off-heap.size`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-task-off-heap-size) | [Off-heap direct (or native) memory](#configure-off-heap-memory-direct-or-native) dedicated to Flink application to run operators |
-| Network Memory | [`taskmanager.memory.network.min`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) <br/> [`taskmanager.memory.network.max`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) <br/> [`taskmanager.memory.network.fraction`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) | Direct memory reserved for data record exchange between tasks (e.g. buffering for the transfer over the network), is a [capped fractionated component]({{< ref "docs/deployment/memory/mem_setup" >}}#capped-fractionated-components) of the [total Flink memory]({{< ref "docs/deployment/memory/mem_setup" >}}#configure-total-memory). This memory is used for allocation of [network buffers]({{< ref "docs/deployment/memory/network_mem_tuning" >}}) |
+| Network Memory | [`taskmanager.memory.network.min`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) <br/> [`taskmanager.memory.network.max`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) <br/> [`taskmanager.memory.network.fraction`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) | Direct memory reserved for data record exchange between tasks (e.g. buffering for the transfer over the network), is a [capped fractionated component]({{< ref "docs/deployment/memory/mem_setup" >}}#capped-fractionated-components) of the [total Flink memory]({{< ref "docs/deployment/memory/mem_setup" >}}#configure-total-memory). This memory is used for allocation of [network buffers]({{< ref "docs/deployment/memory/network_mem_tuning" >}}). Since 1.17, the default value of [`taskmanager.memory.network.max`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) is Long.MAX_VALUE, which means the maximum network memory size is no longer limited by default. If you still want to limit it, set the option to a smaller value, for example, 1g. |

Review Comment:
   However, I do find the default value `Long#MAX_VALUE` is displayed as 
`9223372036854775807`. It should be override to `infinite`. See 
`CleanupOptions#CLEANUP_STRATEGY_FIXED_DELAY_ATTEMPTS` for an example.
   
   There are other options with the same problem. We can add a hotfix commit 
for all of them.
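   
   For illustration, a sketch of the pattern I mean, modeled on the `CleanupOptions` example above (the option name and description here are hypothetical):
   
   ```java
   import org.apache.flink.annotation.docs.Documentation;
   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;
   
   public class ExampleOptions {
   
       // The docs generator then renders the default as "infinite" instead of
       // the raw 2147483647 / 9223372036854775807.
       @Documentation.OverrideDefault("infinite")
       public static final ConfigOption<Integer> EXAMPLE_MAX_ATTEMPTS =
               ConfigOptions.key("example.max-attempts")
                       .intType()
                       .defaultValue(Integer.MAX_VALUE)
                       .withDescription("Hypothetical option whose displayed default is overridden.");
   }
   ```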



##########
docs/content/docs/ops/batch/batch_shuffle.md:
##########
@@ -76,8 +76,9 @@ The memory usage of `mmap` is not accounted for by configured memory limits, but
 `Sort Shuffle` is another blocking shuffle implementation introduced in version 1.13 and it becomes the default blocking shuffle implementation in 1.15. Different from `Hash Shuffle`, `Sort Shuffle` writes only one file for each result partition. When the result partition is read by multiple downstream tasks concurrently, the data file is opened only once and shared by all readers. As a result, the cluster uses fewer resources like inode and file descriptors, which improves stability. Furthermore, by writing fewer files and making a best effort to read data sequentially, `Sort Shuffle` can achieve better performance than `Hash Shuffle`, especially on HDD. Additionally, `Sort Shuffle` uses extra managed memory as data reading buffer and does not rely on `sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref "docs/deployment/security/security-ssl" >}}). Please refer to [FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and [FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more details about `Sort Shuffle`.
 
 Here are some config options that might need adjustment when using sort blocking shuffle:
-- [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): Config option to control data writing buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough. Because this memory is allocated from network memory, to increase this value, you may also need to increase the total network memory by adjusting [taskmanager.memory.network.fraction]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction), [taskmanager.memory.network.min]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and [taskmanager.memory.network.max]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) to avoid the potential "Insufficient number of network buffers" error.
+- [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): Config option to control the data writing buffer size. For large-scale jobs, you may need to increase this value; usually, several hundred megabytes of memory is enough. Because this memory is allocated from network memory, to increase this value, you may also need to increase the total network memory by adjusting [taskmanager.memory.network.fraction]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) and [taskmanager.memory.network.min]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) to avoid the potential "Insufficient number of network buffers" error.
 - [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to control data reading buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough. Because this memory is cut from the framework off-heap memory, to increase this value, you need also to increase the total framework off-heap memory by adjusting [taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) to avoid the potential direct memory OOM error.
+- [taskmanager.memory.network.max]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max): Config option to control the maximum total network memory. Since 1.17, this option defaults to Long.MAX_VALUE, which means the maximum network memory size is no longer limited by default. If you still want to limit it, set the option to a smaller value, for example, 1g.

Review Comment:
   Why this change?
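   
   (For context, the adjustments this hunk describes would look roughly like the following hedged sketch; all values are purely illustrative, not recommendations.)
   
   ```java
   import org.apache.flink.configuration.Configuration;
   
   public class SortShuffleTuningSketch {
       public static void main(String[] args) {
           Configuration conf = new Configuration();
           // Larger data writing buffer pool, allocated from network memory.
           conf.setString("taskmanager.network.sort-shuffle.min-buffers", "2048");
           // Grow total network memory so the larger pool can be satisfied.
           conf.setString("taskmanager.memory.network.fraction", "0.2");
           conf.setString("taskmanager.memory.network.min", "512mb");
           // Larger data reading buffer, carved out of framework off-heap memory.
           conf.setString("taskmanager.memory.framework.off-heap.batch-shuffle.size", "512mb");
           conf.setString("taskmanager.memory.framework.off-heap.size", "1g");
           System.out.println(conf);
       }
   }
   ```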



##########
docs/content/docs/deployment/memory/network_mem_tuning.md:
##########
@@ -105,12 +105,14 @@ The size of the buffer can be configured by setting `taskmanager.memory.segment-
 
 ### Input network buffers
 
-Buffers in the input channel are divided into exclusive and floating buffers.  Exclusive buffers can be used by only one particular channel.  A channel can request additional floating buffers from a buffer pool shared across all channels belonging to the given input gate. The remaining floating buffers are optional and are acquired only if there are enough resources available.
+The number of buffers in one pool, calculated according to the above formula, can be divided into two parts. The part up to the value configured via `taskmanager.network.memory.read-buffer.required-per-gate.max` is required; any excess buffers are optional. A task fails at runtime if the required buffers cannot be obtained. A task does not fail for lack of optional buffers, but may suffer a performance reduction. If not explicitly configured, the default value is Integer.MAX_VALUE for streaming workloads and 1000 for batch workloads.

Review Comment:
   "The number of buffers in one pool calculated according to the above 
formula" - give it a name


