zhijiangW commented on a change in pull request #13595:
URL: https://github.com/apache/flink/pull/13595#discussion_r514988635
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########

@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import static org.apache.flink.runtime.io.network.partition.SortBuffer.BufferWithChannel;
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link SortMergeResultPartition} appends records and events to {@link SortBuffer} and after the {@link SortBuffer}
+ * is full, all data in the {@link SortBuffer} will be copied and spilled to a {@link PartitionedFile} in subpartition
+ * index order sequentially. Large records that can not be appended to an empty {@link SortBuffer} will be spilled to
+ * the {@link PartitionedFile} separately.
+ */
+@NotThreadSafe
+public class SortMergeResultPartition extends ResultPartition {
+
+    private final Object lock = new Object();
+
+    /** All active readers which are consuming data from this result partition now. */
+    @GuardedBy("lock")
+    private final Set<SortMergeSubpartitionReader> readers = new HashSet<>();
+
+    /** {@link PartitionedFile} produced by this result partition. */
+    @GuardedBy("lock")
+    private PartitionedFile resultFile;
+
+    /** Used to generate random file channel ID. */
+    private final FileChannelManager channelManager;
+
+    /** Number of data buffers (excluding events) written for each subpartition. */
+    private final int[] numDataBuffers;
+
+    /** A piece of unmanaged memory for data writing. */
+    private final MemorySegment writeBuffer;
+
+    /** Size of network buffer and write buffer. */
+    private final int networkBufferSize;
+
+    /** Current {@link SortBuffer} to append records to. */
+    private SortBuffer currentSortBuffer;
+
+    /** File writer for this result partition. */
+    private PartitionedFileWriter fileWriter;
+
+    public SortMergeResultPartition(
+            String owningTaskName,
+            int partitionIndex,
+            ResultPartitionID partitionId,
+            ResultPartitionType partitionType,
+            int numSubpartitions,
+            int numTargetKeyGroups,
+            int networkBufferSize,
+            ResultPartitionManager partitionManager,
+            FileChannelManager channelManager,
+            @Nullable BufferCompressor bufferCompressor,
+            SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
+
+        super(
+            owningTaskName,
+            partitionIndex,
+            partitionId,
+            partitionType,
+            numSubpartitions,
+            numTargetKeyGroups,
+            partitionManager,
+            bufferCompressor,
+            bufferPoolFactory);
+
+        this.channelManager = checkNotNull(channelManager);
+        this.networkBufferSize = networkBufferSize;
+        this.numDataBuffers = new int[numSubpartitions];
+        this.writeBuffer = MemorySegmentFactory.allocateUnpooledOffHeapMemory(networkBufferSize);
+    }
+
+    @Override
+    protected void releaseInternal() {
+        synchronized (lock) {
+            isFinished = true; // to fail writing faster
+
+            // delete the produced file only when no reader is reading now
+            if (readers.isEmpty()) {
+                if (resultFile != null) {
+                    resultFile.deleteQuietly();
+                    resultFile = null;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+        emit(record, targetSubpartition, DataType.DATA_BUFFER);
+    }
+
+    @Override
+    public void broadcastRecord(ByteBuffer record) throws IOException {
+        broadcast(record, DataType.DATA_BUFFER);
+    }
+
+    @Override
+    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+        Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
+        try {
+            ByteBuffer serializedEvent = buffer.getNioBufferReadable();
+            broadcast(serializedEvent, buffer.getDataType());
+        } finally {
+            buffer.recycleBuffer();
+        }
+    }
+
+    private void broadcast(ByteBuffer record, DataType dataType) throws IOException {
+        for (int channelIndex = 0; channelIndex < numSubpartitions; ++channelIndex) {
+            record.rewind();
+            emit(record, channelIndex, dataType);
+        }
+    }
+
+    private void emit(ByteBuffer record, int targetSubpartition, DataType dataType) throws IOException {
+        checkInProduceState();
+
+        SortBuffer sortBuffer = getSortBuffer();
+        if (sortBuffer.append(record, targetSubpartition, dataType)) {
+            return;
+        }
+
+        if (!sortBuffer.hasRemaining()) {
+            // the record can not be appended to the free sort buffer because it is too large
+            releaseCurrentSortBuffer();
+            writeLargeRecord(record, targetSubpartition, dataType);
+            return;
+        }
+
+        flushCurrentSortBuffer();
+        emit(record, targetSubpartition, dataType);
+    }
+
+    private void releaseCurrentSortBuffer() {
+        if (currentSortBuffer != null) {
+            currentSortBuffer.release();
+            currentSortBuffer = null;
+        }
+    }
+
+    private SortBuffer getSortBuffer() {
+        if (currentSortBuffer != null) {
+            return currentSortBuffer;
+        }
+
+        currentSortBuffer = new PartitionSortedBuffer(bufferPool, numSubpartitions, networkBufferSize);
+        return currentSortBuffer;
+    }
+
+    private void flushCurrentSortBuffer() throws IOException {
+        if (currentSortBuffer == null || !currentSortBuffer.hasRemaining()) {
+            releaseCurrentSortBuffer();
+            return;
+        }
+
+        currentSortBuffer.finish();
+
+        PartitionedFileWriter fileWriter = getPartitionedFileWriter();
+
+        while (currentSortBuffer.hasRemaining()) {
+            BufferWithChannel bufferWithChannel = currentSortBuffer.copyData(writeBuffer);
+            Buffer buffer = bufferWithChannel.getBuffer();
+            int subpartitionIndex = bufferWithChannel.getChannelIndex();
+
+            writeCompressedBufferIfPossible(buffer, fileWriter, subpartitionIndex);
+            updateStatistics(buffer, subpartitionIndex);
+        }
+
+        releaseCurrentSortBuffer();
+    }
+
+    private PartitionedFileWriter getPartitionedFileWriter() throws IOException {
+        if (fileWriter == null) {
+            String basePath = channelManager.createChannel().getPath();
+            fileWriter = new PartitionedFileWriter(basePath, numSubpartitions);
+            fileWriter.open();
+        }
+
+        fileWriter.startNewRegion();
+        return fileWriter;
+    }
+
+    private void writeCompressedBufferIfPossible(
+            Buffer buffer,
+            PartitionedFileWriter fileWriter,
+            int targetSubpartition) throws IOException {
+        if (canBeCompressed(buffer)) {
+            buffer = bufferCompressor.compressToIntermediateBuffer(buffer);
+        }
+        fileWriter.writeBuffer(buffer, targetSubpartition);
+        buffer.recycleBuffer();
+    }
+
+    private void updateStatistics(Buffer buffer, int subpartitionIndex) {
+        numBuffersOut.inc();
+        numBytesOut.inc(buffer.readableBytes());
+        if (buffer.isBuffer()) {
+            ++numDataBuffers[subpartitionIndex];
+        }
+    }
+
+    /**
+     * Spills the large record into the target {@link PartitionedFile} as a separate data region.
+     */
+    private void writeLargeRecord(ByteBuffer record, int targetSubpartition, DataType dataType) throws IOException {
+        PartitionedFileWriter fileWriter = getPartitionedFileWriter();
+
+        while (record.hasRemaining()) {
+            int toCopy = Math.min(record.remaining(), writeBuffer.size());
+            writeBuffer.put(0, record, toCopy);
+            NetworkBuffer buffer = new NetworkBuffer(writeBuffer, (buf) -> {}, dataType, toCopy);
+
+            writeCompressedBufferIfPossible(buffer, fileWriter, targetSubpartition);
+            updateStatistics(buffer, targetSubpartition);
+        }
+    }
+
+    void releaseReader(SortMergeSubpartitionReader reader) {
+        synchronized (lock) {
+            readers.remove(reader);
+
+            // release the result partition if it has been marked as released
+            if (readers.isEmpty() && isReleased()) {
+                releaseInternal();
+            }
+        }
+    }
+
+    @Override
+    public void finish() throws IOException {
+        checkInProduceState();
+
+        broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
+        flushCurrentSortBuffer();
+
+        synchronized (lock) {
+            checkState(!isReleased());
+
+            resultFile = fileWriter.finish();
+            fileWriter = null;
+
+            LOG.info("New partitioned file produced: {}.", resultFile);
+        }
+
+        super.finish();
+    }
+
+    @Override
+    public void close() {
+        releaseCurrentSortBuffer();

Review comment:
   If the variable `currentSortBuffer` can be touched concurrently by two threads (the task's main thread and the canceler thread), there is still a potential race condition: one thread may reset it to `null` and cause a misleading NPE in the other thread. Ideally only the main thread would access this variable and release it, for both active and passive exit. Of course, we can also resort to a lock to avoid the race if the logic is not easy to refactor.
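   For illustration only, a minimal sketch of the lock-based option, assuming we simply reuse the existing `lock` field to guard every access to `currentSortBuffer`. Only the two accessors are shown; any other method that dereferences the field (e.g. `flushCurrentSortBuffer`) would need the same guard, and the exact shape of the refactoring is of course up to the author:

   ```java
   // Sketch: serialize all access to currentSortBuffer on the existing lock so the
   // canceler thread and the task main thread cannot race on release and hit an NPE.
   private void releaseCurrentSortBuffer() {
       synchronized (lock) {
           if (currentSortBuffer != null) {
               currentSortBuffer.release();
               currentSortBuffer = null;
           }
       }
   }

   private SortBuffer getSortBuffer() {
       synchronized (lock) {
           if (currentSortBuffer == null) {
               currentSortBuffer = new PartitionSortedBuffer(bufferPool, numSubpartitions, networkBufferSize);
           }
           return currentSortBuffer;
       }
   }
   ```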