Hi Andrey,

Thanks. What is the recommendation for
env.getCheckpointConfig.*setMaxConcurrentCheckpoints*(concurrentchckpt)?
Should it be 1 or higher, and based on what factors?

Regards,
Vijay

On Tue, Aug 25, 2020 at 8:55 AM Andrey Zagrebin <azagre...@apache.org> wrote:

> Hi Vijay,
>
> I think it depends on your job requirements, in particular how many
> records are processed per second and how many resources you have to
> process them.
>
> If the checkpointing interval is short, the checkpointing overhead can
> be too high, and you need more resources to keep up efficiently with
> the incoming stream.
>
> If the checkpointing interval is long, more records are batched together
> and the throughput is better.
> On the other hand, the observed latency is higher because the batched
> results get flushed into the files and become visible in the external
> system only when a checkpoint occurs, to provide the exactly-once
> guarantee.
>
> Best,
> Andrey
>
> On Mon, Aug 24, 2020 at 6:18 PM Vijayendra Yadav <contact....@gmail.com>
> wrote:
>
>> Hi Team,
>>
>> Bulk Formats can only have `OnCheckpointRollingPolicy`, which rolls
>> (ONLY) on every checkpoint.
>>
>> *.withRollingPolicy(OnCheckpointRollingPolicy.build())*
>>
>> Question: What are the recommended values related to checkpointing to
>> fsstate: should checkpoints be more frequent or at longer intervals, how
>> many concurrent checkpoints need to be allowed, and how long should the
>> ideal pause between checkpoints be?
>> Is there a way to control batch size here other than time? Any
>> recommendations for all the parameters listed below?
>> *Note:* I am trying to improve sink throughput.
>>
>> env.enableCheckpointing(chckptintervalmilli)
>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.valueOf(ChckptMode))
>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(chckptintervalmilligap)
>> env.getCheckpointConfig.setCheckpointTimeout(chckptduration)
>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)
>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.valueOf(CheckpointCleanup))
>> env.getCheckpointConfig.setPreferCheckpointForRecovery(CheckpointForCleanup)
>>
>> Thanks,
>> Vijay
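
For reference, the settings quoted in this thread could be wired together roughly as in the sketch below. This is only a minimal illustration, not a recommendation: every numeric value is an assumption chosen for the example, and the object name CheckpointSettingsSketch is hypothetical. It assumes the Flink 1.11 Scala streaming API is on the classpath. As far as I know, Flink's default for concurrent checkpoints is 1, and the Flink checkpointing documentation notes that more than one concurrent checkpoint cannot be combined with a minimum pause between checkpoints, so the sketch leaves it at 1.

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical object name; all values below are illustrative assumptions.
object CheckpointSettingsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // A longer interval batches more records per bulk file (better sink
    // throughput), but results become visible only when a checkpoint occurs.
    val checkpointIntervalMs = 60000L
    env.enableCheckpointing(checkpointIntervalMs)

    val conf = env.getCheckpointConfig
    conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Leave time for regular processing between two checkpoints.
    conf.setMinPauseBetweenCheckpoints(30000L)

    // Abort a checkpoint that takes longer than this.
    conf.setCheckpointTimeout(600000L)

    // Assumption: stay at Flink's default of 1, since a minimum pause is set above.
    conf.setMaxConcurrentCheckpoints(1)

    // Keep the externalized checkpoint when the job is cancelled.
    conf.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  }
}
```

Since bulk formats roll only on checkpoints, the checkpoint interval is effectively the knob for batch size here: raising checkpointIntervalMs trades higher output latency for larger files.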