AlexYinHan commented on code in PR #23514: URL: https://github.com/apache/flink/pull/23514#discussion_r1366799573
########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SegmentFileStateHandle.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.state.filesystem.FsSegmentDataInputStream; + +import java.io.IOException; +import java.util.Objects; + +/** + * {@link FileStateHandle} for state that was written to a file segment. A {@link + * SegmentFileStateHandle} represents a {@link LogicalFile}, which has already been written to a + * segment in a physical file. + * + * <p>TODO (FLINK-32079): serialization and deserialization of {@link SegmentFileStateHandle}. + */ +public class SegmentFileStateHandle extends FileStateHandle { + + private static final long serialVersionUID = 1L; + + private final long startPos; + + private final CheckpointedStateScope scope; + + /** + * Creates a new segment file state for the given file path. + * + * @param filePath The path to the file that stores the state. + * @param startPos Start position of the segment in the physical file. + * @param stateSize Size of the segment. + * @param scope The state's scope, whether it is exclusive or shared. + */ + public SegmentFileStateHandle( + Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) { + super(filePath, stateSize); + this.startPos = startPos; + this.scope = scope; + } + + /** + * This method should be empty, so that JM is not in charge of the lifecycle of files in a + * file-merging checkpoint. + */ + @Override + public void discardState() {} + + @Override + public FSDataInputStream openInputStream() throws IOException { + FSDataInputStream inputStream = super.openInputStream(); + return new FsSegmentDataInputStream(inputStream, startPos, stateSize); + } + + public long getStartPos() { + return startPos; + } + + @Override + public long getStateSize() { + return stateSize; + } + + public long getEndPos() { Review Comment: Indeed. I've removed that function. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org