[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16186039#comment-16186039 ]
ASF GitHub Bot commented on FLINK-7406:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141886467

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException {
             return inputGate.getBufferProvider();
         }

    -    public void onBuffer(Buffer buffer, int sequenceNumber) {
    +    /**
    +     * Requests buffer from input channel directly for receiving network data.
    +     * It should always return an available buffer in credit-based mode.
    +     *
    +     * @return The available buffer.
    +     */
    +    public Buffer requestBuffer() {
    +        synchronized (availableBuffers) {
    +            return availableBuffers.poll();
    +        }
    +    }
    +
    +    /**
    +     * Receives the backlog from producer's buffer response. If the number of available
    +     * buffers is less than the backlog length, it will request floating buffers from buffer
    +     * pool, and then notify unannounced credits to the producer.
    +     *
    +     * @param backlog The number of unsent buffers in the producer's sub partition.
    +     */
    +    private void onSenderBacklog(int backlog) {
    +        int numRequestedBuffers = 0;
    +
    +        synchronized (availableBuffers) {
    +            // Important: the isReleased check should be inside the synchronized block.
    +            if (!isReleased.get()) {
    +                senderBacklog.set(backlog);
    +
    +                while (senderBacklog.get() > availableBuffers.size() && !isWaitingForFloatingBuffers.get()) {
    --- End diff --

    I was thinking about it a bit more and was talking to @StephanEwen about it, and we think that it is actually fine to grab all resources we need at the moment. If there are not enough buffers at some point, the fair distribution will start when the buffers are recycled, i.e. via the callbacks of the new `BufferListener`. Since each channel always has its own exclusive buffers, we can guarantee that it always makes progress anyway!

    Additionally, we cannot really make a fair distribution from the start when receiving the first backlog (since we do not know all the other backlogs) unless we wait for some time, which we also do not want.

    I kind of like your idea of having a `numBuffersPerAllocation`. Let's keep this in our heads and evaluate the current solution first to see whether we need this addition.

    Regarding the formula (which I took from the network FLIP):
    - from the FLIP with regard to the buffer distribution strategy:
    `Design rationale 2: Each channel always tries to maintain a credit of ‘backlog_length + initialCredit’. That means that each channel tries to build the receive window for its current backlog as much as possible from the floating buffers, and use the exclusive ‘initialCredit’ buffers as a means to grow the window.`
    That way we always have some buffers available immediately on the receiver side so the sender can continue sending new buffers immediately (as long as there are buffers available on the receiver) and we do not have to wait for the exclusive buffers to come back.
    - Note that this would have to be changed in the various checks for `availableBuffers.size() >= senderBacklog.get()`, e.g. in `RemoteInputChannel#notifyBufferAvailable()`.
    - Similarly, `RemoteInputChannel#recycle()` needs to be adapted, in case our exclusive buffers are in use and we requested `backlog_length + initialCredit - currentCredit` *floating* buffers, in order not to stack up `2*initialCredit` buffers once `backlog == 0` again. (+ a corresponding unit test)
    - what do you mean by `backlog-currentCredit` not being very accurate? We guarantee that there are no more than `currentCredit` buffers on the wire (some already in the channel, some only announced) and, at the time the buffer was sent, `backlog` additional buffers were queued, so in order to send them, we always need `backlog-currentCredit` irrespective of how much credit is announced vs. being on the wire.
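
To make the arithmetic in the points above concrete, here is a minimal sketch of the `backlog_length + initialCredit` target. It is not the code from the pull request: the class `CreditTargetSketch` and its three methods are hypothetical names used only for illustration, and clamping the results at zero is an assumption of this sketch.

```java
/** Hypothetical helper, not part of Flink: it only illustrates the buffer arithmetic. */
final class CreditTargetSketch {

    private CreditTargetSketch() {
    }

    /**
     * Floating buffers still needed to reach the FLIP target of
     * backlog + initialCredit available buffers (assumption: never negative).
     */
    static int floatingBuffersToRequest(int backlog, int initialCredit, int availableBuffers) {
        int target = backlog + initialCredit;
        return Math.max(0, target - availableBuffers);
    }

    /**
     * Buffers held beyond the target. After an exclusive buffer is recycled,
     * this many floating buffers could be handed back so that the channel does
     * not stack up 2 * initialCredit buffers once the backlog is 0 again.
     */
    static int surplusBuffers(int backlog, int initialCredit, int availableBuffers) {
        int target = backlog + initialCredit;
        return Math.max(0, availableBuffers - target);
    }

    /**
     * Credit the sender still lacks for its queued buffers, i.e.
     * backlog - currentCredit (assumption: clamped at zero).
     */
    static int missingCredit(int backlog, int currentCredit) {
        return Math.max(0, backlog - currentCredit);
    }

    public static void main(String[] args) {
        // Backlog of 5, initialCredit of 2, both exclusive buffers currently in use.
        System.out.println(floatingBuffersToRequest(5, 2, 0));
        // Backlog back at 0 while 2 floating and 2 exclusive buffers are available.
        System.out.println(surplusBuffers(0, 2, 4));
        // Backlog of 5 with 2 credits outstanding.
        System.out.println(missingCredit(5, 2));
    }
}
```

With `initialCredit = 2`, a channel whose exclusive buffers are all in use and that sees a backlog of 5 would ask for 7 floating buffers under this target. Once the backlog drops to 0 and the exclusive buffers come back, anything above 2 available buffers counts as surplus.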

    Or am I not seeing something here?


> Implement Netty receiver incoming pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7406
>                 URL: https://issues.apache.org/jira/browse/FLINK-7406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}}
> for holding the message. If not got, the message is staged temporarily and
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)