Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125827511 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java --- @@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) { clientSocket.close(); } finally { - if (fos != null) { - try { - fos.close(); - } catch (Throwable t) { - LOG.warn("Cannot close stream to BLOB staging file", t); - } - } if (incomingFile != null) { - if (!incomingFile.delete()) { + if (!incomingFile.delete() && incomingFile.exists()) { LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath()); } } } } /** - * Handles an incoming DELETE request from a BLOB client. - * - * @param inputStream The input stream to read the request from. - * @param outputStream The output stream to write the response to. - * @throws java.io.IOException Thrown if an I/O error occurs while reading the request data from the input stream. + * Reads a full file from <tt>inputStream</tt> into <tt>incomingFile</tt> returning its checksum. 
+ * + * @param inputStream + * stream to read from + * @param incomingFile + * file to write to + * @param buf + * An auxiliary buffer for data serialization/deserialization + * + * @return the received file's content hash as a BLOB key + * + * @throws IOException + * thrown if an I/O error occurs while reading/writing data from/to the respective streams */ - private void delete(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException { + private static BlobKey readFileFully( + final InputStream inputStream, final File incomingFile, final byte[] buf) + throws IOException { + MessageDigest md = BlobUtils.createMessageDigest(); + FileOutputStream fos = new FileOutputStream(incomingFile); try { - int type = inputStream.read(); - if (type < 0) { - throw new EOFException("Premature end of DELETE request"); - } - - if (type == CONTENT_ADDRESSABLE) { - BlobKey key = BlobKey.readFromInputStream(inputStream); - File blobFile = blobServer.getStorageLocation(key); - - writeLock.lock(); - - try { - // we should make the local and remote file deletion atomic, otherwise we might risk not - // removing the remote file in case of a concurrent put operation - if (blobFile.exists() && !blobFile.delete()) { - throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath()); - } - - blobStore.delete(key); - } finally { - writeLock.unlock(); + while (true) { + final int bytesExpected = readLength(inputStream); + if (bytesExpected == -1) { + // done + break; + } + if (bytesExpected > BUFFER_SIZE) { + throw new IOException( + "Unexpected number of incoming bytes: " + bytesExpected); } - } - else if (type == NAME_ADDRESSABLE) { - byte[] jidBytes = new byte[JobID.SIZE]; - readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID"); - JobID jobID = JobID.fromByteArray(jidBytes); - String key = readKey(buf, inputStream); + readFully(inputStream, buf, 0, bytesExpected, "buffer"); + fos.write(buf, 0, bytesExpected); - File blobFile = 
this.blobServer.getStorageLocation(jobID, key); + md.update(buf, 0, bytesExpected); + } + return new BlobKey(md.digest()); + } finally { --- End diff -- You're right. Although I did not see any flush in `FileOutputStream` itself, the underlying file system may only guarantee that a write is complete after the file has been flushed (or closed), so I'll adapt this code just to be safe.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---