
Till Rohrmann commented on FLINK-12195:

We use the local modification timestamp to work around S3 implementations 
that offer only eventual consistency. See FLINK-8801 for more information. 
Because of that, we cannot simply call `fs.getFileStatus` right after the 
upload, so I would suggest closing your PR and joining the discussion in 
FLINK-8801.
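
To make the trade-off concrete, here is a minimal sketch of the idea, not 
Flink's actual code. It assumes a local directory as a stand-in for the remote 
file system and uses java.nio instead of the Hadoop FileSystem API. The class 
and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;

// Hypothetical illustration only: java.nio stands in for the remote file system.
final class TimestampSketch {

    // Copy the file, stamp the copy with the local modification time, and
    // return that time without reading the destination's metadata back.
    static long uploadPreservingLocalTime(Path localFile, Path dst) throws IOException {
        Files.copy(localFile, dst, StandardCopyOption.REPLACE_EXISTING);
        FileTime localTime = Files.getLastModifiedTime(localFile);
        Files.setLastModifiedTime(dst, localTime);
        return localTime.toMillis();
    }

    // Simplified version of the later check: the registered timestamp must
    // equal the modification time found at the destination.
    static boolean passesTimestampCheck(Path dst, long registered) throws IOException {
        return Files.getLastModifiedTime(dst).toMillis() == registered;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("sketch");
        Path local = dir.resolve("logback.xml");
        Files.write(local, "<configuration/>".getBytes(StandardCharsets.UTF_8));
        // Give the local file an old timestamp (value taken from the quoted log).
        Files.setLastModifiedTime(local, FileTime.fromMillis(1555259286000L));

        // Case 1: the destination accepts the timestamp we set.
        Path preserved = dir.resolve("preserved.xml");
        long registered = uploadPreservingLocalTime(local, preserved);
        System.out.println("time preserved, check passes: "
                + passesTimestampCheck(preserved, registered));

        // Case 2: the destination keeps its own write time instead.
        Path plain = dir.resolve("plain.xml");
        Files.write(plain, Files.readAllBytes(local));
        System.out.println("time not preserved, check passes: "
                + passesTimestampCheck(plain, registered));
    }
}
```

The second case in `main` only shows how a mismatch of the kind in the quoted 
log can arise when the destination does not end up with the timestamp that was 
registered. Whether that is what happens with COS is an assumption, not 
something the log confirms.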

> Incorrect resource time setting causes flink to fail to submit
> --------------------------------------------------------------
>                 Key: FLINK-12195
>                 URL: https://issues.apache.org/jira/browse/FLINK-12195
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN, FileSystems
>    Affects Versions: 1.6.3
>            Reporter: tangshangwen
>            Assignee: tangshangwen
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
> We use Tencent COS (an object store similar to S3) as defaultFS. When we 
> submitted a job, YARN's resource timestamp check reported a mismatch, which 
> prevented the job from being submitted:
> {code:java}
> 2019-04-15 14:45:47,683 DEBUG 
> org.apache.hadoop.security.UserGroupInformation: PrivilegedActionException 
> as:hadoop (auth:SIMPLE) cause:java.io.IOException: Resource 
> cosn://xxx-xxx/user/hadoop/.flink/application_1555078596113_0014/logback.xml 
> changed on src filesystem (expected 1555259286000, was 1555310742000 
> {code}
> I found that Flink registers the resource with the lastModified time of the 
> local file. Why does it not use the modification time reported by the remote 
> file system?
> {code:java}
> LOG.debug("Copying from {} to {}", localSrcPath, dst);
> fs.copyFromLocalFile(false, true, localSrcPath, dst);
> // Note: If we used registerLocalResource(FileSystem, Path) here, we would
> // access the remote file once again which has problems with eventually
> // consistent read-after-write file systems. Instead, we decide to preserve
> // the modification time at the remote location because this and the size of
> // the resource will be checked by YARN based on the values we provide to
> // #registerLocalResource() below.
> fs.setTimes(dst, localFile.lastModified(), -1);
> // now create the resource instance
> LocalResource resource = registerLocalResource(dst, localFile.length(),
>     localFile.lastModified());
> return Tuple2.of(dst, resource);{code}
> Maybe it should be
> {code:java}
> // now create the resource instance
> LocalResource resource = registerLocalResource(dst, localFile.length(), 
> fs.getFileStatus(dst).getModificationTime());{code}

This message was sent by Atlassian JIRA
