[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188033#comment-16188033
 ] 

ASF GitHub Bot commented on FLINK-7068:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142137151
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
 ---
    @@ -64,39 +65,60 @@ public FileSystemBlobStore(FileSystem fileSystem, 
String storagePath) throws IOE
        // - Put 
------------------------------------------------------------------
     
        @Override
    -   public void put(File localFile, JobID jobId, BlobKey blobKey) throws 
IOException {
    -           put(localFile, BlobUtils.getStorageLocationPath(basePath, 
jobId, blobKey));
    +   public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws 
IOException {
    +           return put(localFile, 
BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
        }
     
    -   private void put(File fromFile, String toBlobPath) throws IOException {
    +   private boolean put(File fromFile, String toBlobPath) throws 
IOException {
                try (OutputStream os = fileSystem.create(new Path(toBlobPath), 
FileSystem.WriteMode.OVERWRITE)) {
                        LOG.debug("Copying from {} to {}.", fromFile, 
toBlobPath);
                        Files.copy(fromFile, os);
                }
    +           return true;
        }
     
        // - Get 
------------------------------------------------------------------
     
        @Override
    -   public void get(JobID jobId, BlobKey blobKey, File localFile) throws 
IOException {
    -           get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), 
localFile);
    +   public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws 
IOException {
    +           return get(BlobUtils.getStorageLocationPath(basePath, jobId, 
blobKey), localFile, blobKey);
        }
     
    -   private void get(String fromBlobPath, File toFile) throws IOException {
    +   private boolean get(String fromBlobPath, File toFile, BlobKey blobKey) 
throws IOException {
                checkNotNull(fromBlobPath, "Blob path");
                checkNotNull(toFile, "File");
    +           checkNotNull(blobKey, "Blob key");
     
                if (!toFile.exists() && !toFile.createNewFile()) {
                        throw new IOException("Failed to create target file to 
copy to");
                }
     
                final Path fromPath = new Path(fromBlobPath);
    +           MessageDigest md = BlobUtils.createMessageDigest();
    +
    +           final int buffSize = 4096; // like IOUtils#BLOCKSIZE, for 
chunked file copying
     
                boolean success = false;
                try (InputStream is = fileSystem.open(fromPath);
                        FileOutputStream fos = new FileOutputStream(toFile)) {
                        LOG.debug("Copying from {} to {}.", fromBlobPath, 
toFile);
    -                   IOUtils.copyBytes(is, fos); // closes the streams
    +
    +                   // not using IOUtils.copyBytes(is, fos) here to be able 
to create a hash on-the-fly
    +                   final byte[] buf = new byte[buffSize];
    +                   int bytesRead = is.read(buf);
    +                   while (bytesRead >= 0) {
    +                           fos.write(buf, 0, bytesRead);
    +                           md.update(buf, 0, bytesRead);
    +
    +                           bytesRead = is.read(buf);
    +                   }
    +
    +                   // verify that file contents are correct
    +                   final byte[] computedKey = md.digest();
    +                   if (!Arrays.equals(computedKey, blobKey.getHash())) {
    +                           throw new IOException("Detected data corruption 
during transfer");
    +                   }
    --- End diff --
    
    Nice addition :-)


> change BlobService sub-classes for permanent and transient BLOBs
> ----------------------------------------------------------------
>
>                 Key: FLINK-7068
>                 URL: https://issues.apache.org/jira/browse/FLINK-7068
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to