tillrohrmann commented on a change in pull request #11963:
URL: https://github.com/apache/flink/pull/11963#discussion_r420651791



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
##########
@@ -208,28 +122,197 @@ public void shutdown() {
                        for (LibraryCacheEntry entry : cacheEntries.values()) {
                                entry.releaseClassLoader();
                        }
+
+                       cacheEntries.clear();
                }
        }
 
-       @Override
-       public boolean hasClassLoader(@Nonnull JobID jobId) {
-               synchronized (lockObject) {
-                       return cacheEntries.containsKey(jobId);
+       // --------------------------------------------------------------------------------------------
+
+       @FunctionalInterface
+       public interface ClassLoaderFactory {
+               URLClassLoader createClassLoader(URL[] libraryURLs);
+       }
+
+       private static final class DefaultClassLoaderFactory implements ClassLoaderFactory {
+
+               /** The resolve order to use when creating a {@link ClassLoader}. */
+               private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;
+
+               /**
+                * List of patterns for classes that should always be resolved from the parent ClassLoader,
+                * if possible.
+                */
+               private final String[] alwaysParentFirstPatterns;
+
+               private DefaultClassLoaderFactory(FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder, String[] alwaysParentFirstPatterns) {
+                       this.classLoaderResolveOrder = classLoaderResolveOrder;
+                       this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
+               }
+
+               @Override
+               public URLClassLoader createClassLoader(URL[] libraryURLs) {
+                       return FlinkUserCodeClassLoaders.create(
+                               classLoaderResolveOrder,
+                               libraryURLs,
+                               FlinkUserCodeClassLoaders.class.getClassLoader(),
+                               alwaysParentFirstPatterns);
                }
        }
 
+       public static ClassLoaderFactory defaultClassLoaderFactory(FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder, String[] alwaysParentFirstPatterns) {
+               return new DefaultClassLoaderFactory(classLoaderResolveOrder, alwaysParentFirstPatterns);
+       }
+
        // --------------------------------------------------------------------------------------------
 
-       /**
-        * An entry in the per-job library cache. Tracks which execution attempts
-        * still reference the libraries. Once none reference it any more, the
-        * class loaders can be cleaned up.
-        */
-       private static class LibraryCacheEntry {
+       private final class LibraryCacheEntry {
+               private final JobID jobId;
+
+               @GuardedBy("lockObject")
+               private int referenceCount;
+
+               @GuardedBy("lockObject")
+               @Nullable
+               private ResolvedClassLoader resolvedClassLoader;
+
+               @GuardedBy("lockObject")
+               private boolean isReleased;
+
+               private LibraryCacheEntry(JobID jobId) {
+                       this.jobId = jobId;
+                       referenceCount = 0;
+                       this.resolvedClassLoader = null;
+                       this.isReleased = false;
+               }
+
+               private ClassLoader getOrResolveClassLoader(Collection<PermanentBlobKey> libraries, Collection<URL> classPaths) throws IOException {

Review comment:
   API-wise, `Task`s are not required to request the same user code class
loader. Theoretically, every `Task` could request a slightly different user
code class loader. This is not supported at the moment, but the current design
does not rule it out.

   Moreover, this would be a bigger protocol change, which I would like to avoid
at this point.
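
   To illustrate the point about per-request class loaders, here is a minimal
sketch and not the code from this PR. It assumes a hypothetical per-job `Entry`
whose `getOrResolve` method receives the requested URLs on every call and, for
now, rejects a request that differs from the first one. The names
`PerRequestClassLoaderSketch`, `Entry`, `getOrResolve` and `url` are invented
for the example; only the shape of `ClassLoaderFactory` is taken from the diff.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch for discussion; these names do not come from the PR. */
final class PerRequestClassLoaderSketch {

    /** Same shape as the ClassLoaderFactory interface in the diff. */
    @FunctionalInterface
    interface ClassLoaderFactory {
        URLClassLoader createClassLoader(URL[] libraryURLs);
    }

    /** Hypothetical per-job entry: every request names the URLs it needs. */
    static final class Entry {
        private final ClassLoaderFactory factory;
        private Set<String> resolvedUrls; // null until the first request
        private URLClassLoader classLoader;

        Entry(ClassLoaderFactory factory) {
            this.factory = factory;
        }

        synchronized ClassLoader getOrResolve(URL[] requested) {
            Set<String> requestedUrls = toStrings(requested);
            if (classLoader == null) {
                // First request decides which URLs the class loader is built from.
                resolvedUrls = requestedUrls;
                classLoader = factory.createClassLoader(requested);
            } else if (!resolvedUrls.equals(requestedUrls)) {
                // Assumed behaviour: a differing request is rejected. Because the
                // URLs arrive with each call, this branch could later build a
                // second class loader without changing the method signature.
                throw new IllegalStateException(
                    "Requested " + requestedUrls + " but resolved " + resolvedUrls);
            }
            return classLoader;
        }

        // Compare URLs by their string form; URL.equals may resolve host names.
        private static Set<String> toStrings(URL[] urls) {
            Set<String> result = new TreeSet<>();
            for (URL url : urls) {
                result.add(url.toString());
            }
            return result;
        }
    }

    /** Helper that turns a URL string into a URL without a checked exception. */
    static URL url(String spec) {
        try {
            return URI.create(spec).toURL();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

   In this sketch, two requests with the same URLs share one class loader, and
a request with different URLs fails with an `IllegalStateException`. Lifting
that restriction would only touch the `else if` branch, which is what "does not
rule it out" means here.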




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

