Hi Andrey,

Thanks.
What do you recommend for `env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)`?

Should it be 1 or higher, and based on what factor?


Regards,
Vijay
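The Flink checkpointing documentation notes that a non-zero minimum pause between checkpoints implies that only one checkpoint runs at a time. Values of `setMaxConcurrentCheckpoints` above 1 therefore only matter when the minimum pause is 0 and a checkpoint can still be running when the next interval elapses. The Scala sketch below is a simplified model, not Flink's scheduler. It assumes at most one checkpoint in flight and a fixed checkpoint duration, and the names `CheckpointSpacing`, `startToStartMillis` and `checkpointBusyFraction` are hypothetical. It shows how the interval, the checkpoint duration and the minimum pause together decide how often checkpoints actually start.

```scala
// Simplified model (an assumption, not Flink's actual scheduling code):
// at most one checkpoint in flight, and every checkpoint takes the same time.
object CheckpointSpacing {

  /** Milliseconds between the starts of two consecutive checkpoints.
    * The next checkpoint waits for the interval to elapse and for the
    * previous checkpoint to finish plus the minimum pause, whichever is later. */
  def startToStartMillis(intervalMillis: Long, durationMillis: Long, minPauseMillis: Long): Long =
    math.max(intervalMillis, durationMillis + minPauseMillis)

  /** Fraction of wall-clock time during which a checkpoint is in progress. */
  def checkpointBusyFraction(intervalMillis: Long, durationMillis: Long, minPauseMillis: Long): Double =
    durationMillis.toDouble / startToStartMillis(intervalMillis, durationMillis, minPauseMillis)

  def main(args: Array[String]): Unit = {
    // Hypothetical numbers: 60 s interval, 50 s checkpoints, 30 s minimum pause.
    val gap = startToStartMillis(60000L, 50000L, 30000L)
    val busy = checkpointBusyFraction(60000L, 50000L, 30000L)
    println(s"checkpoints start every ${gap / 1000}s; busy ${math.round(busy * 100)}% of the time")
  }
}
```

With these numbers checkpoints start every 80 s rather than every 60 s, because the 30 s pause after a 50 s checkpoint outlasts the interval.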


On Tue, Aug 25, 2020 at 8:55 AM Andrey Zagrebin <azagre...@apache.org>
wrote:

> Hi Vijay,
>
> I think it depends on your job requirements, in particular how many
> records are processed per second and how many resources you have to
> process them.
>
> If the checkpointing interval is short, the checkpointing overhead can
> be too high and you need more resources to keep up efficiently with the
> incoming stream.
>
> If the checkpointing interval is long, more records are batched together
> and the throughput is better.
> On the other hand, the observed latency is higher, because the batched
> results are flushed into the files and become visible in the external
> system only when a checkpoint occurs, which is what provides the
> exactly-once guarantee.
>
> Best,
> Andrey
>
> On Mon, Aug 24, 2020 at 6:18 PM Vijayendra Yadav <contact....@gmail.com>
> wrote:
>
>> Hi Team,
>>
>> Bulk formats can only use `OnCheckpointRollingPolicy`, which rolls part
>> files on every checkpoint (and ONLY then).
>>
>> *.withRollingPolicy(OnCheckpointRollingPolicy.build())*
>>
>> Question: What are the recommended checkpointing settings when
>> checkpointing to fsstate? Should checkpoints be more frequent, or spaced
>> at longer intervals? How many concurrent checkpoints should be allowed,
>> and what is an ideal pause between checkpoints?
>> Is there a way to control the batch size other than through time? Any
>> recommendations for the parameters listed below?
>> *Note:* I am trying to improve sink throughput.
>>
>>
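>> import org.apache.flink.streaming.api.CheckpointingMode
>> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
>>
>> env.enableCheckpointing(chckptintervalmilli) // interval between checkpoint triggers (ms)
>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.valueOf(ChckptMode)) // EXACTLY_ONCE or AT_LEAST_ONCE
>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(chckptintervalmilligap) // min gap (ms) after a checkpoint completes
>> env.getCheckpointConfig.setCheckpointTimeout(chckptduration) // abort a checkpoint that runs longer than this (ms)
>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt) // a non-zero min pause implies 1
>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.valueOf(CheckpointCleanup)) // RETAIN_ON_CANCELLATION or DELETE_ON_CANCELLATION
>> env.getCheckpointConfig.setPreferCheckpointForRecovery(CheckpointForCleanup) // Boolean: fall back to a checkpoint even if a newer savepoint exists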
>>
>> Thanks,
>> Vijay
>>
>
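Andrey's trade-off can be made concrete with a little arithmetic. Because a bulk-format sink with `OnCheckpointRollingPolicy` rolls part files only on checkpoints, the checkpoint interval effectively sets both the batch size per file and the worst-case delay before data becomes visible. The Scala sketch below assumes a steady input rate, a single bucket, one in-progress part file per sink subtask, and checkpoints that complete instantly. The object and method names (`CheckpointBatchMath`, `recordsPerFile`, `filesPerHour`, `worstCaseVisibilityMillis`) and the input numbers are hypothetical.

```scala
// Back-of-the-envelope model of a bulk-format sink that rolls only on checkpoints.
// Assumptions (hypothetical): steady input rate, a single bucket, one in-progress
// part file per sink subtask, and checkpoints that complete instantly.
object CheckpointBatchMath {

  /** Records written to each part file between two checkpoints (per subtask). */
  def recordsPerFile(recordsPerSecond: Long, intervalMillis: Long, parallelism: Int): Long =
    recordsPerSecond * intervalMillis / 1000L / parallelism

  /** Number of part files rolled per hour across all sink subtasks. */
  def filesPerHour(intervalMillis: Long, parallelism: Int): Long =
    3600000L / intervalMillis * parallelism

  /** Worst-case delay before a record becomes visible: it arrives just after
    * one checkpoint and is only committed by the next one. */
  def worstCaseVisibilityMillis(intervalMillis: Long): Long = intervalMillis

  def main(args: Array[String]): Unit = {
    val rate = 10000L   // hypothetical input rate: 10,000 records/s
    val parallelism = 4 // hypothetical sink parallelism
    for (intervalMs <- Seq(10000L, 60000L, 300000L)) {
      println(
        s"interval=${intervalMs / 1000}s " +
          s"recordsPerFile=${recordsPerFile(rate, intervalMs, parallelism)} " +
          s"filesPerHour=${filesPerHour(intervalMs, parallelism)} " +
          s"worstCaseLatency=${worstCaseVisibilityMillis(intervalMs) / 1000}s")
    }
  }
}
```

With 10,000 records/s and parallelism 4, a 60 s interval gives 150,000 records per file and 240 files per hour, compared with 25,000 records and 1,440 files per hour at 10 s, at the cost of up to 60 s instead of 10 s before records are visible downstream.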
