[ 
https://issues.apache.org/jira/browse/FLINK-20208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235419#comment-17235419
 ] 

Alexander Trushev edited comment on FLINK-20208 at 11/19/20, 12:48 PM:
-----------------------------------------------------------------------

The first issue.

I tested my prototype with DefaultRollingPolicy. There is no need to 
distinguish between outdated pending and outdated in-progress files: both should be removed.

Assume:
 * a stateful source function that produces characters from the alphabet [0-9]
 * rolling policy: roll after at most 3 characters per part file
 * 1 bucket, parallelism 1
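The part counter values used in the case below can be reproduced with a small in-memory model. This is a hypothetical simplification, not StreamingFileSink code: PartWriterModel is a made-up name, a part is rolled right after its third character, and committing and checkpointing are not modelled.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one bucket written by subtask 0 (not Flink code).
// It tracks part file contents and the part counter only.
public class PartWriterModel {
    static final int MAX_CHARS_PER_PART = 3; // "roll after at most 3 characters"

    final List<String> closedParts = new ArrayList<>(); // rolled parts (pending or finished)
    StringBuilder inProgress = null;                     // currently open part, if any
    int partCounter = 0;                                 // counter value for the next new part

    void write(char c) {
        if (inProgress == null) {
            inProgress = new StringBuilder();
            partCounter++; // opening .part-0-<n> consumes counter value n
        }
        inProgress.append(c);
        if (inProgress.length() >= MAX_CHARS_PER_PART) {
            closedParts.add(inProgress.toString()); // rolled: the part becomes pending
            inProgress = null;
        }
    }

    public static void main(String[] args) {
        PartWriterModel bucket = new PartWriterModel();
        for (char c : "01234".toCharArray()) {
            bucket.write(c);
        }
        // Matches the state stored at checkpoint 1.
        System.out.println("pending=" + bucket.closedParts
                + " inprogress=" + bucket.inProgress
                + " maxPartCounter=" + bucket.partCounter);
        // prints "pending=[012] inprogress=34 maxPartCounter=2"
    }
}
{code}

Writing the remaining characters 5-9 to the same model yields the parts "345", "678" and "9" with a part counter of 4, which is why the files created after the checkpoint have counters 2 and 3.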

Case:
# write 3 characters (0, 1, 2) to .part-0-0.inprogress.UUID1. Bucket:
{noformat}
[.part-0-0.inprogress.UUID1 {status: inprogress, content: "012"}]
{noformat}
# the rolling policy rolls the part file, so its status changes to pending. Bucket:
{noformat}
[.part-0-0.inprogress.UUID1 {status: pending, content: "012"}]
{noformat}
# write 2 characters (3, 4) to .part-0-1.inprogress.UUID2. Bucket:
{noformat}
[
  .part-0-0.inprogress.UUID1 {status: pending, content: "012"},
  .part-0-1.inprogress.UUID2 {status: inprogress, content: "34"}
]
{noformat}
# checkpoint 1 is taken. Stored state:
{noformat}
{
  sourceNextValue: 5,
  maxPartCounter: 2,
  pending: [{name: .part-0-0.inprogress.UUID1, offset: 3}],
  inprogress: {name: .part-0-1.inprogress.UUID2, offset: 2}
}
{noformat}
# checkpoint 1 completes. On notification, .part-0-0.inprogress.UUID1 is 
renamed to part-0-0. Bucket:
{noformat}
[
  part-0-0 {status: finished, content: "012"},
  .part-0-1.inprogress.UUID2 {status: inprogress, content: "34"}
]
{noformat}
# write 5 characters (5, 6, 7, 8, 9). Bucket:
{noformat}
[
  part-0-0 {status: finished, content: "012"},
  .part-0-1.inprogress.UUID2 {status: pending, content: "345"},
  .part-0-2.inprogress.UUID3 {status: pending, content: "678"},
  .part-0-3.inprogress.UUID4 {status: inprogress, content: "9"}
]
{noformat}
# job failure. Recovery from chk-1: sourceNextValue=5, maxPartCounter=2, 
inprogress=.part-0-1.inprogress.UUID2 \{offset=2}. Bucket after restoring to that offset:
{noformat}
[
  part-0-0 {status: finished, content: "012"},
  .part-0-1.inprogress.UUID2 {status: inprogress, content: "34"},
  .part-0-2.inprogress.UUID3 {status: pending, content: "678"} - outdated,
  .part-0-3.inprogress.UUID4 {status: inprogress, content: "9"}  - outdated
]
{noformat}
# remove .part-0-2.inprogress.UUID3 (part counter 2 >= maxPartCounter 2)
# remove .part-0-3.inprogress.UUID4 (part counter 3 >= maxPartCounter 2)
# the next character (5) is written to .part-0-1.inprogress.UUID2

So both the outdated pending file (UUID3) and the outdated in-progress file (UUID4) were removed.
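A rolled file keeps its .inprogress. name until it is committed (see UUID3 above), so the check only has to look at file names. Below is a minimal sketch of the three conditions from the issue description applied to the bucket listing after recovery. OutdatedPartFilter is a hypothetical name, and the regular expression assumes the default naming seen in this case, .part-<subtask>-<counter>.inprogress.<uuid>, with no custom part prefix or suffix.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the proposed restore-time check (not Flink code).
public class OutdatedPartFilter {
    // Assumed in-progress name layout: .part-<subtaskIndex>-<partCounter>.inprogress.<uuid>
    private static final Pattern IN_PROGRESS =
            Pattern.compile("^\\.part-(\\d+)-(\\d+)\\.inprogress\\..+$");

    static boolean isOutdated(String fileName, int subtaskIndex, long maxPartCounter) {
        Matcher m = IN_PROGRESS.matcher(fileName);
        if (!m.matches()) {
            return false; // finished files such as part-0-0 are never touched
        }
        int fileSubtask = Integer.parseInt(m.group(1));
        long filePartCounter = Long.parseLong(m.group(2));
        return fileSubtask == subtaskIndex && filePartCounter >= maxPartCounter;
    }

    static List<String> outdated(List<String> bucket, int subtaskIndex, long maxPartCounter) {
        List<String> result = new ArrayList<>();
        for (String name : bucket) {
            if (isOutdated(name, subtaskIndex, maxPartCounter)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Bucket listing after recovery from chk-1 (maxPartCounter = 2).
        List<String> bucket = List.of(
                "part-0-0",
                ".part-0-1.inprogress.UUID2",
                ".part-0-2.inprogress.UUID3",
                ".part-0-3.inprogress.UUID4");
        System.out.println(outdated(bucket, 0, 2));
        // prints [.part-0-2.inprogress.UUID3, .part-0-3.inprogress.UUID4]
    }
}
{code}

The restored in-progress file UUID2 survives because its counter (1) is below maxPartCounter, even though its name matches the pattern.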

[~gaoyunhaii], could you explain what you mean by "pending files (which has been 
snapshotted and need to be re-committed)"? Is that .part-0-0.inprogress.UUID1 
in my case? If so, I don't see a problem with it: its part counter (0) is below 
maxPartCounter (2), so the cleanup does not touch it.

Regarding "So if before failover different subtasks have different number of 
buckets, one bucket is very likely to be assigned to another subtask": this looks 
like a potential issue.

Suppose bucket-0 is filled by subtask-0 and bucket-1 by subtask-1. After a 
job restart, subtask-0 receives bucket-1 together with the old subtaskIndex-0. In this case, 
outdated parts with subtaskIndex-1 in bucket-1 will not be removed, because 
the condition "subtask index from filename equals to old subtaskIndex" is always 
false. I don't know yet whether this can happen or how to fix it.
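The concern can be reproduced with the same name-based check. In this hypothetical listing, bucket-1 was written by subtask-1 and .part-1-2.inprogress.UUID6 was created after the checkpoint; assume the restored maxPartCounter is 2. The file names, UUIDs, counter and the class name ReassignedBucketCheck are made up for illustration.

{code:java}
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of the bucket reassignment concern (not Flink code).
public class ReassignedBucketCheck {
    // Assumed in-progress name layout: .part-<subtaskIndex>-<partCounter>.inprogress.<uuid>
    private static final Pattern IN_PROGRESS =
            Pattern.compile("^\\.part-(\\d+)-(\\d+)\\.inprogress\\..+$");

    static long countOutdated(List<String> bucket, int expectedSubtask, long maxPartCounter) {
        return bucket.stream()
                .map(IN_PROGRESS::matcher)
                .filter(Matcher::matches) // groups are readable after a successful match
                .filter(m -> Integer.parseInt(m.group(1)) == expectedSubtask)
                .filter(m -> Long.parseLong(m.group(2)) >= maxPartCounter)
                .count();
    }

    public static void main(String[] args) {
        // bucket-1 was written by subtask-1 before the failure.
        List<String> bucket1 = List.of(
                "part-1-0",
                ".part-1-1.inprogress.UUID5",
                ".part-1-2.inprogress.UUID6"); // created after the checkpoint: outdated
        long maxPartCounter = 2;

        // After restart, subtask-0 restores bucket-1 with the old subtaskIndex 0.
        System.out.println(countOutdated(bucket1, 0, maxPartCounter)); // 0: the outdated part is missed
        System.out.println(countOutdated(bucket1, 1, maxPartCounter)); // 1: it matches only for index 1
    }
}
{code}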



> Remove outdated in-progress files in StreamingFileSink
> ------------------------------------------------------
>
>                 Key: FLINK-20208
>                 URL: https://issues.apache.org/jira/browse/FLINK-20208
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.11.2
>            Reporter: Alexander Trushev
>            Priority: Minor
>
> Assume a job has a StreamingFileSink with OnCheckpointRollingPolicy.
> Consider the following sequence:
>  # A checkpoint is acknowledged
>  # An event is written to a new .part-X-Y.inprogress.UUID1
>  # The job fails
>  # The job recovers from the checkpoint
>  # An event is written to a new .part-X-Y.inprogress.UUID2
> We are left with the outdated part file .part-X-Y.inprogress.UUID1, where X is 
> the subtask index and Y is the part counter.
> *Proposal*
>  Add the method
> {code:java}
> // Proposed: whether outdated in-progress part files are removed when restoring from a checkpoint
> boolean shouldRemoveOutdatedParts();
> {code}
> to RollingPolicy.
>  Add a configurable parameter to OnCheckpointRollingPolicy and 
> DefaultRollingPolicy whose value is returned by shouldRemoveOutdatedParts() 
> (false by default).
> Such outdated part files can be removed with the following algorithm while 
> restoring the job from a checkpoint:
>  # After the bucket state is initialized, check shouldRemoveOutdatedParts(). 
> If it returns true, continue with (2).
>  # For each bucket, scan the bucket directory.
>  # Remove a part file if all three conditions hold:
>  the part filename contains "inprogress";
>  the subtask index from the filename equals the current subtask index;
>  the part counter from the filename is greater than or equal to the current max part counter.
> I propose removing outdated files because a similar proposal to overwrite 
> outdated files, [https://issues.apache.org/jira/browse/FLINK-11116], has not 
> been implemented.
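The restore-time algorithm in the proposal can be sketched in plain Java. This is a hypothetical sketch, not Flink code: OutdatedPartsPolicy stands in for the proposed RollingPolicy#shouldRemoveOutdatedParts() (using a Java default method that returns false is an assumption), java.nio replaces Flink's own file system abstraction, and the name pattern assumes the default .part-<subtask>-<counter>.inprogress.<uuid> layout, which is stricter than a plain "contains inprogress" check.

{code:java}
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the proposed RollingPolicy method (not Flink's interface).
interface OutdatedPartsPolicy {
    default boolean shouldRemoveOutdatedParts() {
        return false; // proposed default
    }
}

public class OutdatedPartCleanup {
    // Assumed default in-progress naming: .part-<subtaskIndex>-<partCounter>.inprogress.<uuid>
    private static final Pattern IN_PROGRESS =
            Pattern.compile("^\\.part-(\\d+)-(\\d+)\\.inprogress\\..+$");

    /** Runs after bucket state is restored; returns the files it deleted. */
    static List<Path> removeOutdatedParts(OutdatedPartsPolicy policy, List<Path> bucketDirs,
                                          int subtaskIndex, long maxPartCounter) throws IOException {
        List<Path> outdated = new ArrayList<>();
        if (!policy.shouldRemoveOutdatedParts()) {
            return outdated; // step 1: cleanup disabled
        }
        for (Path bucketDir : bucketDirs) { // step 2: scan every bucket directory
            try (DirectoryStream<Path> files = Files.newDirectoryStream(bucketDir)) {
                for (Path file : files) {
                    Matcher m = IN_PROGRESS.matcher(file.getFileName().toString());
                    // step 3: in-progress name, same subtask index, counter >= maxPartCounter
                    if (m.matches()
                            && Integer.parseInt(m.group(1)) == subtaskIndex
                            && Long.parseLong(m.group(2)) >= maxPartCounter) {
                        outdated.add(file);
                    }
                }
            }
        }
        for (Path file : outdated) {
            Files.delete(file); // deleted only after the directory stream is closed
        }
        return outdated;
    }

    public static void main(String[] args) throws IOException {
        Path bucket = Files.createTempDirectory("bucket");
        for (String name : new String[] {"part-0-0", ".part-0-1.inprogress.UUID2",
                ".part-0-2.inprogress.UUID3", ".part-0-3.inprogress.UUID4"}) {
            Files.createFile(bucket.resolve(name));
        }
        OutdatedPartsPolicy enabled = new OutdatedPartsPolicy() {
            @Override
            public boolean shouldRemoveOutdatedParts() {
                return true;
            }
        };
        List<Path> removed = removeOutdatedParts(enabled, List.of(bucket), 0, 2);
        System.out.println(removed.size() + " file(s) removed"); // prints "2 file(s) removed"
    }
}
{code}

Matches are collected first and deleted after the directory stream is closed, so the directory is not modified while it is being iterated.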



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
