[ https://issues.apache.org/jira/browse/FLINK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256814#comment-17256814 ]
JieFang.He edited comment on FLINK-19067 at 12/31/20, 3:36 AM:
---------------------------------------------------------------

I think the reason is that the job files are uploaded to the dispatcher node, but the task fetches the job files from the resource manager node.

When a job is submitted to the REST endpoint, the REST endpoint submits it to the dispatcher, so the job files are put on the dispatcher node.

Here is the log:
{code:java}
2020-12-31 00:42:29,255 DEBUG [BLOB connection for /127.0.0.1:43576] [FileSystemBlobStore]: Copying from /tmp/blobStore-5fe6b952-fece-4590-b637-04f19d8121dd/job_22ea0e6f01a4d7667aa1077e9bffe759/blob_p-b8b09e59290d54dbc0bfd5c7672d424e7f2c5178-3c4b3b70d11b7ca1164a615928023109 to /data2/zdh/flink/storageDir/default/blob/job_22ea0e6f01a4d7667aa1077e9bffe759/blob_p-b8b09e59290d54dbc0bfd5c7672d424e7f2c5178-3c4b3b70d11b7ca1164a615928023109.
{code}
And here is the code in JobSubmitHandler:
{code:java}
CompletableFuture<Acknowledge> jobSubmissionFuture =
    finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
{code}
The gateway is defined in DefaultDispatcherResourceManagerComponentFactory:
{code:java}
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
    new RpcGatewayRetriever<>(
        rpcService,
        DispatcherGateway.class,
        DispatcherId::fromUuid,
        10,
        Time.milliseconds(50L));
{code}
{code:java}
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
    configuration,
    dispatcherGatewayRetriever,
    resourceManagerGatewayRetriever,
    blobServer,
    executor,
    metricFetcher,
    highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
    fatalErrorHandler);
{code}
But the task gets the job files from the resource manager node.

This is the code of BlobClient.downloadFromBlobServer:
{code:java}
static void downloadFromBlobServer(
        @Nullable JobID jobId,
        BlobKey blobKey,
        File localJarFile,
        InetSocketAddress serverAddress,
        Configuration blobClientConfig,
        int numFetchRetries)
        throws IOException {
    ......
    catch (Throwable t) {
        String message = "Failed to fetch BLOB " + jobId + "/" + blobKey + " from " + serverAddress +
            " and store it under " + localJarFile.getAbsolutePath();
{code}
The serverAddress is defined in TaskExecutor:
{code:java}
final InetSocketAddress blobServerAddress = new InetSocketAddress(
    clusterInformation.getBlobServerHostname(),
    clusterInformation.getBlobServerPort());

blobCacheService.setBlobServerAddress(blobServerAddress);
{code}
The clusterInformation is provided in ResourceManagerRegistrationListener:
{code:java}
if (resourceManagerConnection == connection) {
    try {
        establishResourceManagerConnection(
            resourceManagerGateway,
            resourceManagerId,
            taskExecutorRegistrationId,
            clusterInformation);
{code}
And this is where ResourceManagerRegistrationListener is used:
{code:java}
resourceManagerConnection =
    new TaskExecutorToResourceManagerConnection(
        log,
        getRpcService(),
        taskManagerConfiguration.getRetryingRegistrationConfiguration(),
        resourceManagerAddress.getAddress(),
        resourceManagerAddress.getResourceManagerId(),
        getMainThreadExecutor(),
        new ResourceManagerRegistrationListener(),
        taskExecutorRegistration);
{code}


> FileNotFoundException when run flink examples
> ---------------------------------------------
>
>                 Key: FLINK-19067
>                 URL: https://issues.apache.org/jira/browse/FLINK-19067
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.11.1
>            Reporter: JieFang.He
>            Priority: Major
>         Attachments: flink-jobmanager-deployer-hejiefang01.log, flink-jobmanager-deployer-hejiefang02.log, flink-taskmanager-deployer-hejiefang01.log, flink-taskmanager-deployer-hejiefang02.log
>
>
> 1. When running examples/batch/WordCount.jar, it fails with the exception:
> Caused by: java.io.FileNotFoundException: /data2/flink/storageDir/default/blob/job_d29414828f614d5466e239be4d3889ac/blob_p-a2ebe1c5aa160595f214b4bd0f39d80e42ee2e93-f458f1c12dc023e78d25f191de1d7c4b (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>     at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>     at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:143)
>     at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:105)
>     at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:87)
>     at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:501)
>     at org.apache.flink.runtime.blob.BlobServerConnection.get(BlobServerConnection.java:231)
>     at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:117)
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
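
The hypothesis in the comment (the job files are written on one node while the lookup happens on another) can be illustrated with a small stand-alone sketch. It is not Flink code: the class BlobLookupSketch and the methods putBlob and canServeBlob are hypothetical, and it assumes that the storage directory is a node-local path that the two nodes do not share, which the stack trace through LocalFileSystem suggests but does not prove.
{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration only; none of these names are Flink APIs.
public class BlobLookupSketch {

    /** Stores a blob under one node's storage directory, as the upload side would. */
    static Path putBlob(Path storageDir, String jobId, String blobKey, byte[] data)
            throws IOException {
        Path target = storageDir.resolve("job_" + jobId).resolve("blob_" + blobKey);
        Files.createDirectories(target.getParent());
        return Files.write(target, data);
    }

    /** Returns true if the blob is visible from the given node's storage directory. */
    static boolean canServeBlob(Path storageDir, String jobId, String blobKey) {
        return Files.isRegularFile(
                storageDir.resolve("job_" + jobId).resolve("blob_" + blobKey));
    }

    public static void main(String[] args) throws IOException {
        // Assumption: each node has its own, non-shared storage directory.
        Path dispatcherNodeDir = Files.createTempDirectory("dispatcher-storageDir");
        Path resourceManagerNodeDir = Files.createTempDirectory("rm-storageDir");

        // The job files are uploaded on the dispatcher node only.
        putBlob(dispatcherNodeDir, "job1", "p-abc", "jar".getBytes(StandardCharsets.UTF_8));

        System.out.println("dispatcher node can serve blob: "
                + canServeBlob(dispatcherNodeDir, "job1", "p-abc"));
        System.out.println("resource manager node can serve blob: "
                + canServeBlob(resourceManagerNodeDir, "job1", "p-abc"));
    }
}
{code}
With a directory that both nodes can see (for example a shared or distributed file system path), both lookups in this sketch would succeed, which is the behaviour the task side appears to expect.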