[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326440#comment-16326440 ]
ASF GitHub Bot commented on FLINK-7456: --------------------------------------- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559913 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** + * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, + * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even + * though it has no available credits. + */ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = spy(new PartitionRequestQueue()); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + verify(queue, times(1)).triggerEnqueueAvailableReader(reader); + // The reader is enqueued in the pipeline because the next buffer is event, even though no available credits + verify(queue, 
times(1)).enqueueAvailableReader(reader); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** + * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, + * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. + */ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2)); --- End diff -- let's remove that mock ... > Implement Netty sender incoming pipeline for credit-based > --------------------------------------------------------- > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network > Reporter: zhijiang > Assignee: zhijiang > Priority: Major > Fix For: 1.5.0 > > > This is part of the work on credit-based network flow control. > On the sender side, each subpartition view maintains an atomic integer > {{currentCredit}} received from the receiver. Whenever a > {{PartitionRequest}} or {{AddCredit}} message is received, {{currentCredit}} is increased by the > corresponding delta. > Each view also maintains an atomic boolean field that marks it as registered > as available for transfer, to make sure it is enqueued in the handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)