curcur commented on a change in pull request #13614: URL: https://github.com/apache/flink/pull/13614#discussion_r504675916
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumerWithPartialRecordLength.java ##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * BufferConsumer with partial record length if a record is spanning over buffers.
+ *
+ * <p>`partialRecordLength` is the length of bytes to skip in order to start with a complete record,
+ * from position index 0 of the underlying MemorySegment. `partialRecordLength` is used in approximate
+ * local recovery to find the start position of a complete record on a BufferConsumer, the so-called
+ * `partial record clean-up`.
+ *
+ * <p>Partial records happen when a record cannot fit into one buffer, so the remaining part of the same record
+ * is put into the next buffer. Hence partial records only exist at the beginning of a buffer.
+ * Partial record clean-up is needed in the mode of approximate local recovery:
+ * if a record spans multiple buffers, and the first (several) buffers have been lost due to the failure
+ * of the receiver task, the remaining in-transit data belonging to the same record should be cleaned up.
+ *
+ * <p>If partialRecordLength == 0, the buffer starts with a complete record.</p>
+ * <p>If partialRecordLength > 0, the buffer starts with a partial record of length partialRecordLength.</p>
+ * <p>If partialRecordLength < 0, partialRecordLength is undefined. This is currently used in
+ * {@link ResultSubpartitionRecoveredStateHandler#recover}.</p>
+ */
+@NotThreadSafe
+public class BufferConsumerWithPartialRecordLength {
+	private final BufferConsumer bufferConsumer;
+	private final int partialRecordLength;
+
+	public BufferConsumerWithPartialRecordLength(BufferConsumer bufferConsumer, int partialRecordLength) {
+		this.bufferConsumer = checkNotNull(bufferConsumer);
+		this.partialRecordLength = partialRecordLength;
+	}
+
+	public BufferConsumer getBufferConsumer() {
+		return bufferConsumer;
+	}
+
+	public int getPartialRecordLength() {
+		return partialRecordLength;
+	}
+
+	public Buffer build() {
+		return bufferConsumer.build();
+	}
+
+	public PartialRecordCleanupResult cleanupPartialRecord() {

Review comment:

> * `skipBuild(...)` would be replaced with `skip(...)`, that would just move the offset, without returning the buffer
> * `.build()` call would be required afterwards, to get the remaining data

I was also thinking of that, and it would be perfect if we did not have the case `bytesToSkip == buffer.getMaxCapacity()`.
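Just to spell out the case I mean, here is a minimal sketch of the suggested `skip(...)` + `build()` flow; the `skip(...)` method and the wrapper interface are hypothetical and do not exist on `BufferConsumer` today:

```java
import org.apache.flink.runtime.io.network.buffer.Buffer;

class SkipThenBuildSketch {

	/** Hypothetical extension of BufferConsumer, for illustration only. */
	interface SkippableBufferConsumer {
		void skip(int bytesToSkip); // would only move currentReaderPosition forward
		Buffer build();             // mirrors the existing BufferConsumer#build()
	}

	static Buffer skipThenBuild(SkippableBufferConsumer consumer, int bytesToSkip) {
		consumer.skip(bytesToSkip);
		// If bytesToSkip == buffer.getMaxCapacity(), build() would now have to return an
		// empty slice -- a state the current build() never reaches, because a fully read
		// consumer is already marked as finished (see below).
		return consumer.build();
	}
}
```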
In the current build(), currentReaderPosition can never be `buffer.getMaxCapacity()`, because the consumer has already been marked as finished (`currentReaderPosition = cachedWriterPosition`) once the full buffer has been read. Remember that in the case of a record spanning over buffers, we finish the builder first and then request a new one. But I can still handle this special case in build() if you are fine with changing the build() method. I was trying not to touch the build() method, which is used in the normal PipelinedSubpartition read path. I am fine with either way.

> Do you really need to return the buffer here? I have a feeling that the code would be simpler if:
>
> * this method was returning just `true/false` if cleanup has finished or not.

Yes, I need both `whether the clean-up is successful` and the returned buffer. Both are used later in PipelinedApproximateSubpartition within the `pollBuffer` method to 1. decide whether the clean-up was successful, and 2. handle the sliced Buffer whether it was successful or not.

The reason I am returning an `empty` buffer instead of a `null` buffer in the case of `bytesToSkip == buffer.getMaxCapacity()` is also to make it easier to follow the current logic in pollBuffer when no data is read.

Overall, I want to wrap the clean-up logic within `BufferConsumerWithPartialRecordLength`, which I guess is one of the main reasons why we introduced this class?
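For reference, this is roughly how I picture the caller side in pollBuffer; the result type's accessors and the exact control flow below are illustrative only, not the actual code in this PR:

```java
import org.apache.flink.runtime.io.network.buffer.Buffer;

class PollBufferSketch {

	/** Illustrative stand-in for PartialRecordCleanupResult; the real accessors may differ. */
	static final class CleanupResult {
		final boolean cleanedUp;
		final Buffer buffer; // empty (never null) when the whole buffer had to be skipped

		CleanupResult(boolean cleanedUp, Buffer buffer) {
			this.cleanedUp = cleanedUp;
			this.buffer = buffer;
		}
	}

	/** Roughly how pollBuffer() in PipelinedApproximateSubpartition would use both pieces of information. */
	static Buffer handleCleanup(CleanupResult result) {
		if (result.cleanedUp) {
			// 1. clean-up succeeded: the returned slice starts at a record boundary,
			//    so it can be handed downstream like any other buffer (it may be empty).
			return result.buffer;
		} else {
			// 2. clean-up did not finish within this buffer: the partial record spills into
			//    the next buffer, so recycle this slice and keep skipping on the next one.
			result.buffer.recycleBuffer();
			return null;
		}
	}
}
```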