fredia commented on code in PR #26237:
URL: https://github.com/apache/flink/pull/26237#discussion_r1978510149


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataOutputStream.java:
##########
@@ -64,58 +66,80 @@ public CachedDataOutputStream(
 
     @Override
     public long getPos() throws IOException {
-        return cacheOutputStream.getPos();
+        if (cacheOutputStream == null) {
+            return originOutputStream.getPos();
+        } else {
+            return cacheOutputStream.getPos();
+        }
     }
 
     @Override
     public void write(int b) throws IOException {
-        cacheOutputStream.write(b);
+        if (cacheOutputStream != null) {
+            cacheOutputStream.write(b);
+        }
         originOutputStream.write(b);
     }
 
     public void write(byte[] b) throws IOException {
-        cacheOutputStream.write(b);
+        if (cacheOutputStream != null) {
+            cacheOutputStream.write(b);
+        }
         originOutputStream.write(b);
     }
 
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
-        cacheOutputStream.write(b, off, len);
+        if (cacheOutputStream != null) {
+            cacheOutputStream.write(b, off, len);
+        }
         originOutputStream.write(b, off, len);
     }
 
     @Override
     public void flush() throws IOException {
-        cacheOutputStream.flush();
+        if (cacheOutputStream != null) {
+            cacheOutputStream.flush();
+        }
         originOutputStream.flush();
     }
 
     @Override
     public void sync() throws IOException {
-        cacheOutputStream.sync();
+        if (cacheOutputStream != null) {
+            cacheOutputStream.sync();
+        }
         originOutputStream.sync();
     }
 
     @Override
     public void close() throws IOException {
+        long size = getPos();
         if (originOutputStream != null) {
             originOutputStream.close();
             originOutputStream = null;
         }
         if (cacheOutputStream != null) {
-            putIntoCache();
+            putIntoCache(size);
             cacheOutputStream.close();
             cacheOutputStream = null;
+        } else {
+            registerIntoCache(size);
         }
     }
 
-    private void putIntoCache() throws IOException {
-        long thisSize = cacheOutputStream.getPos();
+    private void putIntoCache(long size) {
         FileCacheEntry fileCacheEntry =
-                new FileCacheEntry(fileBasedCache, originalPath, cachePath, 
thisSize);
+                new FileCacheEntry(fileBasedCache, originalPath, cachePath, 
size);
         fileCacheEntry.switchStatus(
                 FileCacheEntry.EntryStatus.REMOVED, 
FileCacheEntry.EntryStatus.LOADED);
         fileCacheEntry.loaded();
         fileBasedCache.addFirst(cachePath.toString(), fileCacheEntry);
     }
+
+    private void registerIntoCache(long size) {
+        FileCacheEntry fileCacheEntry =

Review Comment:
   Switch Status REMOVED->INVALID?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to