StephanEwen commented on a change in pull request #12332: URL: https://github.com/apache/flink/pull/12332#discussion_r430231771
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
##########
@@ -247,15 +247,13 @@ public long getPos() throws IOException {
             return pos + (outStream == null ? 0 : outStream.getPos());
         }
 
-        @Override
-        public void flush() throws IOException {
+        private void flushToFile() throws IOException {

Review comment:
We can make this public, let the tests call it directly.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
##########
@@ -266,6 +264,16 @@ public void flush() throws IOException {
             }
         }
 
+        /**
+         * Flush buffers to file if their size is above {@link #localStateThreshold}.
+         */
+        @Override
+        public void flush() throws IOException {
+            if (pos > localStateThreshold) {

Review comment:
Maybe extend this to `if (stream != null || pos > localStateThreshold)`?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
##########
@@ -165,8 +165,7 @@ public void testDirectoriesForExclusiveAndSharedState() throws Exception {
         CheckpointStateOutputStream exclusiveStream = storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
 
-        exclusiveStream.write(42);
-        exclusiveStream.flush();

Review comment:
If we type the `exclusiveStream` to `FsCheckpointStateOutputStream`, we could call `flushToFile()` directly. Then we don't implicitly rely on the threshold logic here as well.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
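For illustration, here is a minimal sketch of how the three suggestions above could fit together. It is not Flink's actual class: `ThresholdBufferedStream`, its constructor, the in-memory `fileSink`, and the `isFileOpen()` / `bytesInFile()` helpers are hypothetical stand-ins. Only `flush()`, `flushToFile()`, `pos`, `outStream`, and `localStateThreshold` are names taken from the diff. The sketch assumes that the `stream` in the second comment refers to the `outStream` field seen in `getPos()`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical stand-in for FsCheckpointStateOutputStream; not the Flink implementation. */
class ThresholdBufferedStream extends OutputStream {

    private final byte[] buffer;
    private final int localStateThreshold;

    // Stands in for the checkpoint file; real code would open a file system stream.
    private final ByteArrayOutputStream fileSink = new ByteArrayOutputStream();

    private int pos;                 // number of bytes currently buffered
    private OutputStream outStream;  // null until the "file" has been opened

    ThresholdBufferedStream(int bufferSize, int localStateThreshold) {
        this.buffer = new byte[bufferSize];
        this.localStateThreshold = localStateThreshold;
    }

    @Override
    public void write(int b) throws IOException {
        if (pos >= buffer.length) {
            // Buffer is full, so spill to the file regardless of the threshold.
            flushToFile();
        }
        buffer[pos++] = (byte) b;
    }

    /** Unconditionally writes the buffer to the file. Public so tests can call it directly. */
    public void flushToFile() throws IOException {
        if (outStream == null) {
            outStream = fileSink;
        }
        if (pos > 0) {
            outStream.write(buffer, 0, pos);
            pos = 0;
        }
    }

    /** Flushes only if a file is already open or the buffered size exceeds the threshold. */
    @Override
    public void flush() throws IOException {
        if (outStream != null || pos > localStateThreshold) {
            flushToFile();
        }
    }

    boolean isFileOpen() {
        return outStream != null;
    }

    int bytesInFile() {
        return fileSink.size();
    }
}
```

With this shape, a test that needs a file on disk can call `flushToFile()` after a small write instead of depending on `flush()` crossing the threshold. Once a file is open, later `flush()` calls keep appending to it even when the buffered data is below the threshold.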