xintongsong commented on code in PR #21843: URL: https://github.com/apache/flink/pull/21843#discussion_r1099806797
########## docs/content/docs/deployment/memory/network_mem_tuning.md: ########## @@ -105,12 +105,14 @@ The size of the buffer can be configured by setting `taskmanager.memory.segment- ### Input network buffers -Buffers in the input channel are divided into exclusive and floating buffers. Exclusive buffers can be used by only one particular channel. A channel can request additional floating buffers from a buffer pool shared across all channels belonging to the given input gate. The remaining floating buffers are optional and are acquired only if there are enough resources available. +The number of buffers in one pool calculated according to the above formula can be divided into two parts. The part below this configured value `taskmanager.network.memory.read-buffer.required-per-gate.max` is required. The excess part buffers, if any, is optional. A task will fail if the required buffers cannot be obtained in runtime. A task will not fail due to not obtaining optional buffers, but may suffer a performance reduction. If not explicitly configured, the default value is Integer.MAX_VALUE for streaming workloads, and 1000 for batch workloads. + +Generally, `taskmanager.network.memory.read-buffer.required-per-gate.max` need not configure, and the default value can fulfill most scenes. Setting the option to a lower value, at least 1, can avoid the "insufficient number of network buffers" exception as much as possible, but may reduce performance. Setting the option as Integer.MAX_VALUE to disable the required buffer limit, When disabled, more read buffers may be required, which is good for performance, but may lead to more easily throwing insufficient network buffers exceptions. 
In the initialization phase: -- Flink will try to acquire the configured amount of exclusive buffers for each channel -- all exclusive buffers must be fulfilled or the job will fail with an exception -- a single floating buffer has to be allocated for Flink to be able to make progress +- Get the effectively required buffers, determined by the min value between the number of buffers calculated according to the above formula and the configured required buffers by the option `taskmanager.network.memory.read-buffer.required-per-gate.max`. +- When the total network buffers are less than the effectively required buffers, an exception will be thrown. +- When the memory is sufficient, try to allocate the configured number of exclusive buffers to each channel. If the memory is insufficient, gradually reduce the number of exclusive buffers for each channel until all buffers are floating. Review Comment: These can be removed. ########## docs/content/docs/deployment/memory/mem_setup_tm.md: ########## @@ -147,7 +147,7 @@ which affect the size of the respective components: | [Managed memory](#managed-memory) | [`taskmanager.memory.managed.size`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-managed-size) <br/> [`taskmanager.memory.managed.fraction`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-managed-fraction) | Native memory managed by Flink, reserved for sorting, hash tables, caching of intermediate results and RocksDB state backend | | [Framework Off-heap Memory](#framework-memory) | [`taskmanager.memory.framework.off-heap.size`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) | [Off-heap direct (or native) memory](#configure-off-heap-memory-direct-or-native) dedicated to Flink framework (advanced option) | | [Task Off-heap Memory](#configure-off-heap-memory-direct-or-native)| [`taskmanager.memory.task.off-heap.size`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-task-off-heap-size) | [Off-heap direct (or 
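For context, the allocation logic the bullets in this hunk describe can be sketched in a few lines. This is an illustrative Python sketch, not Flink code; the defaults used (2 exclusive buffers per channel, 8 floating buffers per gate, required-per-gate cap of 1000 for batch) are the ones the surrounding documentation states:

```python
# Illustrative sketch of the required/optional read-buffer split described in
# the diff above. Not Flink's implementation; names are made up for clarity.

def gate_buffers(num_channels, buffers_per_channel=2, floating_per_gate=8):
    """Total buffers one input gate asks for, per the formula the doc references."""
    return num_channels * buffers_per_channel + floating_per_gate

def effectively_required(total, required_per_gate_max):
    """The part up to the configured cap is required; any excess is optional."""
    return min(total, required_per_gate_max)

total = gate_buffers(num_channels=500)        # 500 * 2 + 8 = 1008
required = effectively_required(total, 1000)  # batch default cap: 1000
optional = total - required                   # optional buffers degrade, not fail
print(total, required, optional)              # 1008 1000 8
```

If the pool cannot supply the 1000 required buffers, the task fails; if only the 8 optional ones are missing, it runs with reduced performance.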
native) memory](#configure-off-heap-memory-direct-or-native) dedicated to Flink application to run operators | -| Network Memory | [`taskmanager.memory.network.min`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) <br/> [`taskmanager.memory.network.max`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) <br/> [`taskmanager.memory.network.fraction`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) | Direct memory reserved for data record exchange between tasks (e.g. buffering for the transfer over the network), is a [capped fractionated component]({{< ref "docs/deployment/memory/mem_setup" >}}#capped-fractionated-components) of the [total Flink memory]({{< ref "docs/deployment/memory/mem_setup" >}}#configure-total-memory). This memory is used for allocation of [network buffers]({{< ref "docs/deployment/memory/network_mem_tuning" >}}) | +| Network Memory | [`taskmanager.memory.network.min`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) <br/> [`taskmanager.memory.network.max`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) <br/> [`taskmanager.memory.network.fraction`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) | Direct memory reserved for data record exchange between tasks (e.g. buffering for the transfer over the network), is a [capped fractionated component]({{< ref "docs/deployment/memory/mem_setup" >}}#capped-fractionated-components) of the [total Flink memory]({{< ref "docs/deployment/memory/mem_setup" >}}#configure-total-memory). This memory is used for allocation of [network buffers]({{< ref "docs/deployment/memory/network_mem_tuning" >}}). Since 1.17, [`taskmanager.memory.network.max`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) of the network memory size is set as Long.MAX_VALUE, which means the maximum network memory size is no longer limited by default. 
If you still want to limit the maximum network memory size, set the option to a new value, for example, 1g. | Review Comment: This change is not necessary. The documentation corresponds to Flink 1.17. ########## docs/content/docs/deployment/memory/network_mem_tuning.md: ########## @@ -105,12 +105,14 @@ The size of the buffer can be configured by setting `taskmanager.memory.segment- ### Input network buffers -Buffers in the input channel are divided into exclusive and floating buffers. Exclusive buffers can be used by only one particular channel. A channel can request additional floating buffers from a buffer pool shared across all channels belonging to the given input gate. The remaining floating buffers are optional and are acquired only if there are enough resources available. +The number of buffers in one pool calculated according to the above formula can be divided into two parts. The part below this configured value `taskmanager.network.memory.read-buffer.required-per-gate.max` is required. The excess part buffers, if any, is optional. A task will fail if the required buffers cannot be obtained in runtime. A task will not fail due to not obtaining optional buffers, but may suffer a performance reduction. If not explicitly configured, the default value is Integer.MAX_VALUE for streaming workloads, and 1000 for batch workloads. + +Generally, `taskmanager.network.memory.read-buffer.required-per-gate.max` need not configure, and the default value can fulfill most scenes. Setting the option to a lower value, at least 1, can avoid the "insufficient number of network buffers" exception as much as possible, but may reduce performance. Setting the option as Integer.MAX_VALUE to disable the required buffer limit, When disabled, more read buffers may be required, which is good for performance, but may lead to more easily throwing insufficient network buffers exceptions. Review Comment: How do users know when to tune this config? 
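One concrete answer to the reviewer's question is that users would touch this option only after seeing the "Insufficient number of network buffers" error (lower it) or when chasing throughput on a well-provisioned cluster (raise it). A hypothetical `flink-conf.yaml` excerpt, with an illustrative value:

```yaml
# Cap the per-gate required read buffers, trading some performance for
# fewer "insufficient number of network buffers" failures.
taskmanager.network.memory.read-buffer.required-per-gate.max: 1000
```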
########## docs/content/docs/deployment/memory/mem_setup_tm.md: ########## @@ -147,7 +147,7 @@ which affect the size of the respective components: | [Managed memory](#managed-memory) | [`taskmanager.memory.managed.size`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-managed-size) <br/> [`taskmanager.memory.managed.fraction`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-managed-fraction) | Native memory managed by Flink, reserved for sorting, hash tables, caching of intermediate results and RocksDB state backend | | [Framework Off-heap Memory](#framework-memory) | [`taskmanager.memory.framework.off-heap.size`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) | [Off-heap direct (or native) memory](#configure-off-heap-memory-direct-or-native) dedicated to Flink framework (advanced option) | | [Task Off-heap Memory](#configure-off-heap-memory-direct-or-native)| [`taskmanager.memory.task.off-heap.size`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-task-off-heap-size) | [Off-heap direct (or native) memory](#configure-off-heap-memory-direct-or-native) dedicated to Flink application to run operators | -| Network Memory | [`taskmanager.memory.network.min`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) <br/> [`taskmanager.memory.network.max`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) <br/> [`taskmanager.memory.network.fraction`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) | Direct memory reserved for data record exchange between tasks (e.g. buffering for the transfer over the network), is a [capped fractionated component]({{< ref "docs/deployment/memory/mem_setup" >}}#capped-fractionated-components) of the [total Flink memory]({{< ref "docs/deployment/memory/mem_setup" >}}#configure-total-memory). 
This memory is used for allocation of [network buffers]({{< ref "docs/deployment/memory/network_mem_tuning" >}}) | +| Network Memory | [`taskmanager.memory.network.min`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) <br/> [`taskmanager.memory.network.max`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) <br/> [`taskmanager.memory.network.fraction`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) | Direct memory reserved for data record exchange between tasks (e.g. buffering for the transfer over the network), is a [capped fractionated component]({{< ref "docs/deployment/memory/mem_setup" >}}#capped-fractionated-components) of the [total Flink memory]({{< ref "docs/deployment/memory/mem_setup" >}}#configure-total-memory). This memory is used for allocation of [network buffers]({{< ref "docs/deployment/memory/network_mem_tuning" >}}). Since 1.17, [`taskmanager.memory.network.max`]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) of the network memory size is set as Long.MAX_VALUE, which means the maximum network memory size is no longer limited by default. If you still want to limit the maximum network memory size, set the option to a new value, for example, 1g. | Review Comment: However, I do find the default value `Long#MAX_VALUE` is displayed as `9223372036854775807`. It should be override to `infinite`. See `CleanupOptions#CLEANUP_STRATEGY_FIXED_DELAY_ATTEMPTS` for an example. There are other options with the same problem. We can add a hotfix commit for all of them. ########## docs/content/docs/ops/batch/batch_shuffle.md: ########## @@ -76,8 +76,9 @@ The memory usage of `mmap` is not accounted for by configured memory limits, but `Sort Shuffle` is another blocking shuffle implementation introduced in version 1.13 and it becomes the default blocking shuffle implementation in 1.15. Different from `Hash Shuffle`, `Sort Shuffle` writes only one file for each result partition. 
When the result partition is read by multiple downstream tasks concurrently, the data file is opened only once and shared by all readers. As a result, the cluster uses fewer resources like inode and file descriptors, which improves stability. Furthermore, by writing fewer files and making a best effort to read data sequentially, `Sort Shuffle` can achieve better performance than `Hash Shuffle`, especially on HDD. Additionally, `Sort Shuffle` uses extra managed memory as data reading buffer and does not rely on `sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref "docs/deployment/security/security-ssl" >}}). Please refer to [FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and [FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more details about `Sort Shuffle`. Here are some config options that might need adjustment when using sort blocking shuffle: -- [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): Config option to control data writing buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough. Because this memory is allocated from network memory, to increase this value, you may also need to increase the total network memory by adjusting [taskmanager.memory.network.fraction]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction), [taskmanager.memory.network.min]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and [taskmanager.memory.network.max]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) to avoid the potential "Insufficient number of network buffers" error. +- [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): Config option to control data writing buffer size.
For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough. Because this memory is allocated from network memory, to increase this value, you may also need to increase the total network memory by adjusting [taskmanager.memory.network.fraction]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction), [taskmanager.memory.network.min]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) to avoid the potential "Insufficient number of network buffers" error. - [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to control data reading buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough. Because this memory is cut from the framework off-heap memory, to increase this value, you need also to increase the total framework off-heap memory by adjusting [taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) to avoid the potential direct memory OOM error. +- [taskmanager.memory.network.max]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max): Config option to control the maximum total network memory. Since 1.17, this option is set as Long.MAX_VALUE by default, which means that the maximum network memory size is no longer limited. If you still want to limit the maximum network memory, set the config to a new value, for example, 1g. Review Comment: Why this change? ########## docs/content/docs/deployment/memory/network_mem_tuning.md: ########## @@ -105,12 +105,14 @@ The size of the buffer can be configured by setting `taskmanager.memory.segment- ### Input network buffers -Buffers in the input channel are divided into exclusive and floating buffers. Exclusive buffers can be used by only one particular channel. 
A channel can request additional floating buffers from a buffer pool shared across all channels belonging to the given input gate. The remaining floating buffers are optional and are acquired only if there are enough resources available. +The number of buffers in one pool calculated according to the above formula can be divided into two parts. The part below this configured value `taskmanager.network.memory.read-buffer.required-per-gate.max` is required. The excess part buffers, if any, is optional. A task will fail if the required buffers cannot be obtained in runtime. A task will not fail due to not obtaining optional buffers, but may suffer a performance reduction. If not explicitly configured, the default value is Integer.MAX_VALUE for streaming workloads, and 1000 for batch workloads. Review Comment: "The number of buffers in one pool calculated according to the above formula" - give it a name
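Earlier in the thread the reviewer points out that a raw `Long#MAX_VALUE` default renders in the generated config table as `9223372036854775807` and should instead be displayed as `infinite`. The underlying idea, letting an option carry a human-readable display override that wins over the raw value, can be sketched as follows (illustrative Python; Flink itself handles this via an annotation on the `ConfigOption`, not this function):

```python
# Sketch of overriding how an extreme numeric default is rendered in docs.

LONG_MAX = 2**63 - 1  # Java's Long.MAX_VALUE

def render_default(value, display_override=None):
    """Docs-table rendering: an explicit override such as "infinite" wins."""
    return display_override if display_override is not None else str(value)

print(render_default(LONG_MAX))              # 9223372036854775807 (unreadable)
print(render_default(LONG_MAX, "infinite"))  # infinite
```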