[ https://issues.apache.org/jira/browse/FLINK-20208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235430#comment-17235430 ]
Yun Gao edited comment on FLINK-20208 at 11/19/20, 12:51 PM:
-------------------------------------------------------------

Hi [~trushev], many thanks for the further investigation! The case I referred to is when failover happens between step 4 (checkpoint) and step 5 (checkpoint completed). In that case, after failover we would need to commit .part-0-0.inprogress.UUID1, since '012' would not be replayed. On the other hand, more records may also be processed between step 4 and step 5, so other in-progress files might be generated during that period, and those in-progress files need to be removed.

> Remove outdated in-progress files in StreamingFileSink
> ------------------------------------------------------
>
> Key: FLINK-20208
> URL: https://issues.apache.org/jira/browse/FLINK-20208
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Affects Versions: 1.11.2
> Reporter: Alexander Trushev
> Priority: Minor
>
> Assume a job has a StreamingFileSink with OnCheckpointRollingPolicy.
> Consider the following case:
> # Acknowledged checkpoint
> # Event is written to new .part-X-Y.inprogress.UUID1
> # Job failure
> # Job recovery from the checkpoint
> # Event is written to new .part-X-Y.inprogress.UUID2
> We are then left with the outdated part file .part-X-Y.inprogress.UUID1, where X is the subtask index and Y is the part counter.
> *Proposal*
> Add the method
> {code:java}
> boolean shouldRemoveOutdatedParts()
> {code}
> to RollingPolicy.
> Add a configurable parameter to OnCheckpointRollingPolicy and DefaultRollingPolicy whose value is returned by shouldRemoveOutdatedParts() (false by default).
> Such outdated part files can then be removed with the following algorithm when restoring the job from a checkpoint:
> # After the buckets state is initialized, check shouldRemoveOutdatedParts(). If it returns true, proceed to step 2.
> # For each bucket, scan the bucket directory.
> # Remove a part file if all three of the following conditions hold:
> the file name contains "inprogress";
> the subtask index in the file name equals the current subtask index;
> the part counter in the file name is greater than or equal to the current max part counter.
> I propose removing outdated files because a similar proposal to overwrite outdated files has not been implemented: [https://issues.apache.org/jira/browse/FLINK-11116]
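The three removal conditions from the proposal can be sketched as a pure file-name check, applied to each entry found while scanning a bucket directory. This is a hypothetical helper: `OutdatedPartFileFilter` and `shouldRemove` are not Flink APIs. It assumes the in-progress naming `.part-<subtask>-<counter>.inprogress.<uuid>` seen in the comment (`.part-0-0.inprogress.UUID1`), which means the default `part` prefix and no custom suffix.

As an addition reflecting the comment, the helper also takes the set of in-progress files still referenced by the restored state. Such files must be committed rather than deleted. How that set would be obtained from the restored bucket state is left open here.

```java
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of the proposed removal check; not part of Flink. */
public final class OutdatedPartFileFilter {

    // ASSUMPTION: in-progress files look like .part-<subtask>-<counter>.inprogress.<uuid>
    // (default "part" prefix, no custom suffix).
    private static final Pattern IN_PROGRESS =
            Pattern.compile("^\\.part-(\\d+)-(\\d+)\\.inprogress\\..+$");

    private OutdatedPartFileFilter() {}

    /**
     * Returns true if the file should be deleted while restoring.
     *
     * @param fileName          name of a file found in the bucket directory
     * @param subtaskIndex      index of the restoring subtask
     * @param maxPartCounter    max part counter restored from the checkpoint
     * @param referencedByState in-progress files still referenced by the restored
     *                          state (hypothetical input); these must be committed
     */
    public static boolean shouldRemove(
            String fileName, long subtaskIndex, long maxPartCounter, Set<String> referencedByState) {
        // Files the restored state still needs are never deleted.
        if (referencedByState.contains(fileName)) {
            return false;
        }
        Matcher m = IN_PROGRESS.matcher(fileName);
        // Condition 1: only in-progress part files are candidates.
        if (!m.matches()) {
            return false;
        }
        try {
            long fileSubtask = Long.parseLong(m.group(1));
            long fileCounter = Long.parseLong(m.group(2));
            // Condition 2: written by this subtask.
            // Condition 3: counter not covered by the restored checkpoint.
            return fileSubtask == subtaskIndex && fileCounter >= maxPartCounter;
        } catch (NumberFormatException e) {
            // Digit run too long to be a real index/counter: leave the file alone.
            return false;
        }
    }

    public static void main(String[] args) {
        Set<String> referenced = Set.of(".part-0-0.inprogress.UUID1");
        String[] files = {
            ".part-0-0.inprogress.UUID1", // keep: referenced by state, counter 0 < 1
            ".part-0-1.inprogress.UUID2", // remove: this subtask, counter 1 >= 1
            ".part-1-1.inprogress.UUID3", // keep: written by another subtask
            "part-0-0",                   // keep: finished file, not in-progress
        };
        for (String f : files) {
            boolean remove = shouldRemove(f, 0, 1, referenced);
            System.out.println(f + " -> " + (remove ? "remove" : "keep"));
        }
    }
}
```

The sketch matches the full in-progress pattern rather than a plain `contains("inprogress")`. This avoids deleting unrelated files whose names merely include that word. The cost is that the pattern must be kept in sync with any custom part prefix or suffix.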