[ https://issues.apache.org/jira/browse/FLINK-20208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235419#comment-17235419 ]
Alexander Trushev edited comment on FLINK-20208 at 11/19/20, 12:48 PM:
-----------------------------------------------------------------------

The first issue. I tested my prototype with DefaultRollingPolicy. There is no need to distinguish between pending and in-progress files: both should be removed.

Assume
* a stateful source function that produces characters from the alphabet [0-9]
* rolling policy: max 3 characters per part file
* 1 bucket and parallelism 1

Case:
# Write 3 characters (0, 1, 2) to .part-0-0.inprogress.UUID1. Bucket:
{noformat}
[.part-0-0.inprogress.UUID1 {status: inprogress, content: "012"}]
{noformat}
# The rolling policy changes the status to pending. Bucket:
{noformat}
[.part-0-0.inprogress.UUID1 {status: pending, content: "012"}]
{noformat}
# Write 2 characters (3, 4) to .part-0-1.inprogress.UUID2. Bucket:
{noformat}
[
  .part-0-0.inprogress.UUID1 {status: pending, content: "012"},
  .part-0-1.inprogress.UUID2 {status: inprogress, content: "34"}
]
{noformat}
# Checkpoint 1. Stored state:
{noformat}
{
  sourceNextValue: 5,
  maxPartCounter: 2,
  pending: [{name: .part-0-0.inprogress.UUID1, offset: 3}],
  inprogress: {name: .part-0-1.inprogress.UUID2, offset: 2}
}
{noformat}
# Checkpoint 1 completion is notified; .part-0-0.inprogress.UUID1 is renamed to part-0-0. Bucket:
{noformat}
[
  part-0-0 {status: finished, content: "012"},
  .part-0-1.inprogress.UUID2 {status: inprogress, content: "34"}
]
{noformat}
# Write 5 characters (5, 6, 7, 8, 9). Bucket:
{noformat}
[
  part-0-0 {status: finished, content: "012"},
  .part-0-1.inprogress.UUID2 {status: pending, content: "345"},
  .part-0-2.inprogress.UUID3 {status: pending, content: "678"},
  .part-0-3.inprogress.UUID4 {status: inprogress, content: "9"}
]
{noformat}
# Job failure. Recovery from chk-1:
sourceNextValue=5, maxPartCounter=2, inprogress=.part-0-1.inprogress.UUID2 \{offset=2}
# Bucket after recovery:
{noformat}
[
  part-0-0 {status: finished, content: "012"},
  .part-0-1.inprogress.UUID2 {status: inprogress, content: "34"},
  .part-0-2.inprogress.UUID3 {status: pending, content: "678"} - outdated,
  .part-0-3.inprogress.UUID4 {status: inprogress, content: "9"} - outdated
]
{noformat}
# Remove .part-0-2.inprogress.UUID3 (2 >= maxPartCounter).
# Remove .part-0-3.inprogress.UUID4 (3 >= maxPartCounter).
# The next character (5) is written to .part-0-1.inprogress.UUID2.

So both the outdated pending file and the outdated in-progress file were removed.

[~gaoyunhaii] could you explain what "pending files (which has been snapshotted and need to be re-committed)" means? Is it .part-0-0.inprogress.UUID1 from my case? I don't see a problem with it.

About "So if before failover different subtasks have different number of buckets, one bucket is very likely to be assigned to another subtask": this looks like a potential issue. Bucket-0 is filled by subtask-0 and bucket-1 is filled by subtask-1. Then, after a job restart, subtask-0 receives bucket-1 together with the old subtaskIndex 0. In this case, the outdated parts with subtask index 1 in bucket-1 will not be removed, because the condition "subtask index from filename equals to old subtaskIndex" is always false. I have no idea yet whether this can happen or how to fix it.

> Remove outdated in-progress files in StreamingFileSink
> ------------------------------------------------------
>
>                 Key: FLINK-20208
>                 URL: https://issues.apache.org/jira/browse/FLINK-20208
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.11.2
>            Reporter: Alexander Trushev
>            Priority: Minor
>
> Assume a job has a StreamingFileSink with OnCheckpointRollingPolicy.
> In the following case:
> # Acknowledged checkpoint
> # Event is written to new .part-X-Y.inprogress.UUID1
> # Job failure
> # Job recovery from the checkpoint
> # Event is written to new .part-X-Y.inprogress.UUID2
> we are left with the outdated part file .part-X-Y.inprogress.UUID1, where X is the subtask index and Y is the part counter.
> *Proposal*
> Add the method
> {code:java}
> boolean shouldRemoveOutdatedParts()
> {code}
> to RollingPolicy.
> Add a configurable parameter to OnCheckpointRollingPolicy and DefaultRollingPolicy whose value is returned by shouldRemoveOutdatedParts() (false by default).
> Such outdated part files can be removed with the following algorithm while restoring the job from a checkpoint:
> # After the buckets state is initialized, check shouldRemoveOutdatedParts(). If it returns true, go to (2).
> # For each bucket, scan the bucket directory.
> # Remove a part file if all three conditions are true:
> the part filename contains "inprogress";
> the subtask index from the filename equals the current subtask index;
> the part counter from the filename is greater than or equal to the current max part counter.
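The three removal conditions above can be sketched as a small filename filter. This is a minimal, hypothetical sketch: OutdatedPartFilter, isOutdated and outdatedParts are illustrative names, not Flink API. It assumes in-progress files are named .part-<subtaskIndex>-<partCounter>.inprogress.<uuid> (the default "part" prefix, no part suffix), as in the example in the comment. It uses a regex, which is stricter than a plain "contains inprogress" check, so files that do not match the assumed naming pattern are left alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Flink) sketching the three removal conditions.
public class OutdatedPartFilter {

    // Assumed naming: ".part-<subtaskIndex>-<partCounter>.inprogress.<uuid>".
    private static final Pattern IN_PROGRESS_PART =
            Pattern.compile("\\.part-(\\d+)-(\\d+)\\.inprogress\\..+");

    /** True if the file name satisfies all three removal conditions. */
    public static boolean isOutdated(String fileName, int subtaskIndex, long maxPartCounter) {
        Matcher m = IN_PROGRESS_PART.matcher(fileName);
        // Condition 1: only in-progress part files are candidates
        // (finished files such as "part-0-0" never match).
        if (!m.matches()) {
            return false;
        }
        int fileSubtask = Integer.parseInt(m.group(1));
        long filePartCounter = Long.parseLong(m.group(2));
        // Condition 2: written by this subtask.
        // Condition 3: part counter at or beyond the restored max part counter.
        return fileSubtask == subtaskIndex && filePartCounter >= maxPartCounter;
    }

    /** Returns the files of one bucket listing that would be removed. */
    public static List<String> outdatedParts(List<String> bucketFiles, int subtaskIndex, long maxPartCounter) {
        List<String> toRemove = new ArrayList<>();
        for (String name : bucketFiles) {
            if (isOutdated(name, subtaskIndex, maxPartCounter)) {
                toRemove.add(name);
            }
        }
        return toRemove;
    }

    public static void main(String[] args) {
        // Bucket listing after recovery from chk-1: subtask 0, maxPartCounter = 2.
        List<String> bucket = List.of(
                "part-0-0",
                ".part-0-1.inprogress.UUID2",
                ".part-0-2.inprogress.UUID3",
                ".part-0-3.inprogress.UUID4");
        System.out.println(outdatedParts(bucket, 0, 2));
        // prints [.part-0-2.inprogress.UUID3, .part-0-3.inprogress.UUID4]

        // Reassigned bucket: a file written by subtask 1 (hypothetical UUID5)
        // is never selected when the check uses subtask index 0.
        System.out.println(isOutdated(".part-1-5.inprogress.UUID5", 0, 2));
        // prints false
    }
}
```

Applied to the bucket listing after recovery from chk-1 in the comment's example, the filter selects UUID3 and UUID4 and keeps both part-0-0 and the restored UUID2 (counter 1 < 2). The last call illustrates the reassignment concern raised in the comment: when the check uses subtask index 0, a file written by subtask 1 is never selected.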
> I propose removing outdated files because a similar proposal to overwrite them ([https://issues.apache.org/jira/browse/FLINK-11116]) has not been implemented.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)