[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188033#comment-16188033 ]
ASF GitHub Bot commented on FLINK-7068: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r142137151 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java --- @@ -64,39 +65,60 @@ public FileSystemBlobStore(FileSystem fileSystem, String storagePath) throws IOE // - Put ------------------------------------------------------------------ @Override - public void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException { - put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey)); + public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException { + return put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey)); } - private void put(File fromFile, String toBlobPath) throws IOException { + private boolean put(File fromFile, String toBlobPath) throws IOException { try (OutputStream os = fileSystem.create(new Path(toBlobPath), FileSystem.WriteMode.OVERWRITE)) { LOG.debug("Copying from {} to {}.", fromFile, toBlobPath); Files.copy(fromFile, os); } + return true; } // - Get ------------------------------------------------------------------ @Override - public void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException { - get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile); + public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException { + return get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile, blobKey); } - private void get(String fromBlobPath, File toFile) throws IOException { + private boolean get(String fromBlobPath, File toFile, BlobKey blobKey) throws IOException { checkNotNull(fromBlobPath, "Blob path"); checkNotNull(toFile, "File"); + checkNotNull(blobKey, "Blob key"); if (!toFile.exists() && !toFile.createNewFile()) { throw new IOException("Failed to create target file to 
copy to"); } final Path fromPath = new Path(fromBlobPath); + MessageDigest md = BlobUtils.createMessageDigest(); + + final int buffSize = 4096; // like IOUtils#BLOCKSIZE, for chunked file copying boolean success = false; try (InputStream is = fileSystem.open(fromPath); FileOutputStream fos = new FileOutputStream(toFile)) { LOG.debug("Copying from {} to {}.", fromBlobPath, toFile); - IOUtils.copyBytes(is, fos); // closes the streams + + // not using IOUtils.copyBytes(is, fos) here to be able to create a hash on-the-fly + final byte[] buf = new byte[buffSize]; + int bytesRead = is.read(buf); + while (bytesRead >= 0) { + fos.write(buf, 0, bytesRead); + md.update(buf, 0, bytesRead); + + bytesRead = is.read(buf); + } + + // verify that file contents are correct + final byte[] computedKey = md.digest(); + if (!Arrays.equals(computedKey, blobKey.getHash())) { + throw new IOException("Detected data corruption during transfer"); + } --- End diff -- Nice addition :-) > change BlobService sub-classes for permanent and transient BLOBs > ---------------------------------------------------------------- > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network > Affects Versions: 1.4.0 > Reporter: Nico Kruber > Assignee: Nico Kruber > > A {{PermanentBlobStore}} should represent use cases for BLOBs that are > permanently stored for a job's lifetime (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which does not even have to be backed by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)