[ https://issues.apache.org/jira/browse/FLINK-5083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671880#comment-15671880 ]
Kostas Kloudas commented on FLINK-5083:
---------------------------------------

Thanks for reporting it! There is a pending pull request here: https://github.com/apache/flink/pull/2797 that removes the deletion altogether. The reason is that deleting lingering files does not play well with rescaling, which re-shuffles the state of individual tasks. Given that this PR is about to be merged, I suppose this issue will be resolved. I also have another PR ready for the RollingSink, which I will open as soon as the aforementioned one gets merged.

> Race condition in Rolling/Bucketing Sink pending files cleanup
> --------------------------------------------------------------
>
>                 Key: FLINK-5083
>                 URL: https://issues.apache.org/jira/browse/FLINK-5083
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Cliff Resnick
>
> In both the open and restore methods there is code that:
> 1. gets a recursive listing from baseDir
> 2. iterates over the listing and checks filenames against the subtaskIndex and other criteria to find pending or in-progress files; any match is deleted.
>
> The problem is that the recursive listing returns all files for all subtaskIndexes. The race occurs when #hasNext is called during iteration: it performs a hidden existence check on the "next" file, which may have been deleted by another task after the listing but before the iteration, so an error is thrown and the job fails.
>
> Depending on the number of pending files, this condition may outlast the number of job retries, with each attempt failing on a different file.
>
> A solution would be to use #listStatus instead. The Hadoop FileSystem supports a PathFilter in its #listStatus calls, but not in the recursive #listFiles call. The cleanup is performed from the baseDir, so the recursive listing would have to be implemented in Flink.
>
> This touches on another issue. Over time, the directory listing is bound to get very large, and re-listing everything from the baseDir may get increasingly expensive, especially if the FileSystem is S3. Maybe we can have a Bucketer callback that returns a list of cleanup root directories based on the current file? I'm guessing most people are using time-based bucketing, so cleanup only matters within a limited time window. If so, this would also address the recursive listing problem above.
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
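As a rough illustration of the reporter's suggested fix, here is a minimal sketch (not Flink's actual sink code) of a recursive listing built on FileSystem#listStatus, which returns a materialized FileStatus[] snapshot. Iterating that array involves no further existence checks, unlike the RemoteIterator returned by #listFiles(baseDir, true), whose hasNext() can fail on a file another subtask has already deleted. The class name SubtaskFileCollector and the filename pattern in the filter are assumptions for illustration only.

{code:java}
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

/** Hypothetical helper sketching the #listStatus-based cleanup listing. */
public class SubtaskFileCollector {

    // Assumed naming scheme: the subtask index appears in the file name and the
    // suffix marks pending / in-progress files. The real sink's pattern may differ;
    // this filter is only illustrative.
    private final PathFilter subtaskFilter;

    public SubtaskFileCollector(final int subtaskIndex) {
        this.subtaskFilter = path -> {
            final String name = path.getName();
            return name.contains("-" + subtaskIndex + "-")
                    && (name.endsWith(".pending") || name.endsWith(".in-progress"));
        };
    }

    /**
     * Recursively lists files under baseDir using FileSystem#listStatus. Each directory
     * is snapshotted into a FileStatus[] before it is iterated, so a file deleted
     * concurrently by another subtask does not fail the traversal.
     */
    public List<Path> collectSubtaskFiles(final FileSystem fs, final Path baseDir) throws IOException {
        final List<Path> result = new ArrayList<>();
        final FileStatus[] children;
        try {
            children = fs.listStatus(baseDir);
        } catch (FileNotFoundException e) {
            // The directory itself was removed by a concurrent task; nothing to clean here.
            return result;
        }
        for (FileStatus status : children) {
            if (status.isDirectory()) {
                result.addAll(collectSubtaskFiles(fs, status.getPath()));
            } else if (subtaskFilter.accept(status.getPath())) {
                result.add(status.getPath());
            }
        }
        return result;
    }
}
{code}

Compared to deleting files while walking a single recursive #listFiles iterator, this approach snapshots each directory up front and skips directories that vanish mid-traversal, so a concurrent deletion by another subtask degrades to a no-op instead of a job failure.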