Hi Congxian, the leftover files were on the local disk of the TaskManager. But looking further into the issue, I think the real culprit was the logs: the sink was writing a line into the logger for every record (8 GB written in total), and that makes more sense. So nothing is wrong with the Flink/savepoint behaviour.
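To illustrate, a sink that logs a line for every record it writes, roughly like the sketch below (hypothetical; the class and message names are made up, not the actual job code), can easily fill a TaskManager's local disk with log files that look like leftover state:

    // Hypothetical sketch: a sink that emits one log line per record.
    // At gigabytes of throughput, the resulting log files on the
    // TaskManager's local disk can be mistaken for leftover savepoint data.
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class VerboseSink implements SinkFunction<String> {
        private static final Logger LOG = LoggerFactory.getLogger(VerboseSink.class);

        @Override
        public void invoke(String value, Context context) {
            LOG.info("Writing record: {}", value); // one log line per record
            // ... the actual write to the external system would go here ...
        }
    }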
Thanks,
David

On Tue, Jul 21, 2020 at 12:37 PM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi David
>
> Sorry for the late reply; it seems I missed your previous email.
> I'm not sure I fully understand here: are the leftover files on the s3
> filesystem or on the local disk of the TaskManager? Currently, the
> savepoint data is written directly to the output stream of the underlying
> file (here an s3 file); you can have a look at the code here[1].
>
> [1]
> https://github.com/apache/flink/blob/1908b2ce6ffb8efc7d339136787494b4fe70846f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java#L160
>
> Best,
> Congxian
>
>
> David Magalhães <speeddra...@gmail.com> wrote on Tue, Jul 21, 2020 at 4:10 PM:
>
>> Hi Till, I'm using the s3:// schema, but I'm not sure which default was
>> used, s3a or s3p.
>>
>>> then the state backend should try to directly write to the target file
>>> system
>>
>> That was the behaviour I saw the second time I ran this, with more
>> slots. Does the savepoint stream directly to S3, or is the savepoint
>> written to memory first before being sent to S3?
>>
>> Thanks,
>> David
>>
>> On Tue, Jul 21, 2020 at 7:42 AM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi David,
>>>
>>> which S3 file system implementation are you using? If I'm not mistaken,
>>> the state backend should try to write directly to the target file
>>> system. If this results in temporary files on your TM, then this might
>>> be a problem of the file system implementation. Having access to the
>>> logs could also help to better understand what's going on.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jul 14, 2020 at 11:57 AM David Magalhães <speeddra...@gmail.com>
>>> wrote:
>>>
>>>> Hi Congxian, sorry for the late reply.
>>>>
>>>> I'm using the filesystem state backend with an S3 path as the default
>>>> in flink-conf.yml (state.backend: filesystem).
>>>> The Flink version I'm using is 1.10.1.
>>>>
>>>> By "The task manager did not clean up the state", I mean that the
>>>> taskmanager was writing the savepoint file to its local disk, but it
>>>> didn't delete it after the other taskmanager had an issue with the
>>>> disk being full. The expected scenario would be that both taskmanagers
>>>> remove the savepoint they were trying to take from disk, but only the
>>>> one that reached 100% disk usage did so.
>>>>
>>>> For my scenario, I'm using the Flink REST API to start/deploy jobs. A
>>>> retained checkpoint isn't supported by the REST API, and even if it
>>>> were, I don't think it fits my scenario (stop a job, and start the new
>>>> one from the saved state).
>>>>
>>>> Thanks,
>>>> David
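For reference, the configuration described above looks roughly like this in flink-conf.yaml (a sketch; the bucket name and paths are illustrative, not the actual ones used here):

    state.backend: filesystem
    # Illustrative S3 locations, not the real ones:
    state.checkpoints.dir: s3://<bucket>/flink/checkpoints
    state.savepoints.dir: s3://<bucket>/flink/savepoints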
>>>> On Sat, Jul 11, 2020 at 8:14 AM Congxian Qiu <qcx978132...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi David
>>>>>
>>>>> As you say the savepoint uses the local disk, I assume that you use
>>>>> the RocksDBStateBackend.
>>>>> Which Flink version are you using now?
>>>>>
>>>>> What do you mean by "The task manager did not clean up the state"?
>>>>> Does that mean the local disk space was not cleaned up? Did the task
>>>>> encounter a failover in this period?
>>>>>
>>>>> The snapshot speed will be limited by the network bandwidth and the
>>>>> local IO performance.
>>>>> IIUC, currently only checkpoints support local recovery.
>>>>>
>>>>> PS: If you want the snapshot to complete quickly, maybe you can try
>>>>> retained checkpoints[1] and multi-threaded uploads[2].
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#retained-checkpoints
>>>>> [2] https://issues.apache.org/jira/browse/FLINK-11008
>>>>>
>>>>> Best,
>>>>> Congxian
>>>>>
>>>>>
>>>>> David Magalhães <speeddra...@gmail.com> wrote on Fri, Jul 10, 2020 at 7:37 PM:
>>>>>
>>>>>> Hi, yesterday I was creating a savepoint (to S3, around 8 GB of
>>>>>> state) using 2 TaskManagers (8 GB each), and it failed because one
>>>>>> of the task managers filled up its disk (it probably didn't have
>>>>>> enough RAM to stream the state to S3 directly; I don't know how much
>>>>>> disk space there was, but it reached 100% usage, and the other one
>>>>>> reached 99%).
>>>>>>
>>>>>> After the crash, the task manager that reached 100% deleted the
>>>>>> "failed savepoint" from its local disk, but the other one, which
>>>>>> reached 99%, kept it. Shouldn't this task manager also clean up the
>>>>>> failed state?
>>>>>>
>>>>>> After cleaning up that task manager's disk, I increased the
>>>>>> parallelism to 6 and created a new state of 8 GB, and all went
>>>>>> smoothly, but it took 8 minutes to start processing in the new job
>>>>>> created from the previous savepoint.
>>>>>>
>>>>>> [image: flink_grafana.png]
>>>>>> Here is the network IO from the 6 task managers used, and I have a
>>>>>> few questions:
>>>>>>
>>>>>> - Isn't 25 Mbps of average speed a bit low? What could be the
>>>>>> limitation?
>>>>>> - For 8 GB of state, that gives around 7 minutes to download it
>>>>>> [ 8000 MB / (25 Mbps / 8 * 6 task managers) / 60 ≈ 7 minutes ],
>>>>>> which should match the flat 7-8 minute part of the graph, after
>>>>>> which the job started reading from the Kafka topic.
>>>>>> - Can I mitigate this with task-local recovery [1]? Or is that only
>>>>>> for checkpoints?
>>>>>> - We are using *m5.xlarge* (4 vCPU, 16 GB RAM) with 2 slots per TM.
>>>>>>
>>>>>> Thanks,
>>>>>> David
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
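For completeness, the tuning options mentioned in this thread map roughly to the following flink-conf.yaml entries (a sketch; the values are illustrative, and the transfer-thread option only applies when the RocksDB state backend is used):

    # Task-local recovery (per the thread, applies to checkpoints, not savepoints):
    state.backend.local-recovery: true
    # Multi-threaded transfer of RocksDB incremental checkpoint files (FLINK-11008):
    state.backend.rocksdb.checkpoint.transfer.thread.num: 4

Retained checkpoints are enabled in the job code rather than in flink-conf.yaml, e.g. via env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) in Flink 1.10.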