github-actions[bot] commented on code in PR #63677:
URL: https://github.com/apache/doris/pull/63677#discussion_r3323500514
##########
fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java:
##########
@@ -23,48 +23,186 @@
import org.apache.doris.filesystem.FileSystem;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Objects;
import java.util.OptionalLong;
+import java.util.function.Function;
public class FileSystemCache {
private static final Logger LOG =
LogManager.getLogger(FileSystemCache.class);
- private final LoadingCache<FileSystemCacheKey, FileSystem> fileSystemCache;
+ private final LoadingCache<FileSystemCacheKey, FileSystemHolder>
fileSystemCache;
+ private final Function<FileSystemCacheKey, FileSystem> loader;
public FileSystemCache() {
+ this(
+ Config.max_remote_file_system_cache_num,
+
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
+ FileSystemCache::loadFileSystem);
+ }
+
+ @VisibleForTesting
+ FileSystemCache(long maxSize, OptionalLong expireAfterAccessSec,
Function<FileSystemCacheKey, FileSystem> loader) {
+ this.loader = Objects.requireNonNull(loader, "loader");
+ if (maxSize == 0) {
+ fileSystemCache = null;
+ return;
+ }
// no need to set refreshAfterWrite, because the FileSystem is created
once and never changed
CacheFactory fsCacheFactory = new CacheFactory(
-
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
+ expireAfterAccessSec,
OptionalLong.empty(),
- Config.max_remote_file_system_cache_num,
+ maxSize,
false,
null);
- fileSystemCache =
fsCacheFactory.buildCacheWithSyncRemovalListener(this::loadFileSystem, (key,
fs, cause) -> {
- if (fs != null) {
- try {
- fs.close();
- } catch (IOException e) {
- LOG.warn("Failed to close evicted FileSystem for key: {}",
key, e);
- }
- }
- });
+ fileSystemCache = fsCacheFactory.buildCacheWithSyncRemovalListener(
+ key -> new FileSystemHolder(key, loader.apply(key)), (key,
holder, cause) -> {
+ if (holder != null) {
+ holder.markEvicted();
+ }
+ });
}
- private FileSystem loadFileSystem(FileSystemCacheKey key) {
+ private static FileSystem loadFileSystem(FileSystemCacheKey key) {
try {
return FileSystemFactory.getFileSystem(key.properties);
} catch (IOException e) {
throw new RuntimeException("Failed to create filesystem for key: "
+ key, e);
}
}
- public FileSystem getFileSystem(FileSystemCacheKey key) {
- return fileSystemCache.get(key);
+ public FileSystemLease getFileSystem(FileSystemCacheKey key) {
+ if (fileSystemCache == null) {
+ return new DirectFileSystemLease(key, loader.apply(key));
+ }
+ while (true) {
+ FileSystemHolder holder = fileSystemCache.get(key);
+ FileSystemLease lease = holder.acquire();
+ if (lease != null) {
+ return lease;
+ }
+ fileSystemCache.asMap().remove(key, holder);
+ }
+ }
+
+ @VisibleForTesting
+ void cleanUp() {
+ if (fileSystemCache != null) {
+ fileSystemCache.cleanUp();
+ }
+ }
+
+ private static final class FileSystemHolder {
+ private final FileSystemCacheKey key;
+ private final FileSystem fileSystem;
+ private int referenceCount = 0;
+ private boolean evicted = false;
+ private boolean closed = false;
+
+ private FileSystemHolder(FileSystemCacheKey key, FileSystem
fileSystem) {
+ this.key = Objects.requireNonNull(key, "key");
+ this.fileSystem = Objects.requireNonNull(fileSystem, "fileSystem");
+ }
+
+ private synchronized FileSystemLease acquire() {
+ if (evicted || closed) {
+ return null;
+ }
+ referenceCount++;
+ return new CachedFileSystemLease(this);
+ }
+
+ private synchronized void release() {
+ Preconditions.checkState(referenceCount > 0, "FileSystem lease has
been released more than once");
+ referenceCount--;
+ closeIfIdle();
+ }
+
+ private synchronized void markEvicted() {
+ evicted = true;
+ closeIfIdle();
+ }
+
+ private void closeIfIdle() {
+ if (!evicted || referenceCount != 0 || closed) {
+ return;
+ }
+ closed = true;
+ try {
+ fileSystem.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close evicted FileSystem for key: {}",
key, e);
+ }
+ }
+
+ private FileSystem fileSystem() {
+ return fileSystem;
+ }
+ }
+
+ public interface FileSystemLease extends AutoCloseable {
+ FileSystem fileSystem();
+
+ @Override
+ void close();
+ }
+
+ private static final class CachedFileSystemLease implements
FileSystemLease {
+ private final FileSystemHolder holder;
+ private boolean closed = false;
+
+ private CachedFileSystemLease(FileSystemHolder holder) {
+ this.holder = holder;
+ }
+
+ @Override
+ public FileSystem fileSystem() {
+ return holder.fileSystem();
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
Review Comment:
`close()` is public and the implementation is trying to be idempotent, but
the `closed` guard is a plain unsynchronized boolean. If the same lease is
closed concurrently, both callers can observe `closed == false`. For
`CachedFileSystemLease` that means `holder.release()` runs twice: if this was
the last outstanding lease, the second call trips the `referenceCount > 0`
precondition; if other leases are still open, it silently drops one of their
references, so an evicted holder can close its `FileSystem` while it is still
in use. `DirectFileSystemLease` has the same double-close pattern below.
Please make the guard atomic/synchronized, for example by wrapping the
release/close body in `if (closed.compareAndSet(false, true))` on an
`AtomicBoolean`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]