[ https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134688#comment-16134688 ]
ASF GitHub Bot commented on FLINK-7057: --------------------------------------- Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r134142624 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -108,11 +139,63 @@ public BlobCache( this.numFetchRetries = 0; } + // Initializing the clean up task + this.cleanupTimer = new Timer(true); + + cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000; + this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval); + // Add shutdown hook to delete storage directory shutdownHook = BlobUtils.addShutdownHook(this, LOG); } /** + * Registers use of job-related BLOBs. + * <p> + * Using any other method to access BLOBs, e.g. {@link #getFile}, is only valid within calls + * to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}. + * + * @param jobId + * ID of the job this blob belongs to + * + * @see #releaseJob(JobID) + */ + public void registerJob(JobID jobId) { + synchronized (jobRefCounters) { + RefCount ref = jobRefCounters.get(jobId); + if (ref == null) { + ref = new RefCount(); + jobRefCounters.put(jobId, ref); + } + ++ref.references; + } + } + + /** + * Unregisters use of job-related BLOBs and allow them to be released. + * + * @param jobId + * ID of the job this blob belongs to + * + * @see #registerJob(JobID) + */ + public void releaseJob(JobID jobId) { + synchronized (jobRefCounters) { + RefCount ref = jobRefCounters.get(jobId); + + if (ref == null) { + LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls"); --- End diff -- Including jobId would help troubleshooting. > move BLOB ref-counting from LibraryCacheManager to BlobCache > ------------------------------------------------------------ > > Key: FLINK-7057 > URL: https://issues.apache.org/jira/browse/FLINK-7057 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network > Affects Versions: 1.4.0 > Reporter: Nico Kruber > Assignee: Nico Kruber > Fix For: 1.4.0 > > > Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR > files managed by it. Instead, we want the {{BlobCache}} to do that itself for > all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} > level but rather per job. Therefore, the cleanup process should be adapted, > too. -- This message was sent by Atlassian JIRA (v6.4.14#64029)