[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188030#comment-16188030
 ] 

ASF GitHub Bot commented on FLINK-7068:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142136764
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
    @@ -407,4 +419,72 @@ static void closeSilently(Socket socket, Logger LOG) {
        private BlobUtils() {
                throw new RuntimeException();
        }
    +
    +   /**
    +    * Moves the temporary <tt>incomingFile</tt> to its permanent location 
where it is available for
    +    * use.
    +    *
    +    * @param incomingFile
    +    *              temporary file created during transfer
    +    * @param jobId
    +    *              ID of the job this blob belongs to or <tt>null</tt> if 
job-unrelated
    +    * @param blobKey
    +    *              BLOB key identifying the file
    +    * @param storageFile
    +    *      (local) file where the blob is/should be stored
    +    * @param writeLock
    +    *      lock to acquire before doing the move
    +    * @param log
    +    *      logger for debug information
    +    * @param blobStore
    +    *      HA store (or <tt>null</tt> if unavailable)
    +    *
    +    * @throws IOException
    +    *              thrown if an I/O error occurs while moving the file or 
uploading it to the HA store
    +    */
    +   static void moveTempFileToStore(
    +                   File incomingFile, @Nullable JobID jobId, BlobKey 
blobKey, File storageFile,
    +                   Lock writeLock, Logger log, @Nullable BlobStore 
blobStore) throws IOException {
    +
    +           writeLock.lock();
    +
    +           try {
    +                   // first check whether the file already exists
    +                   if (!storageFile.exists()) {
    +                           try {
    +                                   // only move the file if it does not 
yet exist
    +                                   Files.move(incomingFile.toPath(), 
storageFile.toPath());
    +
    +                                   incomingFile = null;
    +
    +                           } catch (FileAlreadyExistsException ignored) {
    +                                   log.warn("Detected concurrent file 
modifications. This should only happen if multiple" +
    +                                           "BlobServer use the same 
storage directory.");
    +                                   // we cannot be sure at this point 
whether the file has already been uploaded to the blob
    +                                   // store or not. Even if the blobStore 
might shortly be in an inconsistent state, we have
    --- End diff --
    
    "we have to"


> change BlobService sub-classes for permanent and transient BLOBs
> ----------------------------------------------------------------
>
>                 Key: FLINK-7068
>                 URL: https://issues.apache.org/jira/browse/FLINK-7068
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to