Hi Santosh, It’s best to avoid cross-posting. Let’s keep the discussion to SO.
Best regards, Niklas > On 12. Feb 2022, at 16:39, santosh joshi <santoshjoshi2...@gmail.com> wrote: > > We are migrating to KafkaSource from FlinkKafkaConsumer. We have disabled > auto commit of offset and instead committing them manually to some external > store > > We override FlinkKafkaConsumer and then on an overridden instance of > KafkaFetcher we try to store the offset in some external store by overriding > doCommitInternalOffsetsToKafka > > protected void doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> > offsets, > @Nonnull KafkaCommitCallback commitCallback) throws Exception { > //Store offset in S3 > } > > Now In order to migrate we tried coping/overriding KafkaSource, > KafkaSourceBuilder and KafkaSourceReade, but looks like a lot of redundant > code which somehow does not look correct > > In Custom KafkaSourceReader I tried overriding snapshotState > > @Override > public List<KafkaPartitionSplit> snapshotState(long checkpointId) { > // custom logic to store offset in s3 > return super.snapshotState(checkpointId); > } > > Is this correct or Is there any other way to achieve the same. > > I have asked the similar questions in Stackoverflow > > > Regards, > Santosh