Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5710#discussion_r175363048

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java ---
    @@ -99,10 +99,11 @@ public void commit() {
     	 * Mark this {@link BufferBuilder} and associated {@link BufferConsumer} as finished - no new data writes will be
     	 * allowed.
     	 *
    +	 * <p>This method should be idempotent to handle failures and task interruptions. Check FLINK-8948 for more details.
    +	 *
     	 * @return number of written bytes.
     	 */
     	public int finish() {
    -		checkState(!isFinished());
    --- End diff --

    I fear it's not as simple as this: `positionMarker.markFinished()` negates the cached position, so calling `finish()` twice negates it twice. Please add unit tests that verify the idempotency, and extend `BufferBuilderAndConsumerTest#testIsFinished` to also verify the positions/sizes of the finished buffers, buffer builders, and buffer consumers.
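To illustrate the concern, here is a minimal sketch of how an unconditional negation breaks idempotency. It is not Flink's actual position marker: `PositionMarkerSketch`, `NaiveMarker`, and `IdempotentMarker` are hypothetical names, and the sketch assumes only that a negative cached position encodes "finished". It ignores the empty-buffer case, where negating zero would not change the sign.

```java
// Hypothetical, simplified model of the problem; not the Flink implementation.
public class PositionMarkerSketch {

    /** Assumed encoding: a negative cached position means "finished". */
    static class NaiveMarker {
        protected int cachedPosition;

        void move(int offset) {
            cachedPosition += offset;
        }

        boolean isFinished() {
            return cachedPosition < 0;
        }

        /** Negates unconditionally, so a second call flips the sign back. */
        int markFinished() {
            int written = Math.abs(cachedPosition);
            cachedPosition = -cachedPosition;
            return written;
        }
    }

    /** Same marker, but repeated calls leave the state untouched. */
    static class IdempotentMarker extends NaiveMarker {
        @Override
        int markFinished() {
            if (isFinished()) {
                // Already finished: report the size without negating again.
                return Math.abs(cachedPosition);
            }
            return super.markFinished();
        }
    }

    public static void main(String[] args) {
        NaiveMarker naive = new NaiveMarker();
        naive.move(42);
        naive.markFinished();
        naive.markFinished();
        System.out.println("naive finished after two calls: " + naive.isFinished());

        NaiveMarker guarded = new IdempotentMarker();
        guarded.move(42);
        guarded.markFinished();
        guarded.markFinished();
        System.out.println("guarded finished after two calls: " + guarded.isFinished());
    }
}
```

In this sketch the naive marker reports "not finished" after the second call, while the guarded one stays finished and keeps returning the same byte count. A unit test for the real classes could assert the same two properties after calling `finish()` twice.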
---