[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173142#comment-16173142 ]
ASF GitHub Bot commented on FLINK-7068:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r139962735

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws IOException {
 	 *
 	 * @param jobId
 	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
-	 * @param requiredBlob
+	 * @param blobKey
 	 * 		blob key associated with the requested file
+	 * @param highlyAvailable
+	 * 		whether to the requested file is highly available (HA)
 	 *
 	 * @return file referring to the local storage location of the BLOB
 	 *
 	 * @throws IOException
 	 * 		Thrown if the file retrieval failed.
 	 */
-	private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException {
-		checkArgument(requiredBlob != null, "BLOB key cannot be null.");
+	private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException {
+		checkArgument(blobKey != null, "BLOB key cannot be null.");
 
-		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob);
+		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
+		readWriteLock.readLock().lock();
 
-		if (localFile.exists()) {
+		try {
+			getFileInternal(jobId, blobKey, highlyAvailable, localFile);
 			return localFile;
+		} finally {
+			readWriteLock.readLock().unlock();
 		}
-		else {
+	}
+
+	/**
+	 * Helper to retrieve the local path of a file associated with a job and a blob key.
+	 * <p>
+	 * The blob server looks the blob key up in its local storage. If the file exists, it is
+	 * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
+	 * or a {@link FileNotFoundException} is thrown.
+	 * <p>
+	 * <strong>Assumes the read lock has already been acquired.</strong>
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param blobKey
+	 * 		blob key associated with the requested file
+	 * @param highlyAvailable
+	 * 		whether to the requested file is highly available (HA)
+	 * @param localFile
+	 * 		(local) file where the blob is/should be stored
+	 *
+	 * @throws IOException
+	 * 		Thrown if the file retrieval failed.
+	 */
+	void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable, File localFile) throws IOException {
+		// assume readWriteLock.readLock() was already locked (cannot really check that)
+
+		if (localFile.exists()) {
+			return;
+		} else if (highlyAvailable) {
+			// Try the HA blob store
+			// first we have to release the read lock in order to acquire the write lock
+			readWriteLock.readLock().unlock();
+
+			// use a temporary file (thread-safe without locking)
+			File incomingFile = null;
 			try {
-				// Try the blob store
-				blobStore.get(jobId, requiredBlob, localFile);
+				incomingFile = createTemporaryFilename();
+				blobStore.get(jobId, blobKey, incomingFile);
+
+				BlobUtils.moveTempFileToStore(
+					incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
--- End diff --

Agreed, it's not too nice. The cost of changing that, though, would be another try/finally block duplicated at every use of this method, so all usages would have this instead:

```java
writeLock.lock();
try {
	BlobUtils.moveTempFileToStore(incomingFile, jobId, blobKey, storageFile, LOG, blobStore);
} finally {
	writeLock.unlock();
}
```

`BlobUtils`, on the other hand, is already quite tightly coupled with the `BlobServer`/`Cache` classes anyway, with mostly package-private methods used only by those... I was not quite sure which would be better and went this way.
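As a rough illustration of the trade-off discussed in this comment, the following Java sketch models the two ideas in play: the read lock has to be released before the write lock is taken (a `ReentrantReadWriteLock` cannot upgrade a read lock to a write lock), and a `moveTempFileToStore`-style helper that receives the `Lock` can own the lock/try/finally instead of every caller repeating it. This is a simplified, hypothetical model, not Flink's code: `BlobLockingSketch`, the `HaStore` interface, string keys and the `Path`-based helper signature are assumptions for illustration, and the read lock is released in a `finally` block rather than inside a helper that assumes it is already held.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Hypothetical, simplified model of the locking pattern discussed above.
 * Not Flink's BlobServer: keys are plain strings and HaStore is a stand-in.
 */
class BlobLockingSketch {

    /** Hypothetical stand-in for the HA blob store. */
    interface HaStore {
        byte[] get(String key) throws IOException;
    }

    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Path storageDir;
    private final HaStore haStore;

    BlobLockingSketch(Path storageDir, HaStore haStore) {
        this.storageDir = storageDir;
        this.haStore = haStore;
    }

    /** Returns the local file for {@code key}, fetching it from the HA store if missing. */
    Path getFile(String key) throws IOException {
        Path localFile = storageDir.resolve(key);

        // Fast path: many readers may check for the file concurrently.
        readWriteLock.readLock().lock();
        try {
            if (Files.exists(localFile)) {
                return localFile;
            }
        } finally {
            // Must be released before the write lock is requested:
            // ReentrantReadWriteLock cannot upgrade a read lock to a write lock.
            readWriteLock.readLock().unlock();
        }

        // Slow path: download into a unique temporary file without holding any lock.
        Path incomingFile = Files.createTempFile(storageDir, "incoming-", ".tmp");
        try {
            Files.write(incomingFile, haStore.get(key));
            moveTempFileToStore(incomingFile, localFile, readWriteLock.writeLock());
        } catch (IOException | RuntimeException e) {
            // Remove the partial download before propagating the failure.
            Files.deleteIfExists(incomingFile);
            throw e;
        }
        return localFile;
    }

    /**
     * Moves a fully written temporary file to its final location while holding the
     * given lock, so callers do not each repeat the lock/try/finally boilerplate.
     */
    static void moveTempFileToStore(Path incomingFile, Path storageFile, Lock writeLock)
            throws IOException {
        writeLock.lock();
        try {
            if (Files.exists(storageFile)) {
                // Another thread stored the same blob first; discard our copy.
                Files.deleteIfExists(incomingFile);
            } else {
                // Same directory, so this is a rename; ATOMIC_MOVE makes it all-or-nothing.
                Files.move(incomingFile, storageFile, StandardCopyOption.ATOMIC_MOVE);
            }
        } finally {
            writeLock.unlock();
        }
    }
}
```

Passing the lock in keeps the locking in one place at the price of coupling the utility to its callers' locking scheme, which is the same trade-off raised for `BlobUtils` above. The `Files.exists` re-check under the write lock covers two threads downloading the same blob at once: the first move wins and later copies are discarded.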
> change BlobService sub-classes for permanent and transient BLOBs
> ----------------------------------------------------------------
>
>                 Key: FLINK-7068
>                 URL: https://issues.apache.org/jira/browse/FLINK-7068
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are
> permanently stored for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.,
> which does not even have to be backed by files.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)