[ https://issues.apache.org/jira/browse/FLINK-20208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Trushev updated FLINK-20208:
--------------------------------------
    Description: 
Assume a job has a StreamingFileSink with OnCheckpointRollingPolicy.

In the following case:
# Acknowledged checkpoint
# Event is written to new .part-X-Y.UUID1
# Job failure
# Job recovery from the checkpoint
# Event is written to new .part-X-Y.UUID2

we are left with the outdated part file .part-X-Y.UUID1, where X is the subtask index and Y is the part counter.

*Proposal*

Add method
{code:java}
boolean shouldRemoveOutdatedParts()
{code}
to RollingPolicy.
Add a configurable parameter to OnCheckpointRollingPolicy and to DefaultRollingPolicy that will be returned by shouldRemoveOutdatedParts() (false by default).

We can remove such outdated part files with the following algorithm while restoring a job from a checkpoint:
# After the bucket state is initialized, check shouldRemoveOutdatedParts. If it returns true, continue with step 2.
# For each bucket, scan the bucket directory.
# Remove a part file if all three conditions hold:
the part filename contains "inprogress";
the subtask index from the filename equals the current subtask index;
the part counter from the filename is greater than or equal to the current max part counter.

I propose removing outdated files because the similar proposal to overwrite outdated files has not been implemented: https://issues.apache.org/jira/browse/FLINK-11116

  was:
Assume a job has a StreamingFileSink with OnCheckpointRollingPolicy.

In the following case:
# Acknowledged checkpoint
# Event is written to new .part-X-Y.UUID1
# Job failure
# Job recovery from the checkpoint
# Event is written to new .part-X-Y.UUID2

we are left with the outdated part file .part-X-Y.UUID1, where X is the subtask index and Y is the part counter.

*Proposal*

Add method
{code:java}
boolean shouldRemoveOutdatedParts()
{code}
to RollingPolicy.
Add a configurable parameter to OnCheckpointRollingPolicy and to DefaultRollingPolicy that will be returned by shouldRemoveOutdatedParts() (false by default).

We can remove such outdated part files with the following algorithm while restoring a job from a checkpoint:
# After the bucket state is initialized, check shouldRemoveOutdatedParts. If it returns true, continue with step 2.
# For each inactive bucket, scan the bucket directory.
# Remove a part file if all three conditions hold:
the part filename contains "inprogress";
the subtask index from the filename equals the current subtask index;
the part counter from the filename is greater than or equal to the current max part counter.

I propose removing outdated files because the similar proposal to overwrite outdated files has not been implemented: https://issues.apache.org/jira/browse/FLINK-11116


> Remove outdated in-progress files in StreamingFileSink
> ------------------------------------------------------
>
>                 Key: FLINK-20208
>                 URL: https://issues.apache.org/jira/browse/FLINK-20208
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.11.2
>            Reporter: Alexander Trushev
>            Priority: Minor
>
> Assume a job has a StreamingFileSink with OnCheckpointRollingPolicy.
> In the following case:
> # Acknowledged checkpoint
> # Event is written to new .part-X-Y.UUID1
> # Job failure
> # Job recovery from the checkpoint
> # Event is written to new .part-X-Y.UUID2
> we are left with the outdated part file .part-X-Y.UUID1, where X is the
> subtask index and Y is the part counter.
> *Proposal*
> Add method
> {code:java}
> boolean shouldRemoveOutdatedParts()
> {code}
> to RollingPolicy.
> Add a configurable parameter to OnCheckpointRollingPolicy and to
> DefaultRollingPolicy that will be returned by shouldRemoveOutdatedParts()
> (false by default).
> We can remove such outdated part files with the following algorithm while
> restoring a job from a checkpoint:
> # After the bucket state is initialized, check shouldRemoveOutdatedParts.
> If it returns true, continue with step 2.
> # For each bucket, scan the bucket directory.
> # Remove a part file if all three conditions hold:
> the part filename contains "inprogress";
> the subtask index from the filename equals the current subtask index;
> the part counter from the filename is greater than or equal to the current
> max part counter.
> I propose removing outdated files because the similar proposal to overwrite
> outdated files has not been implemented:
> https://issues.apache.org/jira/browse/FLINK-11116
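
The three-condition check from step 3 of the proposal could be sketched roughly as follows. This is only an illustration, not Flink code: the class name OutdatedPartFilter, the method isOutdated, and the assumed file name layout (an optional leading dot, then "part-<subtask index>-<part counter>", then a dot and a suffix) are hypothetical and would need to match the sink's actual naming configuration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: decides whether a file found in a
// bucket directory is an outdated in-progress part file for this subtask.
final class OutdatedPartFilter {

    // Assumed layout: optional leading dot, "part-<subtaskIndex>-<partCounter>",
    // then a dot followed by any suffix (for example "inprogress.<uuid>").
    private static final Pattern PART_NAME =
            Pattern.compile("^\\.?part-(\\d{1,9})-(\\d{1,18})\\..*$");

    private OutdatedPartFilter() {}

    static boolean isOutdated(String fileName, int subtaskIndex, long maxPartCounter) {
        // Condition 1: the file name marks the file as in-progress.
        if (!fileName.contains("inprogress")) {
            return false;
        }
        Matcher m = PART_NAME.matcher(fileName);
        if (!m.matches()) {
            // Unknown naming scheme: leave the file alone.
            return false;
        }
        int fileSubtask = Integer.parseInt(m.group(1));
        long fileCounter = Long.parseLong(m.group(2));
        // Condition 2: the file was written by this subtask.
        // Condition 3: its counter is at or beyond the restored max part counter.
        return fileSubtask == subtaskIndex && fileCounter >= maxPartCounter;
    }
}
```

Under these assumptions, a restore path would call isOutdated for every file in a bucket directory and delete only the files for which it returns true. Files that do not match the assumed pattern are deliberately kept.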