fredia commented on code in PR #26237: URL: https://github.com/apache/flink/pull/26237#discussion_r1978510149
########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataOutputStream.java: ########## @@ -64,58 +66,80 @@ public CachedDataOutputStream( @Override public long getPos() throws IOException { - return cacheOutputStream.getPos(); + if (cacheOutputStream == null) { + return originOutputStream.getPos(); + } else { + return cacheOutputStream.getPos(); + } } @Override public void write(int b) throws IOException { - cacheOutputStream.write(b); + if (cacheOutputStream != null) { + cacheOutputStream.write(b); + } originOutputStream.write(b); } public void write(byte[] b) throws IOException { - cacheOutputStream.write(b); + if (cacheOutputStream != null) { + cacheOutputStream.write(b); + } originOutputStream.write(b); } @Override public void write(byte[] b, int off, int len) throws IOException { - cacheOutputStream.write(b, off, len); + if (cacheOutputStream != null) { + cacheOutputStream.write(b, off, len); + } originOutputStream.write(b, off, len); } @Override public void flush() throws IOException { - cacheOutputStream.flush(); + if (cacheOutputStream != null) { + cacheOutputStream.flush(); + } originOutputStream.flush(); } @Override public void sync() throws IOException { - cacheOutputStream.sync(); + if (cacheOutputStream != null) { + cacheOutputStream.sync(); + } originOutputStream.sync(); } @Override public void close() throws IOException { + long size = getPos(); if (originOutputStream != null) { originOutputStream.close(); originOutputStream = null; } if (cacheOutputStream != null) { - putIntoCache(); + putIntoCache(size); cacheOutputStream.close(); cacheOutputStream = null; + } else { + registerIntoCache(size); } } - private void putIntoCache() throws IOException { - long thisSize = cacheOutputStream.getPos(); + private void putIntoCache(long size) { FileCacheEntry fileCacheEntry = - new FileCacheEntry(fileBasedCache, originalPath, cachePath, thisSize); + new FileCacheEntry(fileBasedCache, originalPath, cachePath, size); fileCacheEntry.switchStatus( FileCacheEntry.EntryStatus.REMOVED, FileCacheEntry.EntryStatus.LOADED); fileCacheEntry.loaded(); fileBasedCache.addFirst(cachePath.toString(), fileCacheEntry); } + + private void registerIntoCache(long size) { + FileCacheEntry fileCacheEntry = Review Comment: Switch Status REMOVED->INVALID? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org