Hi David, thanks for the confirmation, good to know that.

Best,
Congxian
David Magalhães <speeddra...@gmail.com> wrote on Tue, Jul 21, 2020 at 11:42 PM:

> Hi Congxian, the leftover files were on the local disk of the TaskManager.
> But looking more closely into the issue, I think the issue was the "logs".
> The sink, in this case, was writing one line into the logger (I was writing
> 8 GB in total), and that makes more sense. So nothing is wrong with the
> Flink/savepoint behaviour.
>
> Thanks,
> David
>
> On Tue, Jul 21, 2020 at 12:37 PM Congxian Qiu <qcx978132...@gmail.com> wrote:
>
>> Hi David
>> Sorry for the late reply, it seems I missed your previous email.
>> I'm not sure I fully understand: are the leftover files on the s3
>> filesystem or on the local disk of the TaskManager? Currently, the
>> savepoint data is written directly to the output stream of the underlying
>> file (here an s3 file); you can have a look at the code here[1].
>>
>> [1]
>> https://github.com/apache/flink/blob/1908b2ce6ffb8efc7d339136787494b4fe70846f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java#L160
>>
>> Best,
>> Congxian
>>
>>
>> David Magalhães <speeddra...@gmail.com> wrote on Tue, Jul 21, 2020 at 4:10 PM:
>>
>>> Hi Till, I'm using the s3:// scheme, but I'm not sure which default was
>>> used, s3a or s3p.
>>>
>>>> then the state backend should try to directly write to the target file
>>>> system
>>>
>>> That was the behaviour I saw the second time I ran this with more slots.
>>> Does the savepoint write directly to S3 via streaming, or is the
>>> savepoint written to memory first before being sent to S3?
>>>
>>> Thanks,
>>> David
>>>
>>> On Tue, Jul 21, 2020 at 7:42 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Hi David,
>>>>
>>>> which S3 file system implementation are you using? If I'm not mistaken,
>>>> then the state backend should try to directly write to the target file
>>>> system. If this results in temporary files on your TM, then this might
>>>> be a problem of the file system implementation. Having access to the
>>>> logs could also help to better understand what's going on.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Jul 14, 2020 at 11:57 AM David Magalhães <speeddra...@gmail.com> wrote:
>>>>
>>>>> Hi Congxian, sorry for the late reply.
>>>>>
>>>>> I'm using the filesystem state backend with an S3 path as the default
>>>>> in flink-conf.yaml (state.backend: filesystem).
>>>>> The Flink version I'm using is 1.10.1.
>>>>>
>>>>> By "The task manager did not clean up the state", I mean that the
>>>>> taskmanager was writing the savepoint file to its local disk, but it
>>>>> didn't delete it after the other taskmanager had an issue with a full
>>>>> disk. The expected scenario would be that both taskmanagers remove the
>>>>> savepoint they were trying to write from disk, but only the one that
>>>>> reached 100% disk usage did so.
>>>>>
>>>>> For my scenario, I'm using the Flink REST API to start/deploy jobs.
>>>>> Retained checkpoints aren't supported in the REST API, and even if they
>>>>> were, I don't think they fit my scenario (stop a job, and start the new
>>>>> one from the saved state).
>>>>>
>>>>> Thanks,
>>>>> David
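A minimal sketch of the setup described in the message above (the filesystem state backend pointed at an S3 path), assuming a hypothetical bucket/path and a placeholder pipeline, and assuming one of Flink's S3 filesystem plugins (s3a or s3p) is on the classpath to serve the s3:// scheme:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3StateBackendSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical bucket/path; the state backend writes snapshot data to this
        // URI. Whether anything is buffered on the TaskManager's local disk first
        // depends on the S3 filesystem implementation, as discussed in this thread.
        env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints"));

        // Placeholder pipeline so the sketch runs end to end.
        env.fromElements(1, 2, 3).print();
        env.execute("s3-state-backend-sketch");
    }
}

The equivalent cluster-wide configuration is what the message refers to: state.backend: filesystem together with state.checkpoints.dir / state.savepoints.dir in flink-conf.yaml.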
>>>>>
>>>>> On Sat, Jul 11, 2020 at 8:14 AM Congxian Qiu <qcx978132...@gmail.com> wrote:
>>>>>
>>>>>> Hi David
>>>>>>
>>>>>> As you say the savepoint uses the local disk, I assume that you use the
>>>>>> RocksDBStateBackend.
>>>>>> What Flink version are you using now?
>>>>>> What do you mean by "The task manager did not clean up the state"? Does
>>>>>> that mean the local disk space was not cleaned up? Did the task
>>>>>> encounter a failover in this period?
>>>>>>
>>>>>> The snapshot speed will be limited by the network bandwidth and the
>>>>>> local IO performance.
>>>>>> IIUC, currently only checkpoints support local recovery.
>>>>>>
>>>>>> PS: If you want the snapshot to complete quickly, maybe you can try
>>>>>> retained checkpoints[1] and multi-threaded uploads[2].
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#retained-checkpoints
>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-11008
>>>>>>
>>>>>> Best,
>>>>>> Congxian
>>>>>>
>>>>>>
>>>>>> David Magalhães <speeddra...@gmail.com> wrote on Fri, Jul 10, 2020 at 7:37 PM:
>>>>>>
>>>>>>> Hi, yesterday I was creating a savepoint (to S3, around 8 GB of state)
>>>>>>> using 2 TaskManagers (8 GB) and it failed because one of the task
>>>>>>> managers filled up its disk (it probably didn't have enough RAM to
>>>>>>> write the state to S3 directly; I don't know what the available disk
>>>>>>> space was, but it reached 100% usage, and the other one reached 99%).
>>>>>>>
>>>>>>> After the crash, the task manager that reached 100% deleted the
>>>>>>> "failed savepoint" from its local disk, but the other one that reached
>>>>>>> 99% kept it. Shouldn't this task manager also clean up the failed
>>>>>>> state?
>>>>>>>
>>>>>>> After cleaning up the disk of that task manager, I increased the
>>>>>>> parallelism to 6, created a new state of 8 GB, and all went smoothly,
>>>>>>> but it took 8 minutes for the new job created from the previous
>>>>>>> savepoint to start processing.
>>>>>>>
>>>>>>> [image: flink_grafana.png]
>>>>>>> Here is the network IO from the 6 task managers used, and I have a few
>>>>>>> questions:
>>>>>>>
>>>>>>> - Isn't 25 Mbps of average speed a bit low? What could be the
>>>>>>> limitation?
>>>>>>> - For 8 GB of state, that gives around 7 minutes to download it
>>>>>>> [ 8000 MB / (25 Mbps / 8 * 6 task managers) / 60 seconds ], which
>>>>>>> should match the steady 7/8-minute stretch of the graph, after which
>>>>>>> the job started reading from the Kafka topic.
>>>>>>> - Can I mitigate this with task-local recovery [1]? Or is that only
>>>>>>> for checkpoints?
>>>>>>> - We are using *m5.xlarge* (4 vCPU, 16 GB RAM) with 2 slots per TM.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> David
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
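For completeness, a hedged sketch of the two ideas raised in this thread: retaining checkpoints so they survive job cancellation, and task-local recovery, which (as noted above) applies to checkpoints rather than savepoints. The class name, checkpoint interval, directory, and local-environment setup are illustrative only; on a real cluster, local recovery is normally enabled via state.backend.local-recovery: true in flink-conf.yaml.

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointSketch {

    public static void main(String[] args) throws Exception {
        // state.backend.local-recovery is a cluster-level option (normally set in
        // flink-conf.yaml); passing it to a local environment here only shows which
        // knob is involved.
        Configuration conf = new Configuration();
        conf.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, conf);

        // A local checkpoint directory so the sketch is self-contained; in the
        // setup discussed above this would be the S3 path instead.
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

        // Checkpoint every 10 seconds (illustrative interval).
        env.enableCheckpointing(10_000);

        // Keep the last completed checkpoint when the job is cancelled, so it can
        // later serve as a restore point, much like a savepoint.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Placeholder pipeline so the job can run end to end.
        env.fromElements(1, 2, 3).print();
        env.execute("retained-checkpoint-sketch");
    }
}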