pnowojski commented on a change in pull request #16988: URL: https://github.com/apache/flink/pull/16988#discussion_r696380229
########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, Review comment: ```suggestion the smallest unit for communication between subtasks. Also, in order to keep consistent high throughput, ``` ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. Review comment: ```suggestion Flink uses the network buffer queues (so called in-flight data) both on the output as well as on the input side. ``` ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as +long waiting in network buffers queue(the subtask requires some time to consume all data from +the queue before it can handle the checkpoint barrier), as well as the data skew - +when different input queue for one subtask contain different number of the data which leads to +increasing the checkpoint alignment time and as result to increasing the whole checkpoint time. + +## Buffer debloat + +The buffer debloat is mechanism which automatically adjust the buffer size in order to keep configured +checkpoint time along with high throughput. More precisely, the buffer debloat calculate the maximum possible throughput +(the maximum throughput which would be if the subtask was always busy) +for the subtask and adapt the buffer size in such a way that the time for consumption of in-flight data would be equal to configured one. + +The most useful settings: +* The buffer debloat can be enabled by setting the property `taskmanager.network.memory.buffer-debloat.enabled` to `true`. +* Desirable time of the consumption in-flight data can be configured by setting `taskmanager.network.memory.buffer-debloat.target` to `duration`. + +The settings for the optimization: + +* `taskmanager.network.memory.buffer-debloat.period` - the minimum time between buffer size recalculation. +It can be decreased if the throughput is pretty volatile by some reason and the high speed on the changes required. +* `taskmanager.network.memory.buffer-debloat.samples` - The measure of calculation smoothing. +It can be decreased if the buffer size is changed undesirable slower compare to the instant throughput or it can be increased otherwise. Review comment: ```suggestion * `taskmanager.network.memory.buffer-debloat.samples` - Adjust the number of samples over which throughput measurements are averaged out. The frequency of the collected samples can be adjusted via `taskmanager.network.memory.buffer-debloat.period`. The fewer the samples, the faster the reaction time of the debloating mechanism, but the higher the chance that a sudden spike or drop in throughput will cause the buffer debloating to miscalculate the best amount of in-flight data. 
``` ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as +long waiting in network buffers queue(the subtask requires some time to consume all data from +the queue before it can handle the checkpoint barrier), as well as the data skew - +when different input queue for one subtask contain different number of the data which leads to +increasing the checkpoint alignment time and as result to increasing the whole checkpoint time. + +## Buffer debloat + +The buffer debloat is mechanism which automatically adjust the buffer size in order to keep configured +checkpoint time along with high throughput. More precisely, the buffer debloat calculate the maximum possible throughput +(the maximum throughput which would be if the subtask was always busy) +for the subtask and adapt the buffer size in such a way that the time for consumption of in-flight data would be equal to configured one. + +The most useful settings: +* The buffer debloat can be enabled by setting the property `taskmanager.network.memory.buffer-debloat.enabled` to `true`. +* Desirable time of the consumption in-flight data can be configured by setting `taskmanager.network.memory.buffer-debloat.target` to `duration`. + +The settings for the optimization: + +* `taskmanager.network.memory.buffer-debloat.period` - the minimum time between buffer size recalculation. +It can be decreased if the throughput is pretty volatile by some reason and the high speed on the changes required. +* `taskmanager.network.memory.buffer-debloat.samples` - The measure of calculation smoothing. +It can be decreased if the buffer size is changed undesirable slower compare to the instant throughput or it can be increased otherwise. +* `taskmanager.network.memory.buffer-debloat.threshold-percentages` - The optimization which prevents +the frequent buffer size change if the new size is not so different compared to the old one. 
+ +See the [Configuration]({{< ref "docs/deployment/config" >}}#full-taskmanageroptions) documentation for details and additional parameters. + +The metrics which can help to observe the current buffer size: +* `estimatedTimeToConsumerBuffersMs` - the total time to consume data from all input channels. +* `debloatedBufferSize` - the current buffer size. + +## Network buffer lifecycle +Logically, Flink has several local buffer pools one for output stream and one for each input gate. +Each of that pools is limited to at most +``` +#channels * taskmanager.network.memory.buffers-per-channel + taskmanager.network.memory.floating-buffers-per-gate +``` + +The size of the buffer can be configured by setting `taskmanager.memory.segment-size`. + +### Input network buffers +Buffers in the input channel are divided into exclusive and floating buffers. +The exclusive buffers are always allocated at the initialization phase for each channel and +can be used only for this channel. In case, +when exclusive buffers are not enough the channel can request floating buffers from the shared gate buffer pool. + +The buffer lifecycle can be represented as follows: +* The netty thread receives data and requests the free buffer from the targeted channel. +* The channel, first of all, returns the floating buffer if it was allocated already if not, + the channel returns the exclusive buffer if it has a free one, if not, + the channel requests the floating buffer from the shared pool. +* The netty thread fills the buffer and sends it to the channel. +* The mailbox thread takes the filled buffer from the channel and consumes data from it. +* The mailbox thread returns the empty exclusive buffer to the channel or the empty floating buffer to the shared pool. + +### Output network buffers +Unlike the input buffer pool, the output buffers pool have only one type of buffers which it shares all among all subpartitions. +In order to avoid the data skew, the number of buffers for each subpartition is limited by `taskmanager.network.memory.max-buffers-per-channel`. Review comment: ```suggestion In order to avoid the excessive data skew, the number of buffers for each subpartition is limited by `taskmanager.network.memory.max-buffers-per-channel`. ``` ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. 
Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as +long waiting in network buffers queue(the subtask requires some time to consume all data from +the queue before it can handle the checkpoint barrier), as well as the data skew - +when different input queue for one subtask contain different number of the data which leads to +increasing the checkpoint alignment time and as result to increasing the whole checkpoint time. Review comment: ```suggestion The long checkpoint time issue can be caused by many things, one of them being the checkpoint barrier propagation time. A checkpoint in Flink can finish only once all subtasks receive all injected checkpoint barriers. In aligned checkpoints those checkpoint barriers travel through the job graph along the network buffers, and the larger the amount of in-flight data, the longer the checkpoint barrier propagation time. In unaligned checkpoints on the other hand, the more in-flight data, the larger the checkpoint size, as all of the captured in-flight data has to be persisted as part of the checkpoint.``` ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. 
The long checkpoint time issue can be explained as +long waiting in network buffers queue(the subtask requires some time to consume all data from +the queue before it can handle the checkpoint barrier), as well as the data skew - +when different input queue for one subtask contain different number of the data which leads to +increasing the checkpoint alignment time and as result to increasing the whole checkpoint time. + +## Buffer debloat + +The buffer debloat is mechanism which automatically adjust the buffer size in order to keep configured +checkpoint time along with high throughput. More precisely, the buffer debloat calculate the maximum possible throughput +(the maximum throughput which would be if the subtask was always busy) +for the subtask and adapt the buffer size in such a way that the time for consumption of in-flight data would be equal to configured one. + +The most useful settings: +* The buffer debloat can be enabled by setting the property `taskmanager.network.memory.buffer-debloat.enabled` to `true`. +* Desirable time of the consumption in-flight data can be configured by setting `taskmanager.network.memory.buffer-debloat.target` to `duration`. Review comment: ```suggestion * Desirable time of the consumption in-flight data can be configured by setting `taskmanager.network.memory.buffer-debloat.target` to `duration`. The default value of the debloat target should be good enough in most cases. ``` ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as +long waiting in network buffers queue(the subtask requires some time to consume all data from +the queue before it can handle the checkpoint barrier), as well as the data skew - +when different input queue for one subtask contain different number of the data which leads to +increasing the checkpoint alignment time and as result to increasing the whole checkpoint time. 
+ +## Buffer debloat + +The buffer debloat is mechanism which automatically adjust the buffer size in order to keep configured +checkpoint time along with high throughput. More precisely, the buffer debloat calculate the maximum possible throughput +(the maximum throughput which would be if the subtask was always busy) +for the subtask and adapt the buffer size in such a way that the time for consumption of in-flight data would be equal to configured one. + +The most useful settings: +* The buffer debloat can be enabled by setting the property `taskmanager.network.memory.buffer-debloat.enabled` to `true`. +* Desirable time of the consumption in-flight data can be configured by setting `taskmanager.network.memory.buffer-debloat.target` to `duration`. + +The settings for the optimization: + +* `taskmanager.network.memory.buffer-debloat.period` - the minimum time between buffer size recalculation. +It can be decreased if the throughput is pretty volatile by some reason and the high speed on the changes required. +* `taskmanager.network.memory.buffer-debloat.samples` - The measure of calculation smoothing. +It can be decreased if the buffer size is changed undesirable slower compare to the instant throughput or it can be increased otherwise. +* `taskmanager.network.memory.buffer-debloat.threshold-percentages` - The optimization which prevents +the frequent buffer size change if the new size is not so different compared to the old one. + +See the [Configuration]({{< ref "docs/deployment/config" >}}#full-taskmanageroptions) documentation for details and additional parameters. + +The metrics which can help to observe the current buffer size: Review comment: Please cross reference the metrics documentation ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as Review comment: ```suggestion waiting for sending to the next subtask. 
Having a larger amount of in-flight data means Flink can provide a higher throughput that's more resilient to small hiccups in the pipeline, but it has a negative effect on the checkpoint time. ``` ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as Review comment: I wouldn't mention latency, because that's not entirely true. Without back-pressure, the amount of in-flight buffers does not affect latency. With back-pressure, speaking about latency is more complicated, as the amount of buffered in-flight data in the middle of your system doesn't affect end-to-end latency. By making buffers smaller/larger in one place you are just moving those records from one queue to another (for example, instead of waiting in Flink's network buffers, records will just wait longer in the Kafka topic). ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as +long waiting in network buffers queue(the subtask requires some time to consume all data from +the queue before it can handle the checkpoint barrier), as well as the data skew - +when different input queue for one subtask contain different number of the data which leads to +increasing the checkpoint alignment time and as result to increasing the whole checkpoint time. + +## Buffer debloat + +The buffer debloat is mechanism which automatically adjust the buffer size in order to keep configured +checkpoint time along with high throughput. More precisely, the buffer debloat calculate the maximum possible throughput +(the maximum throughput which would be if the subtask was always busy) +for the subtask and adapt the buffer size in such a way that the time for consumption of in-flight data would be equal to configured one. + +The most useful settings: +* The buffer debloat can be enabled by setting the property `taskmanager.network.memory.buffer-debloat.enabled` to `true`. +* Desirable time of the consumption in-flight data can be configured by setting `taskmanager.network.memory.buffer-debloat.target` to `duration`. + +The settings for the optimization: Review comment: ```suggestion Buffer debloating in Flink works by measuring past throughput to predict the future time to consume the remaining in-flight data. If those predictions are incorrect, the debloating mechanism can fail in one of two ways: * there won't be enough buffered data to provide full throughput * there will be too much buffered in-flight data and the aligned checkpoint barriers propagation time or the unaligned checkpoint size will suffer. Hence, if you have a varying load in your job, for example sudden spikes of incoming records, or periodically firing windowed aggregations or joins, you might need to adjust the following settings: ``` ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as +long waiting in network buffers queue(the subtask requires some time to consume all data from +the queue before it can handle the checkpoint barrier), as well as the data skew - +when different input queue for one subtask contain different number of the data which leads to +increasing the checkpoint alignment time and as result to increasing the whole checkpoint time. + +## Buffer debloat + +The buffer debloat is mechanism which automatically adjust the buffer size in order to keep configured +checkpoint time along with high throughput. More precisely, the buffer debloat calculate the maximum possible throughput +(the maximum throughput which would be if the subtask was always busy) +for the subtask and adapt the buffer size in such a way that the time for consumption of in-flight data would be equal to configured one. + +The most useful settings: +* The buffer debloat can be enabled by setting the property `taskmanager.network.memory.buffer-debloat.enabled` to `true`. +* Desirable time of the consumption in-flight data can be configured by setting `taskmanager.network.memory.buffer-debloat.target` to `duration`. + +The settings for the optimization: + +* `taskmanager.network.memory.buffer-debloat.period` - the minimum time between buffer size recalculation. +It can be decreased if the throughput is pretty volatile by some reason and the high speed on the changes required. +* `taskmanager.network.memory.buffer-debloat.samples` - The measure of calculation smoothing. +It can be decreased if the buffer size is changed undesirable slower compare to the instant throughput or it can be increased otherwise. +* `taskmanager.network.memory.buffer-debloat.threshold-percentages` - The optimization which prevents +the frequent buffer size change if the new size is not so different compared to the old one. + +See the [Configuration]({{< ref "docs/deployment/config" >}}#full-taskmanageroptions) documentation for details and additional parameters. + +The metrics which can help to observe the current buffer size: +* `estimatedTimeToConsumerBuffersMs` - the total time to consume data from all input channels. +* `debloatedBufferSize` - the current buffer size. + +## Network buffer lifecycle +Logically, Flink has several local buffer pools one for output stream and one for each input gate. +Each of that pools is limited to at most +``` +#channels * taskmanager.network.memory.buffers-per-channel + taskmanager.network.memory.floating-buffers-per-gate +``` + +The size of the buffer can be configured by setting `taskmanager.memory.segment-size`. + +### Input network buffers +Buffers in the input channel are divided into exclusive and floating buffers. +The exclusive buffers are always allocated at the initialization phase for each channel and +can be used only for this channel. 
In case, +when exclusive buffers are not enough the channel can request floating buffers from the shared gate buffer pool. + +The buffer lifecycle can be represented as follows: +* The netty thread receives data and requests the free buffer from the targeted channel. +* The channel, first of all, returns the floating buffer if it was allocated already if not, + the channel returns the exclusive buffer if it has a free one, if not, + the channel requests the floating buffer from the shared pool. +* The netty thread fills the buffer and sends it to the channel. +* The mailbox thread takes the filled buffer from the channel and consumes data from it. +* The mailbox thread returns the empty exclusive buffer to the channel or the empty floating buffer to the shared pool. Review comment: I would say that's a bit too much detail and I would just drop this paragraph. ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as +long waiting in network buffers queue(the subtask requires some time to consume all data from +the queue before it can handle the checkpoint barrier), as well as the data skew - +when different input queue for one subtask contain different number of the data which leads to +increasing the checkpoint alignment time and as result to increasing the whole checkpoint time. + +## Buffer debloat + +The buffer debloat is mechanism which automatically adjust the buffer size in order to keep configured +checkpoint time along with high throughput. More precisely, the buffer debloat calculate the maximum possible throughput +(the maximum throughput which would be if the subtask was always busy) +for the subtask and adapt the buffer size in such a way that the time for consumption of in-flight data would be equal to configured one. 
Review comment: ```suggestion Historically, the only way to configure the amount of in-flight data was to specify both the amount and the size of the buffers. However, ideal values for those numbers are hard to pick, as they are different for every deployment. The buffer debloating mechanism added in Flink 1.14 attempts to address this issue. It tries to automatically adjust the amount of in-flight data to reasonable values. More precisely, the buffer debloating mechanism calculates the maximum possible throughput (the throughput that would be achieved if the subtask was always busy) for the subtask and adjusts the amount of in-flight data in such a way that the time needed to consume that in-flight data is equal to the configured value. ``` ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as +long waiting in network buffers queue(the subtask requires some time to consume all data from +the queue before it can handle the checkpoint barrier), as well as the data skew - +when different input queue for one subtask contain different number of the data which leads to +increasing the checkpoint alignment time and as result to increasing the whole checkpoint time. + +## Buffer debloat + +The buffer debloat is mechanism which automatically adjust the buffer size in order to keep configured +checkpoint time along with high throughput. More precisely, the buffer debloat calculate the maximum possible throughput +(the maximum throughput which would be if the subtask was always busy) +for the subtask and adapt the buffer size in such a way that the time for consumption of in-flight data would be equal to configured one. + +The most useful settings: +* The buffer debloat can be enabled by setting the property `taskmanager.network.memory.buffer-debloat.enabled` to `true`. 
+* Desirable time of the consumption in-flight data can be configured by setting `taskmanager.network.memory.buffer-debloat.target` to `duration`. + +The settings for the optimization: + +* `taskmanager.network.memory.buffer-debloat.period` - the minimum time between buffer size recalculation. +It can be decreased if the throughput is pretty volatile by some reason and the high speed on the changes required. Review comment: ```suggestion The shorter the period, the faster the reaction time of the debloating mechanism, but the higher the CPU overhead of the necessary calculations. ``` ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as +long waiting in network buffers queue(the subtask requires some time to consume all data from +the queue before it can handle the checkpoint barrier), as well as the data skew - +when different input queue for one subtask contain different number of the data which leads to +increasing the checkpoint alignment time and as result to increasing the whole checkpoint time. Review comment: And can you link this documentation somewhere here? https://ci.apache.org/projects/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#checkpointing https://ci.apache.org/projects/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#unaligned-checkpointing ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as +long waiting in network buffers queue(the subtask requires some time to consume all data from +the queue before it can handle the checkpoint barrier), as well as the data skew - +when different input queue for one subtask contain different number of the data which leads to +increasing the checkpoint alignment time and as result to increasing the whole checkpoint time. + +## Buffer debloat + +The buffer debloat is mechanism which automatically adjust the buffer size in order to keep configured +checkpoint time along with high throughput. More precisely, the buffer debloat calculate the maximum possible throughput +(the maximum throughput which would be if the subtask was always busy) +for the subtask and adapt the buffer size in such a way that the time for consumption of in-flight data would be equal to configured one. + +The most useful settings: +* The buffer debloat can be enabled by setting the property `taskmanager.network.memory.buffer-debloat.enabled` to `true`. +* Desirable time of the consumption in-flight data can be configured by setting `taskmanager.network.memory.buffer-debloat.target` to `duration`. + +The settings for the optimization: + +* `taskmanager.network.memory.buffer-debloat.period` - the minimum time between buffer size recalculation. +It can be decreased if the throughput is pretty volatile by some reason and the high speed on the changes required. +* `taskmanager.network.memory.buffer-debloat.samples` - The measure of calculation smoothing. +It can be decreased if the buffer size is changed undesirable slower compare to the instant throughput or it can be increased otherwise. +* `taskmanager.network.memory.buffer-debloat.threshold-percentages` - The optimization which prevents +the frequent buffer size change if the new size is not so different compared to the old one. + +See the [Configuration]({{< ref "docs/deployment/config" >}}#full-taskmanageroptions) documentation for details and additional parameters. + +The metrics which can help to observe the current buffer size: +* `estimatedTimeToConsumerBuffersMs` - the total time to consume data from all input channels. +* `debloatedBufferSize` - the current buffer size. + +## Network buffer lifecycle +Logically, Flink has several local buffer pools one for output stream and one for each input gate. 
+Each of that pools is limited to at most +``` +#channels * taskmanager.network.memory.buffers-per-channel + taskmanager.network.memory.floating-buffers-per-gate +``` + +The size of the buffer can be configured by setting `taskmanager.memory.segment-size`. + +### Input network buffers +Buffers in the input channel are divided into exclusive and floating buffers. +The exclusive buffers are always allocated at the initialization phase for each channel and +can be used only for this channel. In case, +when exclusive buffers are not enough the channel can request floating buffers from the shared gate buffer pool. + +The buffer lifecycle can be represented as follows: +* The netty thread receives data and requests the free buffer from the targeted channel. +* The channel, first of all, returns the floating buffer if it was allocated already if not, + the channel returns the exclusive buffer if it has a free one, if not, + the channel requests the floating buffer from the shared pool. +* The netty thread fills the buffer and sends it to the channel. +* The mailbox thread takes the filled buffer from the channel and consumes data from it. +* The mailbox thread returns the empty exclusive buffer to the channel or the empty floating buffer to the shared pool. + +### Output network buffers +Unlike the input buffer pool, the output buffers pool have only one type of buffers which it shares all among all subpartitions. +In order to avoid the data skew, the number of buffers for each subpartition is limited by `taskmanager.network.memory.max-buffers-per-channel`. + +The buffer lifecycle can be represented as follows: +* The mailbox thread choise the subpartition for sending data and request the buffer for filling. +* The subpartition returns the last partitily filled buffer or request empty one from the shared pool. +* The subpartition prepare buffer for sending when it is full. +* The netty thread request the filled`*` buffer from the subpartition. +* The netty thread sends data from the buffer and returns the buffer to the shared pool. Review comment: Again I would drop this. ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. 
+In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as +long waiting in network buffers queue(the subtask requires some time to consume all data from +the queue before it can handle the checkpoint barrier), as well as the data skew - +when different input queue for one subtask contain different number of the data which leads to +increasing the checkpoint alignment time and as result to increasing the whole checkpoint time. + +## Buffer debloat + +The buffer debloat is mechanism which automatically adjust the buffer size in order to keep configured +checkpoint time along with high throughput. More precisely, the buffer debloat calculate the maximum possible throughput +(the maximum throughput which would be if the subtask was always busy) +for the subtask and adapt the buffer size in such a way that the time for consumption of in-flight data would be equal to configured one. + +The most useful settings: +* The buffer debloat can be enabled by setting the property `taskmanager.network.memory.buffer-debloat.enabled` to `true`. +* Desirable time of the consumption in-flight data can be configured by setting `taskmanager.network.memory.buffer-debloat.target` to `duration`. + +The settings for the optimization: + +* `taskmanager.network.memory.buffer-debloat.period` - the minimum time between buffer size recalculation. +It can be decreased if the throughput is pretty volatile by some reason and the high speed on the changes required. +* `taskmanager.network.memory.buffer-debloat.samples` - The measure of calculation smoothing. +It can be decreased if the buffer size is changed undesirable slower compare to the instant throughput or it can be increased otherwise. +* `taskmanager.network.memory.buffer-debloat.threshold-percentages` - The optimization which prevents +the frequent buffer size change if the new size is not so different compared to the old one. + +See the [Configuration]({{< ref "docs/deployment/config" >}}#full-taskmanageroptions) documentation for details and additional parameters. + +The metrics which can help to observe the current buffer size: +* `estimatedTimeToConsumerBuffersMs` - the total time to consume data from all input channels. +* `debloatedBufferSize` - the current buffer size. + +## Network buffer lifecycle +Logically, Flink has several local buffer pools one for output stream and one for each input gate. +Each of that pools is limited to at most +``` +#channels * taskmanager.network.memory.buffers-per-channel + taskmanager.network.memory.floating-buffers-per-gate +``` + +The size of the buffer can be configured by setting `taskmanager.memory.segment-size`. + +### Input network buffers +Buffers in the input channel are divided into exclusive and floating buffers. +The exclusive buffers are always allocated at the initialization phase for each channel and +can be used only for this channel. In case, +when exclusive buffers are not enough the channel can request floating buffers from the shared gate buffer pool. + +The buffer lifecycle can be represented as follows: +* The netty thread receives data and requests the free buffer from the targeted channel. 
+* The channel, first of all, returns the floating buffer if it was allocated already if not, + the channel returns the exclusive buffer if it has a free one, if not, + the channel requests the floating buffer from the shared pool. +* The netty thread fills the buffer and sends it to the channel. +* The mailbox thread takes the filled buffer from the channel and consumes data from it. +* The mailbox thread returns the empty exclusive buffer to the channel or the empty floating buffer to the shared pool. + +### Output network buffers +Unlike the input buffer pool, the output buffers pool have only one type of buffers which it shares all among all subpartitions. +In order to avoid the data skew, the number of buffers for each subpartition is limited by `taskmanager.network.memory.max-buffers-per-channel`. Review comment: I would also add: ``` Similarly to the input side, the configured amounts of exclusive and floating buffers are treated only as recommended values. If there are not enough buffers available, Flink will be able to make progress with only a single exclusive buffer per output subpartition and zero floating buffers. ``` ########## File path: docs/content/docs/ops/state/network_buffer.md ########## @@ -0,0 +1,108 @@ +--- +title: "Network Buffers" +weight: 100 +type: docs +aliases: + - /ops/state/network_buffer.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Network buffer + +## Overview + +Each record in flink is sent to the next subtask not individually but compounded in Network buffer, +the smallest unit inter subtask communication. Also, in order to keep high throughput, +flink supports the network buffers queue(so called in-flight data) as for output stream as well as for the input stream. +In the result each subtask have an input queue waiting for the consumption and an output queue +waiting for sending to the next subtask. Having a full input queue can guarantee busyness of +the subtask and high level of the throughput but it has negative effect for the latency and +what more important for the checkpoint time. The long checkpoint time issue can be explained as +long waiting in network buffers queue(the subtask requires some time to consume all data from +the queue before it can handle the checkpoint barrier), as well as the data skew - +when different input queue for one subtask contain different number of the data which leads to +increasing the checkpoint alignment time and as result to increasing the whole checkpoint time. + +## Buffer debloat + +The buffer debloat is mechanism which automatically adjust the buffer size in order to keep configured +checkpoint time along with high throughput. 
More precisely, the buffer debloat calculate the maximum possible throughput +(the maximum throughput which would be if the subtask was always busy) +for the subtask and adapt the buffer size in such a way that the time for consumption of in-flight data would be equal to configured one. + +The most useful settings: +* The buffer debloat can be enabled by setting the property `taskmanager.network.memory.buffer-debloat.enabled` to `true`. +* Desirable time of the consumption in-flight data can be configured by setting `taskmanager.network.memory.buffer-debloat.target` to `duration`. + +The settings for the optimization: + +* `taskmanager.network.memory.buffer-debloat.period` - the minimum time between buffer size recalculation. +It can be decreased if the throughput is pretty volatile by some reason and the high speed on the changes required. +* `taskmanager.network.memory.buffer-debloat.samples` - The measure of calculation smoothing. +It can be decreased if the buffer size is changed undesirable slower compare to the instant throughput or it can be increased otherwise. +* `taskmanager.network.memory.buffer-debloat.threshold-percentages` - The optimization which prevents +the frequent buffer size change if the new size is not so different compared to the old one. + +See the [Configuration]({{< ref "docs/deployment/config" >}}#full-taskmanageroptions) documentation for details and additional parameters. + +The metrics which can help to observe the current buffer size: +* `estimatedTimeToConsumerBuffersMs` - the total time to consume data from all input channels. +* `debloatedBufferSize` - the current buffer size. + +## Network buffer lifecycle +Logically, Flink has several local buffer pools one for output stream and one for each input gate. +Each of that pools is limited to at most +``` +#channels * taskmanager.network.memory.buffers-per-channel + taskmanager.network.memory.floating-buffers-per-gate +``` + +The size of the buffer can be configured by setting `taskmanager.memory.segment-size`. + +### Input network buffers +Buffers in the input channel are divided into exclusive and floating buffers. +The exclusive buffers are always allocated at the initialization phase for each channel and +can be used only for this channel. In case, +when exclusive buffers are not enough the channel can request floating buffers from the shared gate buffer pool. Review comment: ```suggestion Flink attempts to acquire the configured amount of exclusive buffers in the initialization phase for each channel. Exclusive buffers can be used only by one particular channel. A channel can request additional floating buffers from a buffer pool shared across all channels belonging to the given input gate. Flink treats the configured amounts of exclusive and floating buffers only as recommended values. If there are not enough buffers available on the input side, Flink will be able to make progress with zero exclusive buffers and a single floating buffer. ```
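For reference, the debloating options discussed in these review comments are all set in `flink-conf.yaml`. A minimal sketch using only the option keys quoted above; the values shown are illustrative placeholders rather than tuning recommendations:

```yaml
# Enable automatic adjustment of the amount of in-flight data (buffer debloating).
taskmanager.network.memory.buffer-debloat.enabled: true
# Target time to consume the buffered in-flight data; per the comments above,
# the default should be good enough in most cases.
taskmanager.network.memory.buffer-debloat.target: 1s
# Minimum time between buffer size recalculations; shorter periods react
# faster to throughput changes at the cost of extra CPU overhead.
taskmanager.network.memory.buffer-debloat.period: 200ms
# Number of samples over which throughput measurements are averaged out.
taskmanager.network.memory.buffer-debloat.samples: 20
# Skip resizing when the new buffer size differs from the old one
# by less than this percentage.
taskmanager.network.memory.buffer-debloat.threshold-percentages: 25
```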