Hey Boris, I think the problem is that you are using externalized checkpoints:

env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

Your checkpoints are retained in both failure and cancellation cases, so the checkpoint files will grow indefinitely.

On Wed, Jun 12, 2019 at 8:01 AM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi Boris
> For the configuration you gave, you can try to reduce the parallelism of the
> operator which contains states.
>
> Best,
> Congxian
>
>
> Boris Lublinsky <boris.lublin...@lightbend.com> wrote on Mon, Jun 10, 2019 at 9:43 PM:
>
>> Here is the code enabling checkpointing
>>
>> // Enable checkpointing
>> env.enableCheckpointing(60000) // 1 min
>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>> val checkpointingBackend = new FsStateBackend("file:///flink/checkpoints",
>> true)
>> env.setStateBackend(checkpointingBackend)
>>
>>
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>> On Jun 10, 2019, at 1:07 AM, Congxian Qiu <qcx978132...@gmail.com> wrote:
>>
>> Hi
>>
>> Which state backend (Heap or RocksDB) and checkpoint mode (full snapshot or
>> incremental) do you use?
>>
>> Best,
>> Congxian
>>
>>
>> Boris Lublinsky <boris.lublin...@lightbend.com> wrote on Tue, Jun 4, 2019 at 6:45 AM:
>>
>>> Is there a way to limit the number of checkpoint files?
>>> The parameter that I set: state.checkpoints.num-retained: 5
>>> does not seem to have any effect. Is there anything else I can set to
>>> prevent infinite growth of checkpointing info?
>>>
>>> Boris Lublinsky
>>> FDP Architect
>>> boris.lublin...@lightbend.com
>>> https://www.lightbend.com/
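
P.S. As a rough sketch of the alternative to the retention mode mentioned at the top of this message, assuming you do not need the checkpoint to survive a manual cancel, you could try DELETE_ON_CANCELLATION instead. Everything else is copied from your snippet; the object and method names (CheckpointSetup, configure) are placeholders of mine, and I have not run this against your job. Note that checkpoints are still kept when the job fails, so this alone may not be a complete cleanup policy.

```scala
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical wrapper; only the calls inside come from the thread or the Flink API.
object CheckpointSetup {
  def configure(env: StreamExecutionEnvironment): Unit = {
    env.enableCheckpointing(60000) // 1 min
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Assumption: losing the checkpoint on cancel is acceptable for this job.
    // The externalized checkpoint is deleted on cancellation but kept on failure.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    // Same filesystem backend as in the original snippet (asynchronous snapshots on).
    env.setStateBackend(new FsStateBackend("file:///flink/checkpoints", true))
  }
}
```

You would call CheckpointSetup.configure(env) right after creating the environment, before building the job graph.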