Hi Jiayi,

Thanks for looking into the network stack, you have raised a very good
question.

Your understanding is right. In credit-based mode, the receiver side reserves a
fixed number of exclusive buffers (credits) for each remote input channel, which
guarantees that every channel can receive data in parallel without blocking the
others.
The receiver also has a floating buffer pool shared by all the input channels,
which can grant extra credits to channels whose senders report a large backlog.
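 
To make this concrete, below is a tiny, purely illustrative Java sketch of the
exclusive-plus-floating credit bookkeeping. All the names and numbers here are
made up (the real classes such as RemoteInputChannel work differently in
detail); the exclusive and floating buffer counts are configurable via
taskmanager.network.memory.buffers-per-channel and
taskmanager.network.memory.floating-buffers-per-gate.

import java.util.concurrent.Semaphore;

/** Toy model of the receiver-side credits: each channel owns a few exclusive
 *  buffers and may borrow from a shared floating pool when the sender reports
 *  a large backlog. Not Flink code, only to illustrate the idea. */
class ReceiverCreditsSketch {
    static final Semaphore floatingPool = new Semaphore(8);  // shared by all input channels

    static class InputChannel {
        int exclusiveCredits = 2;  // always owned by this channel
        int floatingCredits = 0;   // borrowed from the floating pool on demand

        /** React to the backlog the sender announced with its last buffer. */
        void onSenderBacklog(int backlog) {
            int wanted = backlog - (exclusiveCredits + floatingCredits);
            while (wanted-- > 0 && floatingPool.tryAcquire()) {
                floatingCredits++;  // grant one more credit backed by a floating buffer
            }
        }

        int announcedCredit() {
            return exclusiveCredits + floatingCredits;  // what the sender may send
        }
    }

    public static void main(String[] args) {
        InputChannel idle = new InputChannel();
        InputChannel busy = new InputChannel();
        busy.onSenderBacklog(6);  // large backlog -> borrow 4 floating buffers
        System.out.println("busy channel credits: " + busy.announcedCredit());  // 6
        System.out.println("idle channel credits: " + idle.announcedCredit());  // 2
    }
}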

On the sender side there is still a shared buffer pool for all the
subpartitions. In one-to-one mode, where one producer produces data for only
one consumer, there seems to be no further concern. In all-to-all mode, where
one producer emits data to all the consumers, the buffers in the pool may
eventually accumulate in the slow subpartition until the pool is exhausted,
leaving the other, faster subpartitions with no available buffers to fill with
more data.
This would finally cause backpressure.
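 
To illustrate that exhaustion scenario, here is a toy Java sketch (not Flink
code, all names invented) of how a slow subpartition can pin every buffer of
the shared sender-side pool:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

/** Toy model of the sender-side shared buffer pool described above. */
class SharedPoolSketch {
    static final int POOL_SIZE = 8;                        // shared by all subpartitions
    static final Semaphore pool = new Semaphore(POOL_SIZE);
    static final Deque<String> slowSubpartition = new ArrayDeque<>();

    public static void main(String[] args) {
        // The slow consumer grants no credits, so these buffers are never recycled.
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.acquireUninterruptibly();                 // take a buffer from the shared pool
            slowSubpartition.add("record-" + i);           // parked, waiting for credit
        }
        // Now the fast subpartition cannot get a buffer, so the producer blocks
        // at this point, which is exactly the backpressure described above.
        boolean available = pool.tryAcquire();
        System.out.println("buffer available for the fast subpartition: " + available);  // false
        System.out.println("buffers stuck in the slow subpartition: " + slowSubpartition.size());  // 8
    }
}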

The operator does not know the buffer usage, so it cannot decide which records
to emit with priority. Only after a record is emitted by the producer do we
know, via the ChannelSelector, which subpartition this record belongs to. If we
chose not to serialize such a record into the slow subpartition in order to
save buffer resources there, we would need additional memory to cache the
record, and that unexpected overhead could make the job unstable. So on the
producer side there seems to be no other choice than to keep writing until the
buffer resources are exhausted.
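 
Roughly speaking, the selector only sees the record at emit time. Below is a
simplified stand-in for the ChannelSelector idea (the actual interface differs
between Flink versions, so the shape here is only illustrative):

/** Simplified stand-in, not the real org.apache.flink...ChannelSelector. */
interface SimpleChannelSelector<T> {
    int selectChannel(T record, int numChannels);
}

/** Key-based routing: the target subpartition is determined by the record
 *  itself, so it is only known after the record has been produced, and the
 *  producer cannot steer it away from a slow subpartition without caching it
 *  somewhere else. */
class HashSelector<T> implements SimpleChannelSelector<T> {
    @Override
    public int selectChannel(T record, int numChannels) {
        return Math.abs(record.hashCode() % numChannels);
    }
}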

Credit-based flow control is not meant to solve the backpressure issue, which
cannot be avoided completely. It brings obvious benefits under backpressure for
one-to-one mode where multiple logical channels share the same TCP connection
(a blocked channel no longer blocks the others on that connection), it avoids
the extra memory usage inside the Netty stack that could cause instability, and
it speeds up exactly-once checkpoints by avoiding spilling of blocked in-flight
data.

In addition, we once implemented an improvement to the rebalance strategy with
this slow-subpartition issue in mind. With a rebalance channel selector, a
record can actually be emitted to any subpartition with no correctness issue.
So when a record is emitted, we select the subpartition with the smallest
current backlog to take the record, instead of the previous round-robin way.
This can bring benefits in some scenarios.
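 
A rough sketch of that backlog-aware rebalance idea (all names hypothetical;
our implementation hooks into the result partition's backlog accounting, which
is omitted here):

/** Sketch of a backlog-aware rebalance selector; only illustrative. */
class BacklogAwareRebalance {
    private final int[] backlog;  // assumed per-subpartition backlog, maintained by the writer

    BacklogAwareRebalance(int numSubpartitions) {
        this.backlog = new int[numSubpartitions];
    }

    void onBufferAdded(int subpartition)    { backlog[subpartition]++; }
    void onBufferConsumed(int subpartition) { backlog[subpartition]--; }

    /** Instead of round-robin, pick the subpartition with the smallest backlog. */
    int selectSubpartition() {
        int best = 0;
        for (int i = 1; i < backlog.length; i++) {
            if (backlog[i] < backlog[best]) {
                best = i;
            }
        }
        return best;
    }
}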

Best,
Zhijiang
------------------------------------------------------------------
From:bupt_ljy <bupt_...@163.com>
Send Time: June 18, 2019 (Tuesday) 16:35
To:dev <dev@flink.apache.org>
Subject:A Question About Flink Network Stack

Hi all,
I’m not very familiar with the network part of Flink. And a question occurs to 
me after I read most related source codes about network. I’ve seen that Flink 
uses the credit-based mechanism to solve the blocking problem from receivers’ 
side, which means that one “slow” input channel won’t block other input 
channels’ consumption because of their own exclusive credits.
However, from the sender’s side, I find that memory segments sent to all 
receivers’ channels share the same local segment pool(LocalBufferPool), which 
may cause a problem here. Assume that we have a non-parallel source, which is 
partitioned into a map operator, whose parallelism is two, and one of the map 
tasks is consuming very slowly. Is there any possibility that the memory segments 
which should be sent to the slower receiver fill the whole local segment pool, 
which blocks the data which should be sent to the faster receiver?
I appreciate any comments or answers, and please correct me if I am wrong about 
this.




Best Regards,
Jiayi Liao
