Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/5105#discussion_r156413073 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java --- @@ -169,33 +151,29 @@ public Buffer getCurrentBuffer() { if (targetBuffer == null) { return null; } - - targetBuffer.setSize(position); - return targetBuffer; + Buffer result = targetBuffer.build(); + targetBuffer = null; + return result; } @Override public void clearCurrentBuffer() { targetBuffer = null; - position = 0; - limit = 0; } @Override public void clear() { targetBuffer = null; - position = 0; - limit = 0; - // ensure clear state with hasRemaining false (for correct setNextBuffer logic) + // ensure clear state with hasRemaining false (for correct setNextBufferBuilder logic) dataBuffer.position(dataBuffer.limit()); lengthBuffer.position(4); } @Override public boolean hasData() { // either data in current target buffer or intermediate buffers - return position > 0 || (lengthBuffer.hasRemaining() || dataBuffer.hasRemaining()); + return (targetBuffer != null && !targetBuffer.isEmpty()) || (lengthBuffer.hasRemaining() || dataBuffer.hasRemaining()); --- End diff -- it has been there before, but the second set of parentheses is actually not needed... `return (targetBuffer != null && !targetBuffer.isEmpty()) || lengthBuffer.hasRemaining() || dataBuffer.hasRemaining();`
---