jerryshao commented on code in PR #8252:
URL: https://github.com/apache/gravitino/pull/8252#discussion_r2303078600


##########
catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java:
##########
@@ -271,6 +280,211 @@ public void close() throws IOException {
     catalogUserContext.close();
 
     UserContext.cleanAllUserContext();
+
+    boolean testEnv = System.getenv("GRAVITINO_TEST") != null;
+    if (testEnv) {
+      // In test environment, we do not need to clean up class loader related 
stuff
+      return;
+    }
+
+    try {
+      closeStatsDataClearerInFileSystem();
+
+      stopThreadsAndClearThreadLocal();
+
+      // Release the LogFactory for the FilesetCatalogOperations class loader
+      unregisterLogFactory();
+
+      closeResourceInAWS();
+
+      closeResourceInGCP();
+
+      closeResourceInAzure();
+
+      clearShutdownHooks();
+    } catch (Exception e) {
+      LOG.warn("Failed to clear resources(Thread, ThreadLocal variants) in the 
class loader", e);
+    }
+  }
+
+  private static void stopThreadsAndClearThreadLocal() {
+    // Clear all thread references to the ClosableHiveCatalog class loader and 
clear thread local
+    Thread[] threads = getAllThreads();
+    for (Thread thread : threads) {
+      // Clear thread local map for webserver threads in the current class 
loader. Why do we need
+      // this? Because the webserver threads are long-lived threads and will 
holds may thread
+      // local references to the current class loader. However, they can't be 
cleared as
+      // `Catalog.close` is called in the thread pool located in the Caffeine 
cache, which is not
+      // in the webserver threads. So we need to manually clear the thread 
local map for webserver
+      // threads.
+      clearThreadLocalMap(thread);
+
+      // Close all threads that are using the FilesetCatalogOperations class 
loader
+      if (runningWithCurrentClassLoader(thread)) {
+        LOG.info("Interrupting thread: {}", thread.getName());
+        thread.setContextClassLoader(null);
+        thread.interrupt();
+        try {
+          thread.join(5000);
+        } catch (InterruptedException e) {
+          LOG.warn("Failed to join thread: {}", thread.getName(), e);
+        }
+      }
+    }
+  }
+
+  private void unregisterLogFactory() {
+    try {
+      
LogFactory.release(SecureFilesetCatalogOperations.class.getClassLoader());
+    } catch (Exception e) {
+      LOG.warn("Failed to unregister LogFactory", e);
+    }
+  }
+
+  private static void closeResourceInAzure() {
+    // For Azure
+    try {
+      Class<?> relocatedLogFactory =
+          
Class.forName("org.apache.gravitino.azure.shaded.org.apache.commons.logging.LogFactory");
+      MethodUtils.invokeStaticMethod(
+          relocatedLogFactory, "release", 
SecureFilesetCatalogOperations.class.getClassLoader());
+
+      // Clear timer in AbfsClientThrottlingAnalyzer
+      Class<?> abfsClientThrottlingInterceptClass =
+          
Class.forName("org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept");
+      Object abfsClientThrottlingIntercept =
+          FieldUtils.readStaticField(abfsClientThrottlingInterceptClass, 
"singleton", true);
+
+      Object readThrottler =
+          FieldUtils.readField(abfsClientThrottlingIntercept, "readThrottler", 
true);
+      Object writeThrottler =
+          FieldUtils.readField(abfsClientThrottlingIntercept, 
"writeThrottler", true);
+
+      Timer readTimer = (Timer) FieldUtils.readField(readThrottler, "timer", 
true);
+      readTimer.cancel();
+      Timer writeTimer = (Timer) FieldUtils.readField(writeThrottler, "timer", 
true);
+      writeTimer.cancel();
+    } catch (Exception e) {
+      LOG.warn("Failed to handle Azure file system...", e);
+    }
+  }
+
+  private static void closeResourceInGCP() {
+    // For GCS
+    try {
+      Class<?> relocatedLogFactory =
+          
Class.forName("org.apache.gravitino.gcp.shaded.org.apache.commons.logging.LogFactory");
+      MethodUtils.invokeStaticMethod(
+          relocatedLogFactory, "release", 
SecureFilesetCatalogOperations.class.getClassLoader());
+    } catch (Exception e) {
+      LOG.warn("Failed to find GCS shaded LogFactory", e);
+    }
+  }
+
+  private static void closeResourceInAWS() {
+    // For Aws SDK metrics, unregister the metric admin MBean
+    try {
+      Class<?> methodUtilsClass = 
Class.forName("com.amazonaws.metrics.AwsSdkMetrics");
+      MethodUtils.invokeStaticMethod(methodUtilsClass, 
"unregisterMetricAdminMBean");
+    } catch (Exception e) {
+      LOG.warn("Failed to unregister AWS SDK metrics admin MBean", e);
+      // This is not critical, so we just log the warning
+    }
+  }
+
+  private static void clearThreadLocalMap(Thread thread) {
+    if (thread != null && thread.getName().startsWith("Gravitino-webserver-")) 
{
+      try {
+        Field threadLocalsField = 
Thread.class.getDeclaredField("threadLocals");
+        threadLocalsField.setAccessible(true);
+        Object threadLocalMap = threadLocalsField.get(thread);
+
+        if (threadLocalMap != null) {
+          Class<?> tlmClass = 
Class.forName("java.lang.ThreadLocal$ThreadLocalMap");
+          Field tableField = tlmClass.getDeclaredField("table");
+          tableField.setAccessible(true);
+          Object[] table = (Object[]) tableField.get(threadLocalMap);
+
+          for (Object entry : table) {
+            if (entry != null) {
+              Object value = FieldUtils.readField(entry, "value", true);
+              if (value != null
+                  && value.getClass().getClassLoader() != null
+                  && value.getClass().getClassLoader()
+                      == 
SecureFilesetCatalogOperations.class.getClassLoader()) {
+                LOG.info(
+                    "Cleaning up thread local {} for thread {} with custom 
class loader",
+                    value,
+                    thread.getName());
+                FieldUtils.writeField(entry, "value", null, true);
+              }
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to clean up thread locals for thread {}", 
thread.getName(), e);
+      }
+    }
+  }
+
+  private static void closeStatsDataClearerInFileSystem() {
+    try {
+      ScheduledExecutorService scheduler =
+          (ScheduledExecutorService)
+              FieldUtils.readStaticField(MutableQuantiles.class, "scheduler", 
true);
+      scheduler.shutdownNow();
+      Field statisticsCleanerField =
+          FieldUtils.getField(FileSystem.Statistics.class, 
"STATS_DATA_CLEANER", true);
+      Object statisticsCleaner = statisticsCleanerField.get(null);
+      if (statisticsCleaner != null) {
+        ((Thread) statisticsCleaner).interrupt();
+        ((Thread) statisticsCleaner).setContextClassLoader(null);
+        ((Thread) statisticsCleaner).join();
+      }
+
+      FileSystem.closeAll();
+    } catch (Exception e) {
+      LOG.warn("Failed to close stats data clearer in FileSystem", e);
+    }
+  }
+
+  private static Thread[] getAllThreads() {
+    ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
+    ThreadGroup parentGroup;
+    while ((parentGroup = rootGroup.getParent()) != null) {
+      rootGroup = parentGroup;
+    }
+
+    Thread[] threads = new Thread[rootGroup.activeCount()];
+    while (rootGroup.enumerate(threads, true) == threads.length) {
+      threads = new Thread[threads.length * 2];
+    }
+    return threads;
+  }
+
+  private static boolean runningWithCurrentClassLoader(Thread thread) {
+    return thread != null
+        && thread.getContextClassLoader() == 
FilesetCatalogOperations.class.getClassLoader();
+  }
+
+  public static void clearShutdownHooks() {
+    try {
+      Class<?> shutdownHooks = 
Class.forName("java.lang.ApplicationShutdownHooks");
+      IdentityHashMap<Thread, Thread> hooks =
+          (IdentityHashMap<Thread, Thread>)
+              FieldUtils.readStaticField(shutdownHooks, "hooks", true);
+
+      hooks
+          .entrySet()
+          .removeIf(
+              entry -> {
+                Thread thread = entry.getKey();
+                return thread.getContextClassLoader()
+                    == FilesetCatalogOperations.class.getClassLoader();
+              });
+    } catch (Exception e) {
+      LOG.warn("Failed to clean shutdown hooks", e);
+    }

Review Comment:
   The code above is very brittle. Please separate this cleanup code from the 
main logic and encapsulate all of it in a dedicated utility class, so that the 
class can be removed as a whole once we find a better approach.
   
   Also, please add comments explaining each use of reflection (what it accesses 
and why it is needed); otherwise, others won't understand why the change was made.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to