Hi Congxian, the leftover files were on the local disk of the TaskManager.
But after looking into the issue more closely, I think the cause was the
logs. The sink, in this case, was writing one line to the logger (I was
writing 8 GB in total), which makes more sense. So there is nothing wrong
with the Flink/savepoint behaviour.

Thanks,
David

On Tue, Jul 21, 2020 at 12:37 PM Congxian Qiu <qcx978132...@gmail.com>
wrote:

> Hi David,
>    Sorry for the late reply; it seems I missed your previous email.
>    I'm not sure I fully understand: are the leftover files on the S3
> filesystem or on the local disk of the TaskManager? Currently, the
> savepoint data is written directly to the output stream of the underlying
> file (here, the S3 file); you can have a look at the code here [1].
>
> [1]
> https://github.com/apache/flink/blob/1908b2ce6ffb8efc7d339136787494b4fe70846f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java#L160
>
> Best,
> Congxian
>
>
> On Tue, Jul 21, 2020 at 4:10 PM David Magalhães <speeddra...@gmail.com>
> wrote:
>
>> Hi Till, I'm using the s3:// scheme, but I'm not sure which
>> implementation was used by default, s3a or s3p.
>>
>>> then the state backend should try to directly write to the target file
>>> system
>>
>> That was the behaviour I saw the second time I ran this, with more
>> slots. Does the savepoint stream directly to S3, or is it written to
>> memory first before being sent to S3?
>>
>> Thanks,
>> David
>>
>> On Tue, Jul 21, 2020 at 7:42 AM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi David,
>>>
>>> which S3 file system implementation are you using? If I'm not mistaken,
>>> then the state backend should try to directly write to the target file
>>> system. If this should result in temporary files on your TM, then this
>>> might be a problem of the file system implementation. Having access to the
>>> logs could also help to better understand what's going on.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jul 14, 2020 at 11:57 AM David Magalhães <speeddra...@gmail.com>
>>> wrote:
>>>
>>>> Hi Congxian, sorry for the late reply.
>>>>
>>>> I'm using the filesystem state backend, with an S3 path, as the default
>>>> in flink-conf.yaml (state.backend: filesystem).
>>>> The Flink version I'm using is 1.10.1.
>>>>
>>>> By "The task manager did not clean up the state", I mean that the
>>>> taskmanager was writing the savepoint file to disk but didn't delete it
>>>> after the other taskmanager had an issue with its disk being full. The
>>>> expected scenario would be that both taskmanagers remove the savepoint
>>>> they were trying to write from the disk, but only the one that reached
>>>> 100% disk usage did so.
>>>>
>>>> For my scenario, I'm using the Flink REST API to start/deploy jobs. A
>>>> retained checkpoint isn't supported in the REST API, and even if it
>>>> were, I don't think it fits my scenario (stop a job, and start the new
>>>> one from the saved state).
>>>>
>>>> Thanks,
>>>> David
>>>>
>>>> On Sat, Jul 11, 2020 at 8:14 AM Congxian Qiu <qcx978132...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi David
>>>>>
>>>>> As you say the savepoint uses the local disk, I assume that you are
>>>>> using RocksDBStateBackend.
>>>>> Which Flink version are you using now?
>>>>>
>>>>> What do you mean by "The task manager did not clean up the state"? Does
>>>>> that mean the local disk space was not cleaned up? Did the task
>>>>> encounter a failover during this period?
>>>>>
>>>>> The snapshot speed will be limited by the network bandwidth and the
>>>>> local IO performance.
>>>>> IIUC, currently only checkpoints support local recovery.
>>>>>
>>>>> PS: If you want the snapshot to complete quickly, maybe you can try
>>>>> retained checkpoints [1] and multi-threaded uploads [2].
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html#retained-checkpoints
>>>>> [2] https://issues.apache.org/jira/browse/FLINK-11008
>>>>>
>>>>> Best,
>>>>> Congxian
>>>>>
>>>>>
>>>>> On Fri, Jul 10, 2020 at 7:37 PM David Magalhães <speeddra...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, yesterday I was creating a savepoint (to S3, around 8 GB of
>>>>>> state) using 2 TaskManagers (8 GB), and it failed because one of the
>>>>>> task managers filled up its disk (it probably didn't have enough RAM
>>>>>> to save the state to S3 directly; I don't know how much disk space
>>>>>> there was, but it reached 100% usage and the other one reached 99%).
>>>>>>
>>>>>> After the crash, the task manager that reached 100% deleted the
>>>>>> "failed savepoint" from the local disk, but the other one, which
>>>>>> reached 99%, kept it.
>>>>>> Shouldn't this task manager also clean up the failed state?
>>>>>>
>>>>>> After cleaning up the disk of that task manager, I increased the
>>>>>> parallelism to 6, created a new state of 8 GB, and all went smoothly,
>>>>>> but it took 8 minutes to start processing in the new job created from
>>>>>> the previous savepoint.
>>>>>>
>>>>>> [image: flink_grafana.png]
>>>>>> Here is the network IO from the 6 task managers used and I have a few
>>>>>> questions:
>>>>>>
>>>>>> - Isn't 25 Mbps of average speed a bit low? What could be the
>>>>>> limitation?
>>>>>> - For 8 GB of state, that gives around 7 minutes to download it
>>>>>> [ 8000 MB / (25 Mbps / 8 * 6 task managers) / 60 seconds ], which
>>>>>> should match the steady 7-8 minute part of the graph, after which the
>>>>>> job started reading from the Kafka topic.
>>>>>> - Can I mitigate this with task local recovery [1]? Or is this only
>>>>>> for checkpoints?
>>>>>> - We are using *m5.xlarge* (4 vcpu, 16GB RAM) with 2 slots per TM.
>>>>>>
>>>>>> Thanks,
>>>>>> David
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
>>>>>>
>>>>>
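
As a rough check of the download-time estimate in the original message
(8000 MB of state, 25 Mbps average per task manager, 6 task managers), here
is a minimal Python sketch of the same arithmetic. The function name and the
assumption that the state is split evenly across task managers downloading
in parallel are illustrative only; nothing here comes from Flink itself.

```python
def estimated_restore_minutes(state_mb: float, mbps_per_tm: float, task_managers: int) -> float:
    """Estimate the minutes needed to download `state_mb` megabytes of state.

    Assumes (hypothetically) that every task manager downloads in parallel
    at the same average rate, given in megabits per second.
    """
    mb_per_second_per_tm = mbps_per_tm / 8  # megabits/s -> megabytes/s
    aggregate_mb_per_second = mb_per_second_per_tm * task_managers
    return state_mb / aggregate_mb_per_second / 60  # seconds -> minutes


# Figures quoted in the thread: 8000 MB, 25 Mbps, 6 task managers.
minutes = estimated_restore_minutes(8000, 25, 6)
print(f"{minutes:.1f} minutes")  # prints "7.1 minutes"
```

Under those assumptions the result agrees with the "around 7 minutes" quoted
in the thread, so the observed startup delay is consistent with the measured
network throughput.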
