Hi fanrui,
Thanks for creating the FLIP.
In general, I think the overdraft is a good idea and it should help in the cases described above. Here are my thoughts about the configuration:
Please correct me if I am wrong, but as I understand it, right now we have the following calculation:
maxBuffersNumber (per TaskManager) = network memory (calculated from taskmanager.memory.network.fraction, taskmanager.memory.network.min, taskmanager.memory.network.max and the total memory size) / taskmanager.memory.segment-size
requiredBuffersNumber (per TaskManager) = (exclusive buffers * parallelism + floating buffers) * number of subtasks in the TaskManager
buffersInUseNumber = the real number of buffers used at the current moment (always <= requiredBuffersNumber)
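To make the arithmetic concrete, here is a small Java sketch of the first two formulas. It assumes network memory is the total memory multiplied by taskmanager.memory.network.fraction and then clamped to [taskmanager.memory.network.min, taskmanager.memory.network.max], and that each subtask has a single input gate whose channel count equals the parallelism. The class and method names are hypothetical and the numbers in main are illustrative only.

```java
/**
 * Hypothetical worked example of the buffer arithmetic above; not a Flink API.
 * Assumes network memory = clamp(totalMemory * fraction, min, max) and one
 * input gate per subtask with `parallelism` channels.
 */
public class NetworkBufferMath {

    /** Network memory clamped between min and max, divided into memory segments. */
    static long maxBuffersNumber(long totalMemoryBytes, double fraction,
                                 long minBytes, long maxBytes, long segmentSizeBytes) {
        long network = (long) (totalMemoryBytes * fraction);
        network = Math.max(minBytes, Math.min(maxBytes, network));
        return network / segmentSizeBytes;
    }

    /** (exclusive buffers * parallelism + floating buffers) * subtasks in the TaskManager. */
    static long requiredBuffersNumber(int exclusivePerChannel, int parallelism,
                                      int floatingPerGate, int subtasksInTaskManager) {
        return ((long) exclusivePerChannel * parallelism + floatingPerGate)
                * subtasksInTaskManager;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Illustrative inputs: 4 GiB total, 10% fraction, [64 MiB, 1 GiB] bounds, 32 KiB segments.
        long max = maxBuffersNumber(4096 * mb, 0.1, 64 * mb, 1024 * mb, 32 * 1024);
        // Illustrative inputs: 2 exclusive buffers, parallelism 100, 8 floating, 4 subtasks.
        long required = requiredBuffersNumber(2, 100, 8, 4);
        System.out.println("maxBuffersNumber = " + max);
        System.out.println("requiredBuffersNumber = " + required);
    }
}
```

With these inputs, main prints maxBuffersNumber = 13107 and requiredBuffersNumber = 832, i.e. a setup where most of the network memory is never needed by the regular buffer requirements.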
Ideally, requiredBuffersNumber should be equal to maxBuffersNumber, which allows Flink to work predictably. If requiredBuffersNumber > maxBuffersNumber, it is sometimes still fine (but not good), since not all required buffers are really mandatory (e.g. it is OK if Flink cannot allocate floating buffers).
But if maxBuffersNumber > requiredBuffersNumber, as I understand it, Flink just never uses these leftover buffers (maxBuffersNumber - requiredBuffersNumber), and I propose to use them. (We can actually use even the difference 'requiredBuffersNumber - buffersInUseNumber', since one TaskManager can contain several operators, e.g. a 'window' operator, which can temporarily borrow buffers from the global pool.)
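A hypothetical helper for the two variants of spare buffers mentioned above. It assumes buffersInUseNumber can never exceed maxBuffersNumber, so the second variant reduces to maxBuffersNumber - buffersInUseNumber, which equals (maxBuffersNumber - requiredBuffersNumber) + (requiredBuffersNumber - buffersInUseNumber) whenever maxBuffersNumber >= requiredBuffersNumber. The names are illustrative, not Flink APIs.

```java
/**
 * Hypothetical helper: how many buffers could be lent out as overdraft
 * under the two variants discussed above. Not a Flink API.
 */
public class SpareBuffers {

    /** Buffers Flink never hands out today: maxBuffersNumber - requiredBuffersNumber. */
    static long leftoverBuffers(long maxBuffersNumber, long requiredBuffersNumber) {
        return Math.max(0, maxBuffersNumber - requiredBuffersNumber);
    }

    /**
     * Leftovers plus required-but-idle buffers. Assumes buffersInUseNumber never
     * exceeds maxBuffersNumber, so this is the number of physically free segments.
     */
    static long leftoverPlusIdleBuffers(long maxBuffersNumber, long buffersInUseNumber) {
        return Math.max(0, maxBuffersNumber - buffersInUseNumber);
    }

    public static void main(String[] args) {
        System.out.println(leftoverBuffers(13107, 832));         // 12275
        System.out.println(leftoverBuffers(800, 832));           // 0: nothing left over
        System.out.println(leftoverPlusIdleBuffers(13107, 500)); // 12607
    }
}
```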
My proposal, more specifically (it relates only to requesting buffers while processing a single record; switching to unavailability between records should stay the same as we have it now):
* If one more buffer is requested but maxBuffersPerChannel is reached, then just ignore this limitation and allocate the buffer from any place (from the LocalBufferPool if it still has something, otherwise from the NetworkBufferPool)
* If the LocalBufferPool exceeds its limit, then temporarily allocate the buffer from the NetworkBufferPool while it still has something to allocate
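A rough, counter-only model of the two rules above. GlobalPool and LocalPool are hypothetical stand-ins, not Flink's NetworkBufferPool/LocalBufferPool APIs; the availability check is simplified, and returning overdrawn segments to the global pool on recycle is an assumption about how "temporarily" could work.

```java
/**
 * Hypothetical, counter-only model of the proposed request policy.
 * GlobalPool and LocalPool are illustrative stand-ins, not Flink classes.
 */
public class OverdraftPolicySketch {

    /** Stand-in for the TaskManager-wide NetworkBufferPool. */
    static final class GlobalPool {
        private int free;

        GlobalPool(int free) { this.free = free; }

        boolean tryTake() {
            if (free == 0) {
                return false;
            }
            free--;
            return true;
        }

        void giveBack() { free++; }

        int free() { return free; }
    }

    /** Stand-in for one LocalBufferPool. */
    static final class LocalPool {
        private final GlobalPool global;
        private final int poolSize;             // regular limit of this local pool
        private final int maxBuffersPerChannel; // regular per-channel limit
        private int held;                       // segments owned by this pool
        private int inUse;                      // segments currently handed out
        private int overdrawn;                  // segments held beyond poolSize

        LocalPool(GlobalPool global, int poolSize, int maxBuffersPerChannel) {
            this.global = global;
            this.poolSize = poolSize;
            this.maxBuffersPerChannel = maxBuffersPerChannel;
        }

        /**
         * Request made while a single record is being processed. The per-channel
         * limit is deliberately not checked; a cached local segment is used first,
         * otherwise one is taken from the global pool even beyond poolSize.
         */
        boolean requestWhileProcessingRecord() {
            if (inUse < held) {
                inUse++;
                return true;
            }
            if (!global.tryTake()) {
                return false; // nothing left anywhere: the task has to wait
            }
            held++;
            inUse++;
            if (held > poolSize) {
                overdrawn++;
            }
            return true;
        }

        /** Returns a segment; overdrawn segments go back to the global pool first. */
        void recycle() {
            inUse--;
            if (overdrawn > 0) {
                overdrawn--;
                held--;
                global.giveBack();
            }
        }

        /** Between records the regular limits apply, as today (simplified check). */
        boolean isAvailableBetweenRecords(int buffersOfBusiestChannel) {
            return inUse < poolSize && buffersOfBusiestChannel < maxBuffersPerChannel;
        }

        int overdrawn() { return overdrawn; }
    }

    public static void main(String[] args) {
        GlobalPool global = new GlobalPool(3);
        LocalPool local = new LocalPool(global, 1, 1);
        local.requestWhileProcessingRecord(); // within poolSize
        local.requestWhileProcessingRecord(); // overdrawn from the global pool
        System.out.println("overdrawn = " + local.overdrawn() + ", global free = " + global.free());
    }
}
```

Here main prints "overdrawn = 1, global free = 1". Giving overdrawn segments back eagerly keeps the borrowing temporary; keeping them cached locally instead would be an equally valid design choice to discuss.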
Maybe I missed something and this solution won't work, but I like it since, on the one hand, it works out of the box without any configuration, and on the other hand, it can be tuned by changing the proportion of maxBuffersNumber and requiredBuffersNumber.
The last thing I want to say: I don't really want to introduce a new configuration, since even now it is not clear how to correctly configure network buffers with the existing options, and I don't want to complicate it further, especially if the problem can be resolved automatically (as described above).
So, is my understanding of network memory/buffers correct?
--
Best regards,
Anton Kalashnikov
On 27.04.2022 07:46, rui fan wrote:
Hi everyone,
Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink. It effectively solves the problem of checkpoint timeouts or slow checkpoints when backpressure is severe.
We found that UC (Unaligned Checkpoint) does not work well when backpressure is severe and multiple output buffers are required to process a single record. FLINK-14396 [2] also mentioned this issue before. So we propose the overdraft buffer to solve it.
I created FLINK-26762 [3] and FLIP-227 [4] to describe the overdraft buffer mechanism in detail. After discussing it with Anton Kalashnikov, some points are still open:
* There are already a lot of buffer-related configurations. Do we need to add a new configuration for the overdraft buffer?
* Where should the overdraft buffer take its memory from?
  * If the overdraft buffer uses the memory remaining in the NetworkBufferPool, no new configuration needs to be added.
  * If we add a new configuration:
    * Should we set overdraft-memory-size at the TM level or the Task level?
    * Or should we set overdraft-buffers to indicate the number of memory segments that can be overdrawn?
    * What should the default value be? How do we choose sensible defaults?
I have implemented a POC [5] and verified it using flink-benchmarks [6]. The POC sets overdraft-buffers at the Task level, with a default value of 10; that is, each LocalBufferPool can overdraw up to 10 memory segments.
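For comparison with the POC, here is a hypothetical sketch of a capped overdraft per LocalBufferPool, plus the worst-case extra memory that such a per-Task setting could imply for a whole TaskManager. The names are illustrative, the sketch assumes the NetworkBufferPool always has a segment to lend, and the pool count and segment size in main are made-up inputs.

```java
/**
 * Hypothetical sketch of a per-LocalBufferPool overdraft cap, in the spirit of
 * the POC's "overdraft-buffers = 10". Names are illustrative, not Flink APIs.
 */
public class BoundedOverdraftSketch {
    static final int DEFAULT_OVERDRAFT_BUFFERS = 10; // POC default mentioned above

    private final int poolSize;            // regular number of segments for this pool
    private final int maxOverdraftBuffers; // extra segments it may temporarily borrow
    private int inUse;

    BoundedOverdraftSketch(int poolSize, int maxOverdraftBuffers) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    /** Grants a segment up to poolSize + maxOverdraftBuffers; assumes the global pool has one. */
    boolean request() {
        if (inUse >= poolSize + maxOverdraftBuffers) {
            return false;
        }
        inUse++;
        return true;
    }

    /** Number of segments currently held beyond the regular pool size. */
    int overdrawn() {
        return Math.max(0, inUse - poolSize);
    }

    /** Upper bound on extra memory a TaskManager may need if every pool overdraws fully. */
    static long worstCaseOverdraftBytes(int localPoolsInTaskManager, int maxOverdraftBuffers,
                                        long segmentSizeBytes) {
        return (long) localPoolsInTaskManager * maxOverdraftBuffers * segmentSizeBytes;
    }

    public static void main(String[] args) {
        BoundedOverdraftSketch pool = new BoundedOverdraftSketch(4, DEFAULT_OVERDRAFT_BUFFERS);
        int granted = 0;
        while (pool.request()) {
            granted++;
        }
        System.out.println("granted = " + granted + ", overdrawn = " + pool.overdrawn());
        // Made-up example: 20 LocalBufferPools in one TaskManager, 32 KiB segments.
        System.out.println(worstCaseOverdraftBytes(20, DEFAULT_OVERDRAFT_BUFFERS, 32 * 1024));
    }
}
```

With these inputs, main prints "granted = 14, overdrawn = 10" and then 6553600 (bytes, i.e. 6.25 MiB). The second number grows linearly with the number of LocalBufferPools per TaskManager, which is one way to frame the TM-level versus Task-level question.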
Looking forward to your feedback!
Thanks,
fanrui
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
[2] https://issues.apache.org/jira/browse/FLINK-14396
[3] https://issues.apache.org/jira/browse/FLINK-26762
[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
[5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
[6] https://github.com/apache/flink-benchmarks/pull/54