This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 7b5c80ca3d [#8251][#8249] fix(fileset-catalog): Fix OOM problem for 
fileset catalog (#8252)
7b5c80ca3d is described below

commit 7b5c80ca3d56686c40e5118b51815af063914a81
Author: Mini Yu <[email protected]>
AuthorDate: Mon Sep 1 20:34:33 2025 +0800

    [#8251][#8249] fix(fileset-catalog): Fix OOM problem for fileset catalog 
(#8252)
    
    ### What changes were proposed in this pull request?
    
    1. Clear thread locals and daemon threads for custom class loader to let
    GC free it.
    2. Fix the case where timestamp can be 0 for Azure file system.
    
    ### Why are the changes needed?
    
    They are bugs.
    
    Fix: #8251
    Fix: #8249
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A.
    
    ### How was this patch tested?
    
    Test locally.
---
 bundles/azure-bundle/build.gradle.kts              |   7 +-
 bundles/gcp-bundle/build.gradle.kts                |   5 +-
 .../utils/ClassLoaderResourceCleanerUtils.java     | 303 +++++++++++++++++++++
 .../fileset/SecureFilesetCatalogOperations.java    |   3 +
 .../org/apache/gravitino/dto/file/FileInfoDTO.java |   3 +-
 5 files changed, 318 insertions(+), 3 deletions(-)

diff --git a/bundles/azure-bundle/build.gradle.kts 
b/bundles/azure-bundle/build.gradle.kts
index 088b01a407..6ff704ea28 100644
--- a/bundles/azure-bundle/build.gradle.kts
+++ b/bundles/azure-bundle/build.gradle.kts
@@ -25,7 +25,12 @@ plugins {
 }
 
 dependencies {
-  implementation(project(":bundles:azure"))
+  implementation(project(":bundles:azure")) {
+    // There is already a dependency on commons-logging v1.2 in hadoop-azure, 
so exclude the one
+    // from the bundle.
+    exclude(group = "commons-logging", module = "commons-logging")
+  }
+
   implementation(libs.hadoop3.abs)
   implementation(libs.hadoop3.client.api)
   implementation(libs.hadoop3.client.runtime)
diff --git a/bundles/gcp-bundle/build.gradle.kts 
b/bundles/gcp-bundle/build.gradle.kts
index 1f0fc7ec23..df3d976b8f 100644
--- a/bundles/gcp-bundle/build.gradle.kts
+++ b/bundles/gcp-bundle/build.gradle.kts
@@ -25,7 +25,10 @@ plugins {
 }
 
 dependencies {
-  implementation(project(":bundles:gcp"))
+  implementation(project(":bundles:gcp")) {
+    // There is already a dependency on commons-logging v1.2 in hadoop-gcs, so 
exclude the one.
+    exclude(group = "commons-logging", module = "commons-logging")
+  }
   implementation(libs.hadoop3.client.api)
   implementation(libs.hadoop3.client.runtime)
   implementation(libs.hadoop3.gcs)
diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/utils/ClassLoaderResourceCleanerUtils.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/utils/ClassLoaderResourceCleanerUtils.java
new file mode 100644
index 0000000000..3601351e36
--- /dev/null
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/utils/ClassLoaderResourceCleanerUtils.java
@@ -0,0 +1,303 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.utils;
+
+import java.lang.reflect.Field;
+import java.util.IdentityHashMap;
+import java.util.Timer;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.reflect.MethodUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to clean up resources related to a specific class loader to 
prevent memory leaks.
+ * Gravitino will create a new class loader for each catalog and release it 
when there exist any
+ * changes to the catalog. So, it's important to clean up resources related to 
the class loader to
+ * prevent memory leaks.
+ */
+public class ClassLoaderResourceCleanerUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ClassLoaderResourceCleanerUtils.class);
+
+  private ClassLoaderResourceCleanerUtils() {}
+
+  /**
+   * Close all resources related to the given class loader to prevent memory 
leaks.
+   *
+   * @param classLoader the classloader to be closed
+   */
+  public static void closeClassLoaderResource(ClassLoader classLoader) {
+    boolean testEnv = System.getenv("GRAVITINO_TEST") != null;
+    if (testEnv) {
+      // In test environment, we do not need to clean up class loader related 
stuff
+      return;
+    }
+
+    // Clear static threads in FileSystem and close all FileSystem instances.
+    executeAndCatch(
+        ClassLoaderResourceCleanerUtils::closeStatsDataClearerInFileSystem, 
classLoader);
+
+    // Stop all threads with the current class loader and clear their 
threadLocal variables for
+    // jetty threads that are loaded by the current class loader.
+    // For example, thread local `threadData` in 
FileSystem#StatisticsDataCleaner is created
+    // within a jetty thread with the current class loader. However, 
+    // `catalog.close` is executed by a ForkJoinPool thread in CaffeineCache; in 
+    // that case, the thread local variable will not be cleared, so we need to 
+    // clear them manually here.
+    executeAndCatch(
+        
ClassLoaderResourceCleanerUtils::stopThreadsAndClearThreadLocalVariables, 
classLoader);
+
+    // Release the LogFactory for the classloader, each classloader has its 
own LogFactory
+    // instance.
+    
executeAndCatch(ClassLoaderResourceCleanerUtils::releaseLogFactoryInCommonLogging,
 classLoader);
+
+    executeAndCatch(ClassLoaderResourceCleanerUtils::closeResourceInAWS, 
classLoader);
+
+    executeAndCatch(ClassLoaderResourceCleanerUtils::closeResourceInGCP, 
classLoader);
+
+    executeAndCatch(ClassLoaderResourceCleanerUtils::closeResourceInAzure, 
classLoader);
+
+    executeAndCatch(ClassLoaderResourceCleanerUtils::clearShutdownHooks, 
classLoader);
+  }
+
+  /**
+   * Close the stats data clearer thread in Hadoop FileSystem to prevent 
memory leaks when using a custom class loader.
+   *
+   * @param targetClassLoader the classloader where Hadoop FileSystem is loaded
+   */
+  private static void closeStatsDataClearerInFileSystem(ClassLoader 
targetClassLoader)
+      throws Exception {
+    Class<?> fileSystemClass =
+        Class.forName("org.apache.hadoop.fs.FileSystem", true, 
targetClassLoader);
+    MethodUtils.invokeStaticMethod(fileSystemClass, "closeAll");
+
+    Class<?> mutableQuantilesClass =
+        Class.forName("org.apache.hadoop.metrics2.lib.MutableQuantiles", true, 
targetClassLoader);
+    Class<?> statisticsClass =
+        Class.forName("org.apache.hadoop.fs.FileSystem$Statistics", true, 
targetClassLoader);
+
+    ScheduledExecutorService scheduler =
+        (ScheduledExecutorService)
+            FieldUtils.readStaticField(mutableQuantilesClass, "scheduler", 
true);
+    scheduler.shutdownNow();
+    Field statisticsCleanerField = FieldUtils.getField(statisticsClass, 
"STATS_DATA_CLEANER", true);
+    Object statisticsCleaner = statisticsCleanerField.get(null);
+    if (statisticsCleaner != null) {
+      ((Thread) statisticsCleaner).interrupt();
+      ((Thread) statisticsCleaner).setContextClassLoader(null);
+      ((Thread) statisticsCleaner).join();
+    }
+  }
+
+  /**
+   * Stop all threads that are using the target class loader and clear thread 
local variables to
+   * prevent memory leaks.
+   *
+   * <pre>
+   * This method aims to:
+   * 1. Stop all threads that are using the target class loader.
+   * 2. Clear thread local variables in all threads that are using the target 
class loader. Some thread
+   * local variables are loaded in thread jetty-webserver-* threads, which are 
long-lived threads and
+   * will not be stopped when the catalog is closed.
+   * </pre>
+   */
+  private static void stopThreadsAndClearThreadLocalVariables(ClassLoader 
classLoader) {
+    Thread[] threads = getAllThreads();
+    for (Thread thread : threads) {
+      // First clear thread local variables
+      clearThreadLocalMap(thread, classLoader);
+      // Close all threads that are using the FilesetCatalogOperations class 
loader
+      if (runningWithClassLoader(thread, classLoader)) {
+        LOG.info("Interrupting thread: {}", thread.getName());
+        thread.setContextClassLoader(null);
+        thread.interrupt();
+        try {
+          thread.join(500);
+        } catch (InterruptedException e) {
+          LOG.warn("Failed to join thread: {}", thread.getName(), e);
+        }
+      }
+    }
+  }
+
+  private static boolean runningWithClassLoader(Thread thread, ClassLoader 
targetClassLoader) {
+    return thread != null && thread.getContextClassLoader() == 
targetClassLoader;
+  }
+
+  private static Thread[] getAllThreads() {
+    ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
+    ThreadGroup parentGroup;
+    while ((parentGroup = rootGroup.getParent()) != null) {
+      rootGroup = parentGroup;
+    }
+
+    Thread[] threads = new Thread[rootGroup.activeCount()];
+    while (rootGroup.enumerate(threads, true) == threads.length) {
+      threads = new Thread[threads.length * 2];
+    }
+    return threads;
+  }
+
+  private static void clearThreadLocalMap(Thread thread, ClassLoader 
targetClassLoader) {
+    if (thread == null || 
!thread.getName().startsWith("Gravitino-webserver-")) {
+      return;
+    }
+
+    try {
+      Field threadLocalsField = Thread.class.getDeclaredField("threadLocals");
+      threadLocalsField.setAccessible(true);
+      Object threadLocalMap = threadLocalsField.get(thread);
+
+      if (threadLocalMap != null) {
+        Class<?> tlmClass = 
Class.forName("java.lang.ThreadLocal$ThreadLocalMap");
+        Field tableField = tlmClass.getDeclaredField("table");
+        tableField.setAccessible(true);
+        Object[] table = (Object[]) tableField.get(threadLocalMap);
+
+        for (Object entry : table) {
+          if (entry != null) {
+            Object value = FieldUtils.readField(entry, "value", true);
+            if (value != null
+                && value.getClass().getClassLoader() != null
+                && value.getClass().getClassLoader() == targetClassLoader) {
+              LOG.info(
+                  "Cleaning up thread local {} for thread {} with custom class 
loader",
+                  value,
+                  thread.getName());
+              FieldUtils.writeField(entry, "value", null, true);
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to clean up thread locals for thread {}", 
thread.getName(), e);
+    }
+  }
+
+  /**
+   * Clear shutdown hooks registered by the target class loader to prevent 
memory leaks.
+   *
+   * <p>All shutdown hooks are run with the system class loader, so we need to 
manually clear the
+   * shutdown hooks registered by the target class loader.
+   *
+   * @param targetClassLoader the classloader where the shutdown hooks are 
registered.
+   */
+  private static void clearShutdownHooks(ClassLoader targetClassLoader) throws 
Exception {
+    Class<?> shutdownHooks = 
Class.forName("java.lang.ApplicationShutdownHooks");
+    IdentityHashMap<Thread, Thread> hooks =
+        (IdentityHashMap<Thread, Thread>) 
FieldUtils.readStaticField(shutdownHooks, "hooks", true);
+
+    hooks
+        .entrySet()
+        .removeIf(
+            entry -> {
+              Thread thread = entry.getKey();
+              return thread.getContextClassLoader() == targetClassLoader;
+            });
+  }
+
+  /**
+   * Release the LogFactory for the target class loader to prevent memory 
leaks.
+   *
+   * @param currentClassLoader the classloader where the commons-logging is 
loaded.
+   */
+  private static void releaseLogFactoryInCommonLogging(ClassLoader 
currentClassLoader)
+      throws Exception {
+    // Release the LogFactory for the FilesetCatalogOperations class loader
+    Class<?> logFactoryClass =
+        Class.forName("org.apache.commons.logging.LogFactory", true, 
currentClassLoader);
+    MethodUtils.invokeStaticMethod(logFactoryClass, "release", 
currentClassLoader);
+  }
+
+  /**
+   * Close the AWS SDK metrics MBean to prevent memory leaks when using AWS S3.
+   *
+   * @param classLoader the classloader where AWS SDK is loaded
+   */
+  private static void closeResourceInAWS(ClassLoader classLoader) throws 
Exception {
+    // For Aws SDK metrics, unregister the metric admin MBean
+    Class<?> awsSdkMetricsClass =
+        Class.forName("com.amazonaws.metrics.AwsSdkMetrics", true, 
classLoader);
+    MethodUtils.invokeStaticMethod(awsSdkMetricsClass, 
"unregisterMetricAdminMBean");
+  }
+
+  private static void closeResourceInGCP(ClassLoader classLoader) throws 
Exception {
+    // For GCS
+    Class<?> relocatedLogFactory =
+        Class.forName(
+            
"org.apache.gravitino.gcp.shaded.org.apache.commons.logging.LogFactory",
+            true,
+            classLoader);
+    MethodUtils.invokeStaticMethod(relocatedLogFactory, "release", 
classLoader);
+  }
+
+  /**
+   * Close the timer in AbfsClientThrottlingAnalyzer to prevent memory leaks 
when using Azure Blob
+   * File System.
+   *
+   * <p>Timer is a daemon thread, so it won't prevent the JVM from shutting 
down, but it will
+   * prevent the class loader from being garbage collected.
+   *
+   * @param classLoader the classloader where Azure Blob File System is loaded
+   */
+  private static void closeResourceInAzure(ClassLoader classLoader) throws 
Exception {
+    // Clear timer in AbfsClientThrottlingAnalyzer
+    Class<?> abfsClientThrottlingInterceptClass =
+        Class.forName(
+            
"org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept",
+            true,
+            classLoader);
+    Object abfsClientThrottlingIntercept =
+        FieldUtils.readStaticField(abfsClientThrottlingInterceptClass, 
"singleton", true);
+
+    Object readThrottler =
+        FieldUtils.readField(abfsClientThrottlingIntercept, "readThrottler", 
true);
+    Object writeThrottler =
+        FieldUtils.readField(abfsClientThrottlingIntercept, "writeThrottler", 
true);
+
+    Timer readTimer = (Timer) FieldUtils.readField(readThrottler, "timer", 
true);
+    readTimer.cancel();
+    Timer writeTimer = (Timer) FieldUtils.readField(writeThrottler, "timer", 
true);
+    writeTimer.cancel();
+
+    // Release the LogFactory for the Azure shaded commons logging which has 
been relocated
+    // by the Azure SDK
+    Class<?> relocatedLogFactory =
+        Class.forName(
+            
"org.apache.gravitino.azure.shaded.org.apache.commons.logging.LogFactory",
+            true,
+            classLoader);
+    MethodUtils.invokeStaticMethod(relocatedLogFactory, "release", 
classLoader);
+  }
+
+  @FunctionalInterface
+  private interface ThrowableConsumer<T> {
+    void accept(T t) throws Exception;
+  }
+
+  private static <T> void executeAndCatch(ThrowableConsumer<T> consumer, T 
value) {
+    try {
+      consumer.accept(value);
+    } catch (Exception e) {
+      LOG.warn("Failed to execute consumer: ", e);
+    }
+  }
+}
diff --git 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java
 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java
index e1ca543190..bdc7f48735 100644
--- 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java
+++ 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java
@@ -65,6 +65,7 @@ import org.apache.gravitino.file.FilesetCatalog;
 import org.apache.gravitino.file.FilesetChange;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.utils.ClassLoaderResourceCleanerUtils;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.slf4j.Logger;
@@ -271,6 +272,8 @@ public class SecureFilesetCatalogOperations
     catalogUserContext.close();
 
     UserContext.cleanAllUserContext();
+
+    
ClassLoaderResourceCleanerUtils.closeClassLoaderResource(this.getClass().getClassLoader());
   }
 
   @Override
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/file/FileInfoDTO.java 
b/common/src/main/java/org/apache/gravitino/dto/file/FileInfoDTO.java
index c826fa14f4..8e4720af2a 100644
--- a/common/src/main/java/org/apache/gravitino/dto/file/FileInfoDTO.java
+++ b/common/src/main/java/org/apache/gravitino/dto/file/FileInfoDTO.java
@@ -162,7 +162,8 @@ public class FileInfoDTO implements FileInfo {
     public FileInfoDTO build() {
       Preconditions.checkArgument(StringUtils.isNotBlank(name), "name cannot 
be null or empty");
       Preconditions.checkArgument(size >= 0, "size cannot be negative");
-      Preconditions.checkArgument(lastModified > 0, "lastModified must be a 
valid timestamp");
+      // In Azure Blob Storage, it can be 0 for newly created files.
+      Preconditions.checkArgument(lastModified >= 0, "lastModified must be a 
valid timestamp");
       Preconditions.checkArgument(StringUtils.isNotBlank(path), "path cannot 
be null or empty");
 
       return new FileInfoDTO(name, isDir, size, lastModified, path);

Reply via email to