[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188030#comment-16188030 ]
ASF GitHub Bot commented on FLINK-7068:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142136764

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
    @@ -407,4 +419,72 @@ static void closeSilently(Socket socket, Logger LOG) {
     	private BlobUtils() {
     		throw new RuntimeException();
     	}
    +
    +	/**
    +	 * Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
    +	 * use.
    +	 *
    +	 * @param incomingFile
    +	 * 		temporary file created during transfer
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
    +	 * @param blobKey
    +	 * 		BLOB key identifying the file
    +	 * @param storageFile
    +	 *      (local) file where the blob is/should be stored
    +	 * @param writeLock
    +	 *      lock to acquire before doing the move
    +	 * @param log
    +	 *      logger for debug information
    +	 * @param blobStore
    +	 *      HA store (or <tt>null</tt> if unavailable)
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if an I/O error occurs while moving the file or uploading it to the HA store
    +	 */
    +	static void moveTempFileToStore(
    +			File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile,
    +			Lock writeLock, Logger log, @Nullable BlobStore blobStore) throws IOException {
    +
    +		writeLock.lock();
    +
    +		try {
    +			// first check whether the file already exists
    +			if (!storageFile.exists()) {
    +				try {
    +					// only move the file if it does not yet exist
    +					Files.move(incomingFile.toPath(), storageFile.toPath());
    +
    +					incomingFile = null;
    +
    +				} catch (FileAlreadyExistsException ignored) {
    +					log.warn("Detected concurrent file modifications. This should only happen if multiple" +
    +						"BlobServer use the same storage directory.");
    +					// we cannot be sure at this point whether the file has already been uploaded to the blob
    +					// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
    --- End diff --

    "we have to"


> change BlobService sub-classes for permanent and transient BLOBs
> ----------------------------------------------------------------
>
>                 Key: FLINK-7068
>                 URL: https://issues.apache.org/jira/browse/FLINK-7068
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)