zentol commented on a change in pull request #11963:
URL: https://github.com/apache/flink/pull/11963#discussion_r420132981



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
##########
@@ -208,28 +122,197 @@ public void shutdown() {
                        for (LibraryCacheEntry entry : cacheEntries.values()) {
                                entry.releaseClassLoader();
                        }
+
+                       cacheEntries.clear();
                }
        }
 
-       @Override
-       public boolean hasClassLoader(@Nonnull JobID jobId) {
-               synchronized (lockObject) {
-                       return cacheEntries.containsKey(jobId);
+       // --------------------------------------------------------------------------------------------
+
+       @FunctionalInterface
+       public interface ClassLoaderFactory {
+               URLClassLoader createClassLoader(URL[] libraryURLs);
+       }
+
+       private static final class DefaultClassLoaderFactory implements ClassLoaderFactory {
+
+               /** The resolve order to use when creating a {@link ClassLoader}. */
+               private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;
+
+               /**
+                * List of patterns for classes that should always be resolved from the parent ClassLoader,
+                * if possible.
+                */
+               private final String[] alwaysParentFirstPatterns;
+
+               private DefaultClassLoaderFactory(FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder, String[] alwaysParentFirstPatterns) {
+                       this.classLoaderResolveOrder = classLoaderResolveOrder;
+                       this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
+               }
+
+               @Override
+               public URLClassLoader createClassLoader(URL[] libraryURLs) {
+                       return FlinkUserCodeClassLoaders.create(
+                               classLoaderResolveOrder,
+                               libraryURLs,
+                               FlinkUserCodeClassLoaders.class.getClassLoader(),
+                               alwaysParentFirstPatterns);
                }
        }
 
+       public static ClassLoaderFactory defaultClassLoaderFactory(FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder, String[] alwaysParentFirstPatterns) {
+               return new DefaultClassLoaderFactory(classLoaderResolveOrder, alwaysParentFirstPatterns);
+       }
+
        // --------------------------------------------------------------------------------------------
 
-       /**
-        * An entry in the per-job library cache. Tracks which execution attempts
-        * still reference the libraries. Once none reference it any more, the
-        * class loaders can be cleaned up.
-        */
-       private static class LibraryCacheEntry {
+       private final class LibraryCacheEntry {
+               private final JobID jobId;
+
+               @GuardedBy("lockObject")
+               private int referenceCount;
+
+               @GuardedBy("lockObject")
+               @Nullable
+               private ResolvedClassLoader resolvedClassLoader;
+
+               @GuardedBy("lockObject")
+               private boolean isReleased;
+
+               private LibraryCacheEntry(JobID jobId) {
+                       this.jobId = jobId;
+                       referenceCount = 0;
+                       this.resolvedClassLoader = null;
+                       this.isReleased = false;
+               }
+
+               private ClassLoader getOrResolveClassLoader(Collection<PermanentBlobKey> libraries, Collection<URL> classPaths) throws IOException {

Review comment:
       Since the classloader is bound to a slot, and the set of required libraries must be identical for all tasks, it seems odd to set up the classloader and do the equality checks on each task submission.
   Could we not set this up directly when creating the JobManagerConnection (which I suppose would mean sending the required libraries as part of the slot request)?
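To make the per-submission cost concrete, here is a minimal, dependency-free sketch of the lease scheme in this hunk. It is not Flink code and rests on simplifying assumptions: jobs are keyed by `String` instead of `JobID`, the requirements are a plain list of `URL`s instead of `PermanentBlobKey`s plus class paths, compatibility is checked by set equality, and `RefCountedClassLoaderCache`/`Lease` are hypothetical names. The first resolve creates the class loader; every later resolve (one per task submission) repeats the requirement check.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, dependency-free model of the per-job lease scheme; not Flink's actual class. */
final class RefCountedClassLoaderCache {

    private final Object lock = new Object();
    private final Map<String, Entry> entries = new HashMap<>();

    /** Registers a lease for the job, creating the (still unresolved) entry on first use. */
    Lease registerLease(String jobId) {
        synchronized (lock) {
            Entry entry = entries.computeIfAbsent(jobId, id -> new Entry(id));
            entry.referenceCount++;
            return new Lease(entry);
        }
    }

    int numEntries() {
        synchronized (lock) {
            return entries.size();
        }
    }

    /** Per-job state; all fields are guarded by {@code lock}. */
    private static final class Entry {
        final String jobId;
        int referenceCount;
        Set<URL> libraries;          // null until the first resolve
        URLClassLoader classLoader;  // null until the first resolve

        Entry(String jobId) {
            this.jobId = jobId;
        }
    }

    final class Lease {
        private final Entry entry;
        private boolean closed;

        private Lease(Entry entry) {
            this.entry = entry;
        }

        /** The first call creates the class loader; later calls only re-check the requirements. */
        ClassLoader getOrResolveClassLoader(List<URL> requiredLibraries) {
            synchronized (lock) {
                if (closed) {
                    throw new IllegalStateException("Lease has already been closed.");
                }
                Set<URL> required = new HashSet<>(requiredLibraries);
                if (entry.classLoader == null) {
                    entry.libraries = required;
                    entry.classLoader = new URLClassLoader(
                            requiredLibraries.toArray(new URL[0]),
                            RefCountedClassLoaderCache.class.getClassLoader());
                } else if (!entry.libraries.equals(required)) {
                    // This is the check that runs again on every task submission.
                    throw new IllegalStateException("Libraries differ from those of the cached class loader.");
                }
                return entry.classLoader;
            }
        }

        /** Idempotent; the last close for a job removes the entry and closes its class loader. */
        void close() {
            synchronized (lock) {
                if (closed) {
                    return;
                }
                closed = true;
                if (--entry.referenceCount == 0) {
                    entries.remove(entry.jobId);
                    if (entry.classLoader != null) {
                        try {
                            entry.classLoader.close();
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    }
                }
            }
        }
    }
}
```

Under the suggestion above, the equivalent of the first `getOrResolveClassLoader` call would happen once when the JobManagerConnection is set up, and tasks would only borrow the already resolved loader.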

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
##########
@@ -20,91 +20,79 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-
-import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
 
+/**
+ * The LibraryCacheManager is responsible for creating and managing the user code
+ * class loaders.
+ *
+ * <p>In order to obtain a user code class loader, one first needs to obtain a
+ * {@link ClassLoaderLease} for a given {@link JobID}. At first, the {@link ClassLoaderLease}
+ * is unresolved. In order to obtain the user class loader one needs to resolve it
+ * by specifying the required jar files and class paths. The user code class loader
+ * for a job is valid as long as there exists a valid {@link ClassLoaderLease}. A
+ * {@link ClassLoaderLease} becomes invalid once it gets closed.
+ */
 public interface LibraryCacheManager {
 
        /**
-        * Returns the user code class loader associated with id.
+        * Registers a new class loader lease for the given jobId. The user
+        * code class loader for this job will be valid as long as there
+        * exists a valid lease for this job.
         *
-        * @param id identifying the job
-        * @return ClassLoader which can load the user code
+        * @param jobId jobId for which to register a new class loader lease
+        * @return a new class loader lease for the given job
         */
-       ClassLoader getClassLoader(JobID id);
+       ClassLoaderLease registerClassLoaderLease(JobID jobId);
 
        /**
-        * Registers a job with its required jar files and classpaths. The jar files are identified by
-        * their blob keys and downloaded for use by a {@link ClassLoader}.
-        *
-        * @param id job ID
-        * @param requiredJarFiles collection of blob keys identifying the required jar files
-        * @param requiredClasspaths collection of classpaths that are added to the user code class loader
-        *
-        * @throws IOException if any error occurs when retrieving the required jar files
-        *
-        * @see #unregisterJob(JobID) counterpart of this method
+        * Shuts the library cache manager down. Thereby it will close all open
+        * {@link ClassLoaderLease} and release all registered user code class
+        * loaders.
         */
-       void registerJob(JobID id, Collection<PermanentBlobKey> requiredJarFiles, Collection<URL> requiredClasspaths)
-               throws IOException;
+       void shutdown();
 
        /**
-        * Registers a job task execution with its required jar files and classpaths. The jar files are
-        * identified by their blob keys and downloaded for use by a {@link ClassLoader}.
-        *
-        * @param id job ID
-        * @param requiredJarFiles collection of blob keys identifying the required jar files
-        * @param requiredClasspaths collection of classpaths that are added to the user code class loader
-        *
-        * @throws IOException if any error occurs when retrieving the required jar files
-        *
-        * @see #unregisterTask(JobID, ExecutionAttemptID) counterpart of this method
+        * Handle to retrieve a user code class loader for the associated job.
         */
-       void registerTask(JobID id, ExecutionAttemptID execution, Collection<PermanentBlobKey> requiredJarFiles,
-               Collection<URL> requiredClasspaths) throws IOException;
+       interface ClassLoaderHandle {
 
-       /**
-        * Unregisters a job task execution from the library cache manager.
-        *
-        * <p><strong>Note:</strong> this is the counterpart of {@link #registerTask(JobID,
-        * ExecutionAttemptID, Collection, Collection)} and it will not remove any job added via
-        * {@link #registerJob(JobID, Collection, Collection)}!
-        *
-        * @param id job ID
-        *
-        * @see #registerTask(JobID, ExecutionAttemptID, Collection, Collection) counterpart of this method
-        */
-       void unregisterTask(JobID id, ExecutionAttemptID execution);
-
-       /**
-        * Unregisters a job from the library cache manager.
-        *
-        * <p><strong>Note:</strong> this is the counterpart of {@link #registerJob(JobID, Collection,
-        * Collection)} and it will not remove any job task execution added via {@link
-        * #registerTask(JobID, ExecutionAttemptID, Collection, Collection)}!
-        *
-        * @param id job ID
-        *
-        * @see #registerJob(JobID, Collection, Collection) counterpart of this method
-        */
-       void unregisterJob(JobID id);
+               /**
+                * Gets or resolves the user code class loader for the associated job.
+                *
+                * <p>In order to retrieve the user code class loader the caller
+                * has to specify the required jars and class paths. Upon calling this
+                * method for the first time for a job, it will make sure that the required
+                * jars are present and potentially cache the created user code class loader.
+                * Every subsequent call to this method will ensure that the created
+                * user code class loader can fulfill the required jar files and
+                * class paths.
+                *
+                * @param requiredJarFiles the jar files the user code class loader needs to load
+                * @param requiredClasspaths the class paths the user code class loader needs to be started with
+                * @return the user code class loader fulfilling the requirements
+                * @throws IOException if the required jar files cannot be downloaded
+                * @throws IllegalStateException if the cached user code class loader does not fulfill the requirements
+                */
+               ClassLoader getOrResolveClassLoader(Collection<PermanentBlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) throws IOException;
+       }
 
        /**
-        * Shutdown method which may release created class loaders.
+        * Lease which allows signaling when the user code class loader is no longer needed.
         */
-       void shutdown();
+       interface ClassLoaderLease extends ClassLoaderHandle {
 
-       /**
-        * True if the LibraryCacheManager has a user code class loader registered
-        * for the given job id.
-        *
-        * @param jobId identifying the job for which to check the class loader
-        * @return true if the user code class loader for the given job has been registered. Otherwise false.
-        */
-       boolean hasClassLoader(@Nonnull JobID jobId);
+               /**
+                * Closes the lease to the user code class loader for the associated job.
+                *
+                * <p>This method signals that the lease holder no longer needs the user
+                * code class loader for the associated job. Once all leases for a job are
+                * closed, the library cache manager is allowed to release the associated
+                * user code class loader.
+                */
+               void close();

Review comment:
       extend `AutoCloseable` instead?
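If `ClassLoaderLease` extended `AutoCloseable`, short-lived callers could scope a lease with try-with-resources. The sketch below assumes a simplified, hypothetical `CloseableClassLoaderLease` (the jar and class-path arguments are dropped) and a `CountingLease` that only counts open leases. Overriding `close()` without `throws Exception` keeps the try-with-resources block free of a catch clause.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical variant of the lease from this PR that extends AutoCloseable (arguments omitted). */
interface CloseableClassLoaderLease extends AutoCloseable {

    ClassLoader getOrResolveClassLoader();

    /** Narrowed to throw no checked exception, so callers need no catch block. */
    @Override
    void close();
}

/** Hypothetical implementation that only tracks how many leases are still open. */
final class CountingLease implements CloseableClassLoaderLease {
    private final AtomicInteger openLeases;
    private boolean closed;

    CountingLease(AtomicInteger openLeases) {
        this.openLeases = openLeases;
        openLeases.incrementAndGet();
    }

    @Override
    public ClassLoader getOrResolveClassLoader() {
        return CountingLease.class.getClassLoader();
    }

    @Override
    public void close() {
        if (!closed) { // idempotent, as the AutoCloseable documentation encourages
            closed = true;
            openLeases.decrementAndGet();
        }
    }
}

final class LeaseDemo {
    /** Returns the number of open leases observed while the lease is in use. */
    static int runTask(AtomicInteger openLeases) {
        try (CloseableClassLoaderLease lease = new CountingLease(openLeases)) {
            lease.getOrResolveClassLoader(); // a real task would load user classes from this loader
            return openLeases.get();
        } // lease.close() runs here, even if the body throws
    }
}
```

Leases whose lifetime spans several calls (for example, one held for as long as a task runs) would still call `close()` explicitly; `AutoCloseable` mainly helps the scoped cases.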

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
##########
@@ -116,21 +119,21 @@ public void testLibraryCacheManagerJobCleanup() throws IOException, InterruptedE
                        checkFileCountForJob(1, jobId2, server);
                        checkFileCountForJob(0, jobId2, cache);
 
-                       libCache.registerJob(jobId2, keys2, Collections.<URL>emptyList());
-                       ClassLoader classLoader2 = libCache.getClassLoader(jobId2);
+                       final LibraryCacheManager.ClassLoaderLease classLoaderLeaseJob2 = libCache.registerClassLoaderLease(jobId2);
+                       final ClassLoader classLoader2 = classLoaderLeaseJob2.getOrResolveClassLoader(keys2, Collections.emptyList());
                        assertNotEquals(classLoader1, classLoader2);
 
                        try {
-                               libCache.registerJob(jobId2, keys1, Collections.<URL>emptyList());
+                               classLoaderLeaseJob2.getOrResolveClassLoader(keys1, Collections.<URL>emptyList());
                                fail("Should fail with an IllegalStateException");
                        }
                        catch (IllegalStateException e) {
                                // that's what we want
                        }
 
                        try {
-                               libCache.registerJob(
-                                       jobId2, keys2,
+                               classLoaderLeaseJob1.getOrResolveClassLoader(

Review comment:
       Are we intentionally using the lease for job 1 with the keys of job 2?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
##########
@@ -48,133 +48,46 @@
  * Provides facilities to download a set of libraries (typically JAR files) for a job from a
  * {@link PermanentBlobService} and create a class loader with references to them.
  */
+@ThreadSafe
 public class BlobLibraryCacheManager implements LibraryCacheManager {
 
        private static final Logger LOG = LoggerFactory.getLogger(BlobLibraryCacheManager.class);
 
-       private static final ExecutionAttemptID JOB_ATTEMPT_ID = new ExecutionAttemptID(-1, -1);
-
        // --------------------------------------------------------------------------------------------
 
        /** The global lock to synchronize operations. */
        private final Object lockObject = new Object();
 
        /** Registered entries per job. */
+       @GuardedBy("lockObject")
        private final Map<JobID, LibraryCacheEntry> cacheEntries = new HashMap<>();
 
        /** The blob service to download libraries. */
+       @GuardedBy("lockObject")
        private final PermanentBlobService blobService;
 
-       /** The resolve order to use when creating a {@link ClassLoader}. */
-       private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;
-
-       /**
-        * List of patterns for classes that should always be resolved from the parent ClassLoader,
-        * if possible.
-        */
-       private final String[] alwaysParentFirstPatterns;
+       private final ClassLoaderFactory classLoaderFactory;
 
        // --------------------------------------------------------------------------------------------
 
        public BlobLibraryCacheManager(
                        PermanentBlobService blobService,
-                       FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
-                       String[] alwaysParentFirstPatterns) {
+                       ClassLoaderFactory classLoaderFactory) {
                this.blobService = checkNotNull(blobService);
-               this.classLoaderResolveOrder = checkNotNull(classLoaderResolveOrder);
-               this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
+               this.classLoaderFactory = checkNotNull(classLoaderFactory);
        }
 
        @Override
-       public void registerJob(JobID id, Collection<PermanentBlobKey> requiredJarFiles, Collection<URL> requiredClasspaths)
-               throws IOException {
-               registerTask(id, JOB_ATTEMPT_ID, requiredJarFiles, requiredClasspaths);
-       }
-
-       @Override
-       public void registerTask(
-               JobID jobId,
-               ExecutionAttemptID task,
-               @Nullable Collection<PermanentBlobKey> requiredJarFiles,
-               @Nullable Collection<URL> requiredClasspaths) throws IOException {
-
-               checkNotNull(jobId, "The JobId must not be null.");
-               checkNotNull(task, "The task execution id must not be null.");
-
-               if (requiredJarFiles == null) {
-                       requiredJarFiles = Collections.emptySet();
-               }
-               if (requiredClasspaths == null) {
-                       requiredClasspaths = Collections.emptySet();
-               }
-
+       public ClassLoaderLease registerClassLoaderLease(JobID jobId) {
                synchronized (lockObject) {
-                       LibraryCacheEntry entry = cacheEntries.get(jobId);
+                       LibraryCacheEntry libraryCacheEntry = cacheEntries.get(jobId);

Review comment:
       use `computeIfAbsent`?
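A sketch of what the `computeIfAbsent` variant could look like, assuming a simplified, hypothetical `LeaseRegistry` with `String` keys and an `Entry` that only holds a reference count (the method in the PR would wrap the entry in a `ClassLoaderLease` instead of returning it). The `synchronized` block stays: `HashMap` is not thread-safe, and the reference-count update has to happen atomically with the lookup.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the cacheEntries map of BlobLibraryCacheManager (String keys instead of JobID). */
final class LeaseRegistry {

    private final Object lockObject = new Object();
    private final Map<String, Entry> cacheEntries = new HashMap<>();

    static final class Entry {
        final String jobId;
        int referenceCount;

        Entry(String jobId) {
            this.jobId = jobId;
        }
    }

    /** Get-or-create in a single call instead of get(), a null check and put(). */
    Entry registerClassLoaderLease(String jobId) {
        synchronized (lockObject) {
            // The mapping function only constructs the entry; it must not modify cacheEntries itself.
            Entry entry = cacheEntries.computeIfAbsent(jobId, Entry::new);
            entry.referenceCount++;
            return entry;
        }
    }

    int numEntries() {
        synchronized (lockObject) {
            return cacheEntries.size();
        }
    }
}
```

`HashMap.computeIfAbsent` may throw `ConcurrentModificationException` if the mapping function modifies the map, which is why the function here does nothing but call the constructor.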

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
##########
@@ -103,8 +107,8 @@ public void testRecoveryRegisterAndDownload() throws Exception {
                        cache = new PermanentBlobCache(config, blobStoreService, serverAddress[0]);
 
                        // Register uploaded libraries
-                       ExecutionAttemptID executionId = new ExecutionAttemptID();
-                       libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList());
+                       final LibraryCacheManager.ClassLoaderLease classLoaderLease = libServer[0].registerClassLoaderLease(jobId);
+                       classLoaderLease.getOrResolveClassLoader(keys, Collections.emptyList());

Review comment:
       Are we doing this to download stuff into the `cache`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

