[ https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378940#comment-16378940 ]
ASF GitHub Bot commented on FLINK-8755: --------------------------------------- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/5581#discussion_r170993496 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -222,59 +219,24 @@ public void testConsumeSpilledPartition() throws Exception { assertEquals(1, listener.getNumNotifications()); assertFalse(reader.nextBufferIsEvent()); // buffer - BufferAndBacklog read = reader.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true); assertEquals(2, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertFalse(read.nextBufferIsEvent()); assertFalse(reader.nextBufferIsEvent()); // buffer - read = reader.getNextBuffer(); - assertNotNull(read); - assertTrue(read.buffer().isBuffer()); + assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true); assertEquals(1, partition.getBuffersInBacklog()); - assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog()); - assertFalse(read.buffer().isRecycled()); - read.buffer().recycleBuffer(); - assertTrue(read.buffer().isRecycled()); - assertTrue(read.nextBufferIsEvent()); assertTrue(reader.nextBufferIsEvent()); // event - read = reader.getNextBuffer(); - assertNotNull(read); - assertFalse(read.buffer().isBuffer()); + assertNextEvent(reader, BUFFER_DATA_SIZE, null, true, 1, false, true); --- End diff -- almost - it remains `@Nullable` in `SubpartitionTestBase#assertNextBufferOrEvent` > SpilledSubpartitionView wrongly relys on the backlog for determining whether > more data is available > --------------------------------------------------------------------------------------------------- > > Key: FLINK-8755 > URL: https://issues.apache.org/jira/browse/FLINK-8755 > Project: Flink > Issue Type: Sub-task > Components: Network > Reporter: Nico Kruber > Assignee: Nico Kruber > Priority: Blocker > Fix For: 1.5.0 > > > {code} > public BufferAndBacklog getNextBuffer() throws IOException, > InterruptedException { > //... > int newBacklog = parent.decreaseBuffersInBacklog(current); > return new BufferAndBacklog(current, newBacklog > 0, newBacklog, > nextBufferIsEvent); > {code} > relies on the backlog to signal further data availability. However, if there > are only events left in the buffer queue, their buffers are not included in > the backlog counting and therefore, {{isMoreAvailable}} will be wrongly > {{false}} here. -- This message was sent by Atlassian JIRA (v7.6.3#76005)