Hi Santosh,

It’s best to avoid cross-posting. Let’s keep the discussion to SO.

Best regards,
Niklas

> On 12. Feb 2022, at 16:39, santosh joshi <santoshjoshi2...@gmail.com> wrote:
> 
> We are migrating to KafkaSource from FlinkKafkaConsumer. We have disabled 
> auto commit of offset and instead committing them manually to some external 
> store
> 
> We override FlinkKafkaConsumer and then on an overridden instance of 
> KafkaFetcher we try to store the offset in some external store by overriding 
> doCommitInternalOffsetsToKafka
> 
>  protected void doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> 
> offsets,
>         @Nonnull KafkaCommitCallback commitCallback) throws Exception {
> //Store offset in S3
> }
> 
> Now In order to migrate we tried coping/overriding KafkaSource, 
> KafkaSourceBuilder and KafkaSourceReade, but looks like a lot of redundant 
> code which somehow does not look correct
> 
> In Custom KafkaSourceReader I tried overriding snapshotState
> 
> @Override
> public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
>    // custom logic to store offset in s3
>    return super.snapshotState(checkpointId);
> }
> 
> Is this correct or Is there any other way to achieve the same.
> 
> I have asked the similar questions in Stackoverflow
> 
> 
> Regards,
> Santosh 

Reply via email to