Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176661729 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java --- @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log // ------------------------------------------------------------------------ /** - * Asynchronous file copy process. - */ - private static class CopyProcess implements Callable<Path> { - - private final Path filePath; - private final Path cachedPath; - private boolean executable; - - public CopyProcess(DistributedCacheEntry e, Path cachedPath) { - this.filePath = new Path(e.filePath); - this.executable = e.isExecutable; - this.cachedPath = cachedPath; - } - - @Override - public Path call() throws IOException { - // let exceptions propagate. we can retrieve them later from - // the future and report them upon access to the result - copy(filePath, cachedPath, this.executable); - return cachedPath; - } - } - - /** - * If no task is using this file after 5 seconds, clear it. + * Asynchronous file copy process from blob server. 
*/ - private static class DeleteProcess implements Runnable { + private static class CopyFromBlobProcess implements Callable<Path> { - private final Object lock; - private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries; - - private final String name; + private final PermanentBlobKey blobKey; + private final Path target; + private final boolean directory; + private final boolean executable; private final JobID jobID; + private final PermanentBlobService blobService; - public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries, - String name, JobID jobID) { - this.lock = lock; - this.entries = entries; - this.name = name; - this.jobID = jobID; + CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) { + try { + this.executable = e.isExecutable; + this.directory = e.isZipped; + this.jobID = jobID; + this.blobService = blobService; + this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader()); + this.target = target; + } catch (Exception ex) { + throw new RuntimeException(ex); + } } @Override - public void run() { - try { - synchronized (lock) { - Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID); - - if (jobEntries != null) { - Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name); - - if (entry != null) { - int count = entry.f0; - if (count > 1) { - // multiple references still - entry.f0 = count - 1; - } - else { - // we remove the last reference - jobEntries.remove(name); - if (jobEntries.isEmpty()) { - entries.remove(jobID); - } - - // abort the copy - entry.f3.cancel(true); - - // remove the file - File file = new File(entry.f2.toString()); - if (file.exists()) { - if (file.isDirectory()) { - FileUtils.deleteDirectory(file); - } - else if (!file.delete()) { - LOG.error("Could not delete locally cached file " + 
file.getAbsolutePath()); - } - } - - // remove the job wide temp directory, if it is now empty - File parent = entry.f1; - if (parent.isDirectory()) { - String[] children = parent.list(); - if (children == null || children.length == 0) { - //noinspection ResultOfMethodCallIgnored - parent.delete(); - } - } + public Path call() throws IOException { + final File file = blobService.getFile(jobID, blobKey); + + if (directory) { + try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { --- End diff -- The entry should be closed by calling `zis.closeEntry()` once it has been processed.
---