davidradl commented on code in PR #26415:
URL: https://github.com/apache/flink/pull/26415#discussion_r2033081370


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java:
##########
@@ -196,72 +213,87 @@ synchronized void closeCachedStream() throws IOException {
         }
     }
 
-    private void finish() {
-        if (streamStatus == StreamStatus.CACHED_OPEN) {
-            cacheEntry.release();
-        }
-    }
-
     @Override
     public void seek(long desired) throws IOException {
+        FSDataInputStream stream = getStream();
         try {
-            getStream().seek(desired);
+            stream.seek(desired);
         } finally {
-            finish();
+            if (stream != originalStream) {
+                cacheEntry.release();
+            }
         }
     }
 
     @Override
     public long getPos() throws IOException {
+        FSDataInputStream stream = getStream();
         try {
-            return getStream().getPos();
+            return stream.getPos();
         } finally {
-            finish();
+            if (stream != originalStream) {
+                cacheEntry.release();
+            }
         }
     }
 
     @Override
     public int read() throws IOException {
+        FSDataInputStream stream = getStream();
         try {
-            return getStream().read();
+            return stream.read();
         } finally {
-            finish();
+            if (stream != originalStream) {
+                cacheEntry.release();

Review Comment:
   This `if` check is repeated in the code - can we extract it into a method, please?
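
   For instance, something along these lines might work. This is only a rough sketch, not the PR's code: `releaseIfCached` is a hypothetical name, `java.io.InputStream` and `CountingEntry` are simplified stand-ins for `FSDataInputStream` and the real cache entry so that the example compiles on its own, and `getStream()` here simply picks a stream without modelling the real retain logic.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReleaseHelperSketch {

    /** Stand-in for the real cache entry (assumption): it only counts release() calls. */
    public static class CountingEntry {
        public int releases = 0;

        public void release() {
            releases++;
        }
    }

    /** Simplified stand-in for CachedDataInputStream (assumption, not the real class). */
    public static class CachedStream {
        private final InputStream originalStream;
        private final InputStream cachedStream; // null when nothing is cached
        private final CountingEntry cacheEntry;

        public CachedStream(
                InputStream originalStream, InputStream cachedStream, CountingEntry cacheEntry) {
            this.originalStream = originalStream;
            this.cachedStream = cachedStream;
            this.cacheEntry = cacheEntry;
        }

        /** Simplified: prefers the cached stream, otherwise falls back to the original. */
        private InputStream getStream() {
            return cachedStream != null ? cachedStream : originalStream;
        }

        /** Hypothetical helper that replaces the repeated if-block in each finally clause. */
        private void releaseIfCached(InputStream stream) {
            if (stream != originalStream) {
                cacheEntry.release();
            }
        }

        public int read() throws IOException {
            InputStream stream = getStream();
            try {
                return stream.read();
            } finally {
                releaseIfCached(stream);
            }
        }

        public long skip(long n) throws IOException {
            InputStream stream = getStream();
            try {
                return stream.skip(n);
            } finally {
                releaseIfCached(stream);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        CountingEntry entry = new CountingEntry();
        CachedStream stream =
                new CachedStream(
                        new ByteArrayInputStream(new byte[] {1}),
                        new ByteArrayInputStream(new byte[] {2}),
                        entry);
        System.out.println(stream.read() + " read, releases = " + entry.releases);
    }
}
```

   `seek`, `getPos` and `read` would then each keep their `try`/`finally` shape but call the one helper, so the release condition lives in a single place if it ever changes.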



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
