reswqa commented on code in PR #19960: URL: https://github.com/apache/flink/pull/19960#discussion_r911659104
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataSpiller.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil; +import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer; + +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * This component is responsible for asynchronously writing in-memory data to disk. Each spilling + * operation will write the disk file sequentially. + */ +public class HsMemoryDataSpiller implements AutoCloseable { + /** One thread to perform spill operation. 
*/ + private final ExecutorService ioExecutor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("hybrid spiller thread").build()); + + /** File channel to write data. */ + private final FileChannel dataFileChannel; + + /** Total number of bytes written so far; advanced after each spill. */ + private long totalBytesWritten; + + public HsMemoryDataSpiller(FileChannel dataFileChannel) { + this.dataFileChannel = dataFileChannel; + } + + /** + * Spills the given buffers to disk asynchronously by submitting the work to the ioExecutor. + * + * @param bufferToSpill the buffers to spill; they must be sorted by + * (subpartitionId, bufferIndex). + * @return a future that completes with the spilled buffers' information once the write + * succeeds, or completes exceptionally if the spill throws. + */ + public CompletableFuture<List<SpilledBuffer>> spillAsync(List<BufferWithInfo> bufferToSpill) { + BufferInfoAndFuture bufferInfoAndFuture = new BufferInfoAndFuture(bufferToSpill); + ioExecutor.execute(() -> spill(bufferInfoAndFuture)); + return bufferInfoAndFuture.future; + } + + /** + * Writes the buffers and completes the spill future. Called in the single-threaded + * ioExecutor, so order is guaranteed. Any Throwable is forwarded to the future. + */ + private void spill(BufferInfoAndFuture bufferInfoAndFuture) { + try { + List<BufferWithInfo> toWrite = bufferInfoAndFuture.bufferWithInfos; + List<SpilledBuffer> spilledBuffers = new ArrayList<>(); + long expectedBytes = collectBuffers(toWrite, spilledBuffers); + // write all buffers to file + writeBuffers(toWrite, expectedBytes); + totalBytesWritten += expectedBytes; + // complete spill future when buffers are written to disk successfully. + // note that the ownership of these buffers is transferred to the MemoryDataManager, + // which controls data's life cycle. + // TODO update file data index and handle buffers release in future ticket. + bufferInfoAndFuture.future.complete(spilledBuffers); + } catch (Throwable t) { + bufferInfoAndFuture.future.completeExceptionally(t); + } + } + + /** Collect buffer's offset and create spilled buffers. 
*/ + private long collectBuffers(List<BufferWithInfo> toWrite, List<SpilledBuffer> spilledBuffers) { Review Comment: Renamed to `createSpilledBuffersAndGetTotalBytes`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org