[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188026#comment-16188026 ]
ASF GitHub Bot commented on FLINK-7068: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r142129505 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -120,6 +125,76 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t } } + /** + * Downloads the given BLOB from the given server and stores its contents to a (local) file. + * + * <p>Transient BLOB files are deleted after a successful copy of the server's data into the + * given <tt>localJarFile</tt>. + * + * @param jobId + * job ID the BLOB belongs to or <tt>null</tt> if job-unrelated + * @param blobKey + * BLOB key + * @param localJarFile + * the local file to write to + * @param serverAddress + * address of the server to download from + * @param blobClientConfig + * client configuration for the connection + * @param numFetchRetries + * number of retries before failing + * + * @throws IOException + * if an I/O error occurs during the download + */ + static void downloadFromBlobServer( + @Nullable JobID jobId, BlobKey blobKey, File localJarFile, + InetSocketAddress serverAddress, Configuration blobClientConfig, int numFetchRetries) + throws IOException { + + final byte[] buf = new byte[BUFFER_SIZE]; + LOG.info("Downloading {}/{} from {}", jobId, blobKey, serverAddress); + + // loop over retries + int attempt = 0; + while (true) { + try ( + final BlobClient bc = new BlobClient(serverAddress, blobClientConfig); + final InputStream is = bc.getInternal(jobId, blobKey); + final OutputStream os = new FileOutputStream(localJarFile) + ) { + while (true) { + final int read = is.read(buf); + if (read < 0) { + break; + } + os.write(buf, 0, read); + } + + return; + } + catch (Throwable t) { + String message = "Failed to fetch BLOB " + jobId + "/" + blobKey + " from " + serverAddress + + " and store it under " + localJarFile.getAbsolutePath(); + if (attempt < numFetchRetries) { + if (LOG.isDebugEnabled()) { + LOG.debug(message + " Retrying...", t); --- End diff -- Shouldn't this also be an error if the other log statement is an error as well? > change BlobService sub-classes for permanent and transient BLOBs > ---------------------------------------------------------------- > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network > Affects Versions: 1.4.0 > Reporter: Nico Kruber > Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)