Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125217930
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -107,146 +133,268 @@ public BlobCache(
                        this.numFetchRetries = 0;
                }
     
    +           // Initializing the clean up task
    +           this.cleanupTimer = new Timer(true);
    +
    +           cleanupInterval = blobClientConfig.getLong(
    +                   ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
    +                   
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
    +           this.cleanupTimer.schedule(this, cleanupInterval, 
cleanupInterval);
    +
                // Add shutdown hook to delete storage directory
                shutdownHook = BlobUtils.addShutdownHook(this, LOG);
        }
     
    +   @Override
    +   public void registerJob(JobID jobId) {
    +           synchronized (lockObject) {
    +                   RefCount ref = jobRefCounters.get(jobId);
    +                   if (ref == null) {
    +                           ref = new RefCount();
    +                           jobRefCounters.put(jobId, ref);
    +                   }
    +                   ++ref.references;
    +           }
    +   }
    +
    +   @Override
    +   public void releaseJob(JobID jobId) {
    +           synchronized (lockObject) {
    +                   RefCount ref = jobRefCounters.get(jobId);
    +
    +                   if (ref == null) {
    +                           LOG.warn("improper use of releaseJob() without 
a matching number of registerJob() calls");
    +                           return;
    +                   }
    +
    +                   --ref.references;
    +                   if (ref.references == 0) {
    +                           ref.keepUntil = System.currentTimeMillis() + 
cleanupInterval;
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Returns local copy of the (job-unrelated) file for the BLOB with the 
given key.
    +    * <p>
    +    * The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
    +    * the cache, the method will try to download it from this cache's BLOB 
server.
    +    *
    +    * @param key
    +    *              The key of the desired BLOB.
    +    *
    +    * @return file referring to the local storage location of the BLOB.
    +    *
    +    * @throws IOException
    +    *              Thrown if an I/O error occurs while downloading the 
BLOBs from the BLOB server.
    +    */
    +   @Override
    +   public File getFile(BlobKey key) throws IOException {
    +           return getFileInternal(null, key);
    +   }
    +
        /**
    -    * Returns the URL for the BLOB with the given key. The method will 
first attempt to serve
    -    * the BLOB from its local cache. If the BLOB is not in the cache, the 
method will try to download it
    -    * from this cache's BLOB server.
    +    * Returns local copy of the file for the BLOB with the given key.
    +    * <p>
    +    * The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
    +    * the cache, the method will try to download it from this cache's BLOB 
server.
         *
    -    * @param requiredBlob The key of the desired BLOB.
    -    * @return URL referring to the local storage location of the BLOB.
    -    * @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
    +    * @param jobId
    +    *              ID of the job this blob belongs to
    +    * @param key
    +    *              The key of the desired BLOB.
    +    *
    +    * @return file referring to the local storage location of the BLOB.
    +    *
    +    * @throws IOException
    +    *              Thrown if an I/O error occurs while downloading the 
BLOBs from the BLOB server.
         */
    -   public URL getURL(final BlobKey requiredBlob) throws IOException {
    +   @Override
    +   public File getFile(@Nonnull JobID jobId, BlobKey key) throws 
IOException {
    --- End diff --
    
    I think so far the convention is that fields without an annotation are 
considered `@Nonnull` and only fields which are annotated with `@Nullable` can 
be `null`. Otherwise `key` should also be marked as `@Nonnull`.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to