Zakelly commented on code in PR #26237: URL: https://github.com/apache/flink/pull/26237#discussion_r1977016840
########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/FileBasedCache.java: ########## @@ -123,28 +206,185 @@ public void delete(Path path) { } } + // ----------------------------------------------------------------------- + // Overriding methods of {@link DoubleListLru} to provide thread-safe. + // ----------------------------------------------------------------------- + @Override - FileCacheEntry internalGet(String key, FileCacheEntry value) { - if (metricGroup != null) { - if (value != null) { - hitCounter.inc(); - } else { - missCounter.inc(); + public FileCacheEntry get(String key, boolean affectOrder) { + synchronized (this) { + return super.get(key, affectOrder); + } + } + + @Override + public void addFirst(String key, FileCacheEntry value) { + synchronized (this) { + super.addFirst(key, value); + } + } + + @Override + public void addSecond(String key, FileCacheEntry value) { + synchronized (this) { + super.addSecond(key, value); + } + } + + @Override + public FileCacheEntry remove(String key) { + synchronized (this) { + return super.remove(key); + } + } + + /** Directly insert in cache when restoring. */ + public void registerInCache(Path originalPath, long size) { + Path cachePath = getCachePath(originalPath); + FileCacheEntry fileCacheEntry = new FileCacheEntry(this, originalPath, cachePath, size); + // We want the new registered cache to load ASAP, so assign a initial access count. 
+ fileCacheEntry.accessCountInColdLink = Math.max(0, accessBeforePromote - 2); + addSecond(cachePath.toString(), fileCacheEntry); + } + + void removeFile(FileCacheEntry entry) { + if (closed) { + entry.doRemoveFile(); + } else { + executorService.submit(entry::doRemoveFile); + } + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + closed = true; + executorService.shutdown(); + for (Tuple2<String, FileCacheEntry> entry : this) { + entry.f1.close(); + } + } + + // ----------------------------- + // Hook methods implementation + // ----------------------------- + + @Override + boolean isSafeToAddFirst(FileCacheEntry value) { + return cacheLimitPolicy.isSafeToAdd(value.entrySize); + } + + @Override + void newNodeCreated(FileCacheEntry value, DoubleListLru<String, FileCacheEntry>.Node n) { + value.setTouchFunction( + () -> { + // provide synchronized access to the LRU cache. + synchronized (FileBasedCache.this) { + accessNode(n); + } + }); + } + + @Override + void addedToFirst(FileCacheEntry value) { + LOG.trace("Cache entry {} added to first link.", value.cachePath); + while (cacheLimitPolicy.isOverflow(value.entrySize)) { + moveMiddleFront(); + } + cacheLimitPolicy.acquire(value.entrySize); + } + + @Override + void addedToSecond(FileCacheEntry value) { + LOG.trace("Cache entry {} added to second link.", value.cachePath); + value.secondAccessEpoch = (++secondAccessEpoch); + } + + @Override + void removedFromFirst(FileCacheEntry value) { + cacheLimitPolicy.release(value.entrySize); + value.close(); + } + + @Override + void removedFromSecond(FileCacheEntry value) { + value.close(); + } + + @Override + void movedToFirst(FileCacheEntry entry) { + // here we won't consider the cache limit policy. + // since there will be promotedToFirst called after this. 
+ LOG.trace("Cache entry {} moved to first link.", entry.cachePath); + // trigger the loading + if (entry.switchStatus( + FileCacheEntry.EntryStatus.INVALID, FileCacheEntry.EntryStatus.LOADED)) { + // just a try + entry.loaded(); + if (loadBackCounter != null) { + loadBackCounter.inc(); } } - return value; + if (entry.switchStatus( + FileCacheEntry.EntryStatus.REMOVED, FileCacheEntry.EntryStatus.LOADING)) { + executorService.submit( + () -> { + if (entry.checkStatus(FileCacheEntry.EntryStatus.LOADING)) { + Path path = entry.load(); + if (path == null) { + entry.switchStatus( + FileCacheEntry.EntryStatus.LOADING, + FileCacheEntry.EntryStatus.REMOVED); + } else if (entry.switchStatus( + FileCacheEntry.EntryStatus.LOADING, + FileCacheEntry.EntryStatus.LOADED)) { + entry.loaded(); + if (loadBackCounter != null) { + loadBackCounter.inc(); + } + } else { + try { + path.getFileSystem().delete(path, false); + // delete the file + } catch (IOException e) { + } + } + } + }); + } } @Override - void internalInsert(String key, FileCacheEntry value) {} + void movedToSecond(FileCacheEntry value) { + // trigger the evicting + LOG.trace("Cache entry {} moved to second link.", value.cachePath); + cacheLimitPolicy.release(value.entrySize); Review Comment: The cache will be deleted in `value.invalidate()` once the reference count reaches 0. We'd better not wait for that, since we are on the task thread and it's better not to block here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org