[ https://issues.apache.org/jira/browse/FLINK-12195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
tangshangwen updated FLINK-12195:
---------------------------------
    Description: 
We use Tencent COS (an object storage service similar to S3) as the defaultFS. When we submitted a job, YARN's resource check failed on a modification-time mismatch, which prevented the job from being submitted:

{code:java}
2019-04-15 14:45:47,683 DEBUG org.apache.hadoop.security.UserGroupInformation: PrivilegedActionException as:hadoop (auth:SIMPLE) cause:java.io.IOException: Resource cosn://xxx-xxx/user/hadoop/.flink/application_1555078596113_0014/logback.xml changed on src filesystem (expected 1555259286000, was 1555310742000
{code}

I found that Flink registers the resource with the lastModified time of the local file. Why does it not use the modification time reported by the remote file system?

{code:java}
LOG.debug("Copying from {} to {}", localSrcPath, dst);
fs.copyFromLocalFile(false, true, localSrcPath, dst);

// Note: If we used registerLocalResource(FileSystem, Path) here, we would access the remote
//       file once again which has problems with eventually consistent read-after-write file
//       systems. Instead, we decide to preserve the modification time at the remote
//       location because this and the size of the resource will be checked by YARN based on
//       the values we provide to #registerLocalResource() below.
fs.setTimes(dst, localFile.lastModified(), -1);

// now create the resource instance
LocalResource resource = registerLocalResource(dst, localFile.length(), localFile.lastModified());
return Tuple2.of(dst, resource);
{code}

Maybe it should be:

{code:java}
// now create the resource instance
LocalResource resource = registerLocalResource(dst, localFile.length(), fs.getFileStatus(dst).getModificationTime());
{code}

> Incorrect resource time setting causes flink to fail to submit
> --------------------------------------------------------------
>
>                 Key: FLINK-12195
>                 URL: https://issues.apache.org/jira/browse/FLINK-12195
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN
>    Affects Versions: 1.6.3
>            Reporter: tangshangwen
>            Priority: Major
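
To illustrate the mismatch described in this issue, here is a minimal, self-contained sketch. It is a toy model, not Flink or Hadoop code: `ToyFileSystem`, `passesCheck`, and the two `submitWith...` helpers are hypothetical names introduced only for illustration. It assumes that the object store stamps uploaded files with its own clock and silently ignores `setTimes`, which would explain the log above but is not confirmed for COS in this report. The two timestamps are the "expected" and "was" values from the log.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the timestamp check; none of these types are Hadoop, YARN, or Flink classes. */
final class ResourceTimestampSketch {

    /** Hypothetical stand-in for a remote file system. */
    static final class ToyFileSystem {
        private final boolean honorsSetTimes;
        private final Map<String, Long> modificationTimes = new HashMap<>();

        ToyFileSystem(boolean honorsSetTimes) {
            this.honorsSetTimes = honorsSetTimes;
        }

        /** Stores the file and stamps it with the store's own clock. */
        void upload(String path, long storeClockMillis) {
            modificationTimes.put(path, storeClockMillis);
        }

        /** Overwrites the modification time only if this store supports doing so. */
        void setTimes(String path, long mtimeMillis) {
            if (honorsSetTimes) {
                modificationTimes.put(path, mtimeMillis);
            }
        }

        long getModificationTime(String path) {
            return modificationTimes.get(path);
        }
    }

    /** Simplified form of the check: the registered timestamp must equal the remote one. */
    static boolean passesCheck(ToyFileSystem fs, String path, long registeredTimestamp) {
        return fs.getModificationTime(path) == registeredTimestamp;
    }

    /** Behavior of the quoted code: register the local file's lastModified. */
    static boolean submitWithLocalTimestamp(ToyFileSystem fs, long localMtime, long storeClock) {
        String dst = "logback.xml";
        fs.upload(dst, storeClock);
        fs.setTimes(dst, localMtime);
        return passesCheck(fs, dst, localMtime);
    }

    /** Proposed behavior: register whatever the remote file system reports. */
    static boolean submitWithRemoteTimestamp(ToyFileSystem fs, long localMtime, long storeClock) {
        String dst = "logback.xml";
        fs.upload(dst, storeClock);
        fs.setTimes(dst, localMtime);
        return passesCheck(fs, dst, fs.getModificationTime(dst));
    }

    public static void main(String[] args) {
        long localMtime = 1555259286000L; // "expected" value from the log
        long storeClock = 1555310742000L; // "was" value from the log

        // A store that honors setTimes passes with the local timestamp.
        System.out.println(submitWithLocalTimestamp(new ToyFileSystem(true), localMtime, storeClock));   // true
        // A store that ignores setTimes fails, as in the reported log.
        System.out.println(submitWithLocalTimestamp(new ToyFileSystem(false), localMtime, storeClock));  // false
        // Registering the remote timestamp passes on the same store.
        System.out.println(submitWithRemoteTimestamp(new ToyFileSystem(false), localMtime, storeClock)); // true
    }
}
```

Note that the proposed variant reads the remote file status once more after the upload. The comment in the quoted Flink code names exactly that extra access as a concern on eventually consistent file systems, so the trade-off would need to be weighed before adopting the change.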