yuqi1129 commented on code in PR #7782:
URL: https://github.com/apache/gravitino/pull/7782#discussion_r2246830124
##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -129,8 +129,75 @@ public class FilesetCatalogOperations extends
ManagedSchemaOperations
private boolean disableFSOps;
+ @VisibleForTesting ScheduledThreadPoolExecutor scheduler;
+ @VisibleForTesting Cache<FileSystemCacheKey, FileSystem> fileSystemCache;
+
FilesetCatalogOperations(EntityStore store) {
this.store = store;
+ scheduler =
+ new ScheduledThreadPoolExecutor(
+ 1,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("file-system-cache-for-fileset" + "-%d")
+ .build());
+
+ this.fileSystemCache =
+ Caffeine.newBuilder()
+ .expireAfterAccess(1, TimeUnit.HOURS)
+ .removalListener(
+ (ignored, value, cause) -> {
+ try {
+ ((FileSystem) value).close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close FileSystem instance in cache",
e);
+ }
+ })
+ .scheduler(Scheduler.forScheduledExecutorService(scheduler))
+ .build();
+ }
+
+ static class FileSystemCacheKey {
+ private final NameIdentifier ident;
+ private final Map<String, String> conf;
+ private final String pathPrefix;
+ private final String currentUser;
Review Comment:
> Another question: might a closed FS be retrieved from the cache?
The connection of a cached instance may time out after a long idle period;
however, the instance itself is not closed until we close it explicitly. When an
instance whose connection has timed out is reused, I believe it will reconnect
to the server automatically.
##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java:
##########
@@ -1248,10 +1302,59 @@ private boolean hasCallerContext() {
&& !CallerContext.CallerContextHolder.get().context().isEmpty();
}
- private boolean checkSingleFile(Fileset fileset, String locationName) {
+ @VisibleForTesting
+ FileSystem getFileSystemWithCache(
+ Path path, Map<String, String> conf, NameIdentifier identifier) {
+ String pathString = path.toString();
+ // extract the prefix of the path to use as the cache key
+ String prefix = extractPrefix(pathString);
+ return fileSystemCache.get(
+ new FileSystemCacheKey(identifier, conf, prefix),
+ cacheKey -> {
+ try {
+ return getFileSystem(path, conf);
+ } catch (IOException e) {
+ throw new GravitinoRuntimeException(
+ e, "Failed to get FileSystem for fileset: %s", identifier);
+ }
+ });
+ }
+
+ /**
+ * Extracts the prefix from the given path. The prefix is defined as the
scheme and the first
+ * slash after the scheme.
+ *
+ * @param path the path from which to extract the prefix.
+ * @return the prefix of the path, or an empty string if the path is null or
empty.
+ */
+ @VisibleForTesting
+ String extractPrefix(String path) {
+ if (path == null || path.isEmpty()) {
+ return "";
+ }
+
+ if (path.startsWith("file:/")) {
+ return "file:///";
+ }
+
+ int protocolEnd = path.indexOf("://");
+ if (protocolEnd == -1) {
+ return path;
+ }
+
+ int firstSlash = path.indexOf('/', protocolEnd + 3);
Review Comment:
sure
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]