[ https://issues.apache.org/jira/browse/FLINK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256814#comment-17256814 ]

JieFang.He edited comment on FLINK-19067 at 12/31/20, 3:36 AM:
---------------------------------------------------------------

I think the reason is that the jobFiles are uploaded to the Dispatcher node, but the task fetches the jobFiles from the ResourceManager node.

When a job is submitted to the REST endpoint, the REST endpoint submits it to the Dispatcher, and the jobFiles are then uploaded to the Dispatcher.
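A minimal sketch of this hypothesis, assuming each node keeps job files in its own local directory (the directory names and the `put`/`get` helpers are hypothetical, not Flink API). Reading from a node that never received the file fails with the same `FileNotFoundException` type as in the reported stack trace:

```java
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical model: each node stores job files in its own local directory. */
public class SplitBlobStoreDemo {

    /** "Upload": write a job file under the given node's local directory. */
    static Path put(Path nodeDir, String jobId, String blobName, byte[] data) throws IOException {
        Path target = nodeDir.resolve("job_" + jobId).resolve("blob_" + blobName);
        Files.createDirectories(target.getParent());
        Files.write(target, data);
        return target;
    }

    /** "Download": read a job file from the given node's local directory. */
    static byte[] get(Path nodeDir, String jobId, String blobName) throws IOException {
        Path source = nodeDir.resolve("job_" + jobId).resolve("blob_" + blobName);
        // FileInputStream reports a missing file as FileNotFoundException,
        // the same exception type as in the reported stack trace.
        try (InputStream in = new FileInputStream(source.toFile())) {
            return in.readAllBytes();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dispatcherNode = Files.createTempDirectory("dispatcher-node");
        Path resourceManagerNode = Files.createTempDirectory("rm-node");

        // The job file is written on the Dispatcher's node only.
        put(dispatcherNode, "42", "p-abc", "jar bytes".getBytes(StandardCharsets.UTF_8));

        try {
            // The task looks for it on the ResourceManager's node.
            get(resourceManagerNode, "42", "p-abc");
            System.out.println("found");
        } catch (FileNotFoundException e) {
            System.out.println("FileNotFoundException: " + e.getMessage());
        }
    }
}
```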

Here is the log:
{code:java}
2020-12-31 00:42:29,255 DEBUG [BLOB connection for /127.0.0.1:43576] [FileSystemBlobStore]: Copying from /tmp/blobStore-5fe6b952-fece-4590-b637-04f19d8121dd/job_22ea0e6f01a4d7667aa1077e9bffe759/blob_p-b8b09e59290d54dbc0bfd5c7672d424e7f2c5178-3c4b3b70d11b7ca1164a615928023109 to /data2/zdh/flink/storageDir/default/blob/job_22ea0e6f01a4d7667aa1077e9bffe759/blob_p-b8b09e59290d54dbc0bfd5c7672d424e7f2c5178-3c4b3b70d11b7ca1164a615928023109.
{code}
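The destination path in this log follows the layout `<storageDir>/<clusterId>/blob/job_<jobId>/blob_<blobKey>`, and the path in the reported FileNotFoundException has the same shape. A small sketch that rebuilds the path from the values in the log; the helper `haBlobPath` is hypothetical and the layout is read off the log line, not taken from Flink's source. If storageDir is a local directory rather than a shared file system, such a path only exists on the disk of the node that performed the copy.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class HaBlobPathDemo {

    /**
     * Hypothetical helper: builds a job BLOB path following the layout visible in the log,
     * <storageDir>/<clusterId>/blob/job_<jobId>/blob_<blobKey>.
     */
    static Path haBlobPath(String storageDir, String clusterId, String jobId, String blobKey) {
        return Paths.get(storageDir, clusterId, "blob", "job_" + jobId, "blob_" + blobKey);
    }

    public static void main(String[] args) {
        Path path = haBlobPath(
                "/data2/zdh/flink/storageDir",
                "default",
                "22ea0e6f01a4d7667aa1077e9bffe759",
                "p-b8b09e59290d54dbc0bfd5c7672d424e7f2c5178-3c4b3b70d11b7ca1164a615928023109");
        System.out.println(path);
    }
}
```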
And here is the code in JobSubmitHandler:
{code:java}
CompletableFuture<Acknowledge> jobSubmissionFuture =
   finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
{code}
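The snippet chains the submission onto the job graph future with `thenCompose`, so the job goes to whichever DispatcherGateway instance `gateway` refers to. A stripped-down sketch of that forwarding, with a hypothetical `SubmitGateway` interface standing in for DispatcherGateway and plain strings standing in for JobGraph and Acknowledge:

```java
import java.util.concurrent.CompletableFuture;

public class SubmitForwardingDemo {

    /** Hypothetical stand-in for DispatcherGateway; not Flink's interface. */
    interface SubmitGateway {
        CompletableFuture<String> submitJob(String jobGraph);
    }

    /** Same shape as the JobSubmitHandler snippet: once the job graph is ready,
     *  thenCompose hands it to the gateway that was retrieved earlier. */
    static CompletableFuture<String> submit(CompletableFuture<String> finalizedJobGraphFuture,
                                            SubmitGateway gateway) {
        return finalizedJobGraphFuture.thenCompose(gateway::submitJob);
    }

    public static void main(String[] args) {
        // Hypothetical gateway for a Dispatcher running on "node A".
        SubmitGateway dispatcherOnNodeA =
                jobGraph -> CompletableFuture.completedFuture("ack from node A for " + jobGraph);

        String ack = submit(CompletableFuture.completedFuture("wordcount"), dispatcherOnNodeA).join();
        System.out.println(ack);
    }
}
```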
The gateway is defined in DefaultDispatcherResourceManagerComponentFactory:
{code:java}
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
   rpcService,
   DispatcherGateway.class,
   DispatcherId::fromUuid,
   10,
   Time.milliseconds(50L));
{code}
{code:java}
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
   configuration,
   dispatcherGatewayRetriever,
   resourceManagerGatewayRetriever,
   blobServer,
   executor,
   metricFetcher,
   highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
   fatalErrorHandler);
{code}
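The `dispatcherGatewayRetriever` above is created with `10` and `Time.milliseconds(50L)`, which, as far as I can tell, are the retry count and retry delay used when retrieving the leader's gateway. A minimal synchronous sketch of that retry-with-delay pattern; `retryWithDelay` is a hypothetical helper, not Flink's (asynchronous) implementation:

```java
import java.util.concurrent.Callable;

public class RetryWithDelayDemo {

    /**
     * Hypothetical helper: one initial attempt plus up to {@code retries} further
     * attempts, sleeping {@code delayMillis} between attempts. Rethrows the last failure.
     */
    static <T> T retryWithDelay(Callable<T> operation, int retries, long delayMillis) throws Exception {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                last = e;
                if (attempt < retries) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice (e.g. leader not known yet), then succeeds on the third attempt.
        String gateway = retryWithDelay(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("leader not yet known");
            }
            return "dispatcher gateway";
        }, 10, 50L);
        System.out.println(gateway + " after " + calls[0] + " attempts");
    }
}
```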
 

But the Task fetches the jobFiles from the ResourceManager node.

Here is the code of BlobClient.downloadFromBlobServer:
{code:java}
static void downloadFromBlobServer(
      @Nullable JobID jobId,
      BlobKey blobKey,
      File localJarFile,
      InetSocketAddress serverAddress,
      Configuration blobClientConfig,
      int numFetchRetries) throws IOException {
    ......
      catch (Throwable t) {
         String message = "Failed to fetch BLOB " + jobId + "/" + blobKey + " from " + serverAddress +
            " and store it under " + localJarFile.getAbsolutePath();
{code}
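Every fetch attempt in `downloadFromBlobServer` targets the single `serverAddress` it is given, so `numFetchRetries` cannot help if the blob was stored on a different server. A sketch under that assumption, modeling each BLOB server as a set of blob keys (the model, host names and port are hypothetical; only the error message shape is copied from the snippet):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class FetchFromWrongServerDemo {

    /**
     * Hypothetical model: each BLOB server address maps to the blob keys it holds.
     * Every attempt queries the same serverAddress, so retries cannot find a blob
     * that only exists on another server.
     */
    static void download(Map<InetSocketAddress, Set<String>> servers,
                         String jobId,
                         String blobKey,
                         InetSocketAddress serverAddress,
                         String localPath,
                         int numFetchRetries) throws IOException {
        for (int attempt = 0; attempt <= numFetchRetries; attempt++) {
            if (servers.getOrDefault(serverAddress, Set.of()).contains(blobKey)) {
                return; // a real client would now stream the blob to localPath
            }
        }
        // Same message shape as in BlobClient.downloadFromBlobServer.
        throw new IOException("Failed to fetch BLOB " + jobId + "/" + blobKey + " from "
                + serverAddress + " and store it under " + localPath);
    }

    public static void main(String[] args) {
        InetSocketAddress dispatcherNode = InetSocketAddress.createUnresolved("node-a", 6124);
        InetSocketAddress resourceManagerNode = InetSocketAddress.createUnresolved("node-b", 6124);

        Map<InetSocketAddress, Set<String>> servers = new HashMap<>();
        servers.put(dispatcherNode, Set.of("p-abc")); // the upload went here
        servers.put(resourceManagerNode, Set.of());   // nothing was uploaded here

        try {
            download(servers, "job1", "p-abc", resourceManagerNode, "/tmp/blob_p-abc", 3);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```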
The serverAddress is defined in TaskExecutor:
{code:java}
final InetSocketAddress blobServerAddress = new InetSocketAddress(
   clusterInformation.getBlobServerHostname(),
   clusterInformation.getBlobServerPort());

blobCacheService.setBlobServerAddress(blobServerAddress);
{code}
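In other words, the TaskExecutor's BLOB server address is fully determined by the ClusterInformation it received from the ResourceManager. A minimal sketch, with a hypothetical `ClusterInfo` class standing in for Flink's ClusterInformation and an unresolved address so that no DNS lookup is needed:

```java
import java.net.InetSocketAddress;

public class BlobAddressFromClusterInfoDemo {

    /** Hypothetical stand-in for Flink's ClusterInformation (only the two fields used here). */
    static final class ClusterInfo {
        final String blobServerHostname;
        final int blobServerPort;

        ClusterInfo(String blobServerHostname, int blobServerPort) {
            this.blobServerHostname = blobServerHostname;
            this.blobServerPort = blobServerPort;
        }
    }

    /** Same idea as the TaskExecutor snippet, but unresolved so no DNS lookup happens. */
    static InetSocketAddress blobServerAddress(ClusterInfo info) {
        return InetSocketAddress.createUnresolved(info.blobServerHostname, info.blobServerPort);
    }

    public static void main(String[] args) {
        // Whatever host the ResourceManager reports is where the task will fetch BLOBs.
        ClusterInfo fromResourceManager = new ClusterInfo("node-b", 6124);
        InetSocketAddress address = blobServerAddress(fromResourceManager);
        System.out.println(address.getHostString() + ":" + address.getPort());
    }
}
```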
The clusterInformation comes from ResourceManagerRegistrationListener:
{code:java}
if (resourceManagerConnection == connection) {
   try {
      establishResourceManagerConnection(
         resourceManagerGateway,
         resourceManagerId,
         taskExecutorRegistrationId,
         clusterInformation);
{code}
And this is where ResourceManagerRegistrationListener is used:
{code:java}
resourceManagerConnection =
   new TaskExecutorToResourceManagerConnection(
      log,
      getRpcService(),
      taskManagerConfiguration.getRetryingRegistrationConfiguration(),
      resourceManagerAddress.getAddress(),
      resourceManagerAddress.getResourceManagerId(),
      getMainThreadExecutor(),
      new ResourceManagerRegistrationListener(),
      taskExecutorRegistration);
{code}
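Putting the pieces together: the REST endpoint hands the job to the Dispatcher leader, while the TaskExecutor learns its BLOB server address from the ResourceManager it registered with. If those two leaders run on different JobManager nodes, the lookup misses. A toy simulation of that argument (all classes, addresses and keys are hypothetical; it models the hypothesis in this comment, not Flink's actual classes):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class LeaderMismatchDemo {

    /** Hypothetical JobManager process with its own local BLOB storage. */
    static final class JobManagerNode {
        final String blobServerAddress;
        final Set<String> localBlobs = new HashSet<>();

        JobManagerNode(String blobServerAddress) {
            this.blobServerAddress = blobServerAddress;
        }
    }

    /** Returns true if the task can find the job file on the server it is told to use. */
    static boolean runJob(JobManagerNode dispatcherLeader,
                          JobManagerNode resourceManagerLeader,
                          Map<String, JobManagerNode> nodesByAddress,
                          String blobKey) {
        // 1. REST -> Dispatcher: the job file ends up on the Dispatcher leader's node.
        dispatcherLeader.localBlobs.add(blobKey);
        // 2. The TaskExecutor registers at the ResourceManager, which reports its own BLOB server.
        String reportedAddress = resourceManagerLeader.blobServerAddress;
        // 3. The task downloads from the reported address.
        return nodesByAddress.get(reportedAddress).localBlobs.contains(blobKey);
    }

    public static void main(String[] args) {
        JobManagerNode nodeA = new JobManagerNode("node-a:6124");
        JobManagerNode nodeB = new JobManagerNode("node-b:6124");
        Map<String, JobManagerNode> nodes = new HashMap<>();
        nodes.put(nodeA.blobServerAddress, nodeA);
        nodes.put(nodeB.blobServerAddress, nodeB);

        System.out.println("same node:       " + runJob(nodeA, nodeA, nodes, "blob-1"));
        System.out.println("different nodes: " + runJob(nodeA, nodeB, nodes, "blob-2"));
    }
}
```

With both leaders on the same node the file is found; with the leaders on different nodes it is not.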
 

 


> FileNotFoundException when run flink examples
> ---------------------------------------------
>
>                 Key: FLINK-19067
>                 URL: https://issues.apache.org/jira/browse/FLINK-19067
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.11.1
>            Reporter: JieFang.He
>            Priority: Major
>         Attachments: flink-jobmanager-deployer-hejiefang01.log, 
> flink-jobmanager-deployer-hejiefang02.log, 
> flink-taskmanager-deployer-hejiefang01.log, 
> flink-taskmanager-deployer-hejiefang02.log
>
>
> 1. When running examples/batch/WordCount.jar, it fails with the exception:
> Caused by: java.io.FileNotFoundException: /data2/flink/storageDir/default/blob/job_d29414828f614d5466e239be4d3889ac/blob_p-a2ebe1c5aa160595f214b4bd0f39d80e42ee2e93-f458f1c12dc023e78d25f191de1d7c4b (No such file or directory)
>  at java.io.FileInputStream.open0(Native Method)
>  at java.io.FileInputStream.open(FileInputStream.java:195)
>  at java.io.FileInputStream.<init>(FileInputStream.java:138)
>  at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>  at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:143)
>  at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:105)
>  at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:87)
>  at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:501)
>  at org.apache.flink.runtime.blob.BlobServerConnection.get(BlobServerConnection.java:231)
>  at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:117)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
