Hey Boris,

I think the problem is that you are using externalized checkpoints:

env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

Your checkpoints are retained in both failure and cancellation cases, so
the checkpoint files with grow indefinitely



On Wed, Jun 12, 2019 at 8:01 AM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi Boris
> For the configure you gave, you can try to reduce the parallelism of the
> operator which contains states.
>
> Best,
> Congxian
>
>
> Boris Lublinsky <boris.lublin...@lightbend.com> 于2019年6月10日周一 下午9:43写道:
>
>> Here is code enabling checkpointing
>>
>> // Enable checkpointing
>> env.enableCheckpointing(60000 )   // 1 min
>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>> val checkpointingBackend = new FsStateBackend("file:///flink/checkpoints", 
>> true)
>> env.setStateBackend(checkpointingBackend)
>>
>>
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>> On Jun 10, 2019, at 1:07 AM, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>
>> Hi
>>
>> Which state backed(Heap or RocksDB) and checkpoint mode (fullsnapshot or
>> increment) do you use?
>>
>> Best,
>> Congxian
>>
>>
>> Boris Lublinsky <boris.lublin...@lightbend.com> 于2019年6月4日周二 上午6:45写道:
>>
>>> Is there a way to limit the amount of checkpoint file?
>>> The parameter that I set : state.checkpoints.num-retained: 5
>>> does not seem to have any effect. Is there anything else I can set to
>>> prevent infinite growth of checkpointing info?
>>>
>>> Boris Lublinsky
>>> FDP Architect
>>> boris.lublin...@lightbend.com
>>> https://www.lightbend.com/
>>>
>>>
>>

Reply via email to