davidradl commented on code in PR #26415:
URL: https://github.com/apache/flink/pull/26415#discussion_r2033089874


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java:
##########
@@ -125,39 +127,53 @@ private FSDataInputStream getStream() throws IOException {
                         StreamStatus.CACHED_CLOSED);
                 streamStatus = StreamStatus.CACHED_CLOSED;
             }
-            // try reopen
-            tryReopen();
-            stream = tryGetCacheStream();
-            if (stream != null) {
-                fileBasedCache.incHitCounter();
-                return stream;
-            }
-            fileBasedCache.incMissCounter();
-            return originalStream;
-        } else if (streamStatus == StreamStatus.ORIGINAL) {
-            fileBasedCache.incMissCounter();
-            return originalStream;
-        } else {
-            if (streamStatus == StreamStatus.CACHED_OPEN) {
-                stream = tryGetCacheStream();
+            // if it is CACHED_CLOSED, we try to reopen it
+            if (streamStatus == StreamStatus.CACHED_CLOSED) {
+                stream = tryReopen();
                 if (stream != null) {
                     fileBasedCache.incHitCounter();
                     return stream;
                 }
+                fileBasedCache.incMissCounter();
+                return originalStream;
+            } else if (streamStatus == StreamStatus.ORIGINAL) {
+                fileBasedCache.incMissCounter();
+                return originalStream;
+            } else {
+                // The stream is not closed, but we cannot get the cache 
stream.
+                // Meaning that it is in the process of closing, but the 
status has not been
+                // updated. Thus, we'd better retry here until it reach a 
stable state (CLOSING).
+                Thread.yield();
             }
-            fileBasedCache.incMissCounter();
-            return originalStream;
         }
     }
 
+    /**
+     * Attempts to retrieve the cached stream if it is open and the reference 
count is greater than
+     * zero. If successful, it retains the reference count and returns the 
cached stream. The
+     * invoker must ensure to release the stream after use.
+     *
+     * @return the cached stream if available, or null if not
+     */
     private FSDataInputStream tryGetCacheStream() {
         if (streamStatus == StreamStatus.CACHED_OPEN && cacheEntry.tryRetain() 
> 0) {
-            return fsdis;
+            // Double-check the status as it may change after retain.

Review Comment:
   what do we mean by retain here? Do we mean retry?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to