[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326438#comment-16326438 ]
ASF GitHub Bot commented on FLINK-7456: --------------------------------------- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161567331 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** + * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, + * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even + * though it has no available credits. + */ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + // The reader is enqueued in the pipeline 
because the next buffer is an event, even though no credits are available + assertEquals(1, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + + // Flush the buffer to make the channel writable again and see the final results + channel.flush(); + assertSame(channelBlockingBuffer, channel.readOutbound()); + + assertEquals(0, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** + * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, + * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. + */ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false)); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + queue.notifyReaderCreated(reader); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify available buffers to trigger enqueue the reader + final int notifyNumBuffers = 5; + for (int i = 0; i < notifyNumBuffers; 
i++) { + reader.notifyBuffersAvailable(1); + } + + channel.runPendingTasks(); + + // the reader is not enqueued in the pipeline because no credits are available + // -> it should still have the same number of pending buffers + assertEquals(0, queue.getAvailableReaders().size()); + assertEquals(notifyNumBuffers, reader.getNumBuffersAvailable()); + assertFalse(reader.isRegisteredAsAvailable()); + assertEquals(0, reader.getNumCreditsAvailable()); + + // Notify available credits to trigger enqueue the reader again + final int notifyNumCredits = 3; + for (int i = 1; i <= notifyNumCredits; i++) { + queue.addCredit(receiverId, 1); + + // the reader is enqueued in the pipeline because it has both available buffers and credits + // since the channel is blocked though, we will not process anything and only enqueue the + // reader once + assertTrue(reader.isRegisteredAsAvailable()); + assertEquals(1, queue.getAvailableReaders().size()); + assertEquals(i, reader.getNumCreditsAvailable()); + assertEquals(notifyNumBuffers, reader.getNumBuffersAvailable()); + } + + // Flush the buffer to make the channel writable again and see the final results + channel.flush(); + assertSame(channelBlockingBuffer, channel.readOutbound()); + + assertEquals(0, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + assertEquals(notifyNumBuffers - notifyNumCredits, reader.getNumBuffersAvailable()); + assertFalse(reader.isRegisteredAsAvailable()); --- End diff -- let's end with `assertNull(channel.readOutbound());` > Implement Netty sender incoming pipeline for credit-based > --------------------------------------------------------- > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network > Reporter: zhijiang > Assignee: zhijiang > Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. 
> On the sender side, each subpartition view maintains an atomic integer > {{currentCredit}} holding the credit from the receiver. On receiving a > {{PartitionRequest}} or {{AddCredit}} message, {{currentCredit}} is increased by the > corresponding delta. > Each view also maintains an atomic boolean field that marks it as registered > as available for transfer, to make sure it is enqueued in the handler only once. If > {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)