Hello Jing,

This was the method in the old Kafka consumer API. It was removed in 1.15, so its source code is no longer in master.

Yes, I know that with the new OffsetsInitializer, the committed offset with earliest as a fallback can be used to get the same behavior as before. I just wanted to know whether this is a change in behavior or whether I am missing something.
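For illustration, here is a minimal, self-contained Java sketch of the three starting-offset choices discussed in this thread. It is a simplified model, not Flink code. The class StartingOffsetModel, the enum Initializer and its values are hypothetical names, and the IllegalStateException is a stand-in for whatever the real consumer raises. In the actual KafkaSource builder, the old FlinkKafkaConsumer default corresponds to setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)). The model assumes one partition whose consumer group has committed offset 42 and whose earliest available offset is 0.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of how a starting offset is chosen for ONE
// partition. Not Flink's implementation; names are illustrative only.
public class StartingOffsetModel {

    // Stand-ins for the three initializer choices discussed in the thread.
    enum Initializer {
        EARLIEST,              // new KafkaSource default: OffsetsInitializer.earliest()
        COMMITTED_OR_EARLIEST, // committedOffsets(OffsetResetStrategy.EARLIEST)
        COMMITTED_NO_FALLBACK  // committedOffsets() without a fallback strategy
    }

    static long startingOffset(Initializer init, OptionalLong committed, long earliest) {
        switch (init) {
            case EARLIEST:
                // Any offset committed by the consumer group is ignored.
                return earliest;
            case COMMITTED_OR_EARLIEST:
                // Resume from the committed offset; fall back to earliest if none.
                return committed.orElse(earliest);
            case COMMITTED_NO_FALLBACK:
                if (committed.isPresent()) {
                    return committed.getAsLong();
                }
                // Models the runtime failure Jing describes when nothing is committed.
                throw new IllegalStateException("no committed offset for partition");
            default:
                throw new AssertionError("unknown initializer: " + init);
        }
    }

    public static void main(String[] args) {
        OptionalLong committed = OptionalLong.of(42L); // the group's committed offset
        long earliest = 0L;                            // oldest offset still in the topic

        System.out.println(startingOffset(Initializer.EARLIEST, committed, earliest));              // 0
        System.out.println(startingOffset(Initializer.COMMITTED_OR_EARLIEST, committed, earliest)); // 42
        System.out.println(startingOffset(Initializer.COMMITTED_OR_EARLIEST, OptionalLong.empty(), earliest)); // 0
        try {
            startingOffset(Initializer.COMMITTED_NO_FALLBACK, OptionalLong.empty(), earliest);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

With a committed offset of 42, the earliest default restarts at 0 and re-reads records the group had already consumed, which is the migration pitfall raised in this thread. committedOffsets with an EARLIEST fallback resumes at 42 and only drops back to 0 for a group that has nothing committed.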
Bastien DINE
Freelance Data Architect / Software Engineer / Sysadmin
http://bastiendine.io

On Tue, 14 Jun 2022 at 23:08, Jing Ge <j...@ververica.com> wrote:

> Hi Bastien,
>
> Thanks for asking. I didn't find any call of setStartFromGroupOffsets() within
> Flink in the master branch. Could you please point out the code where the
> committed offset is used as the default?
>
> W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
> is used, an exception will be thrown at runtime in case there is no
> committed offset, which is useful if the user intends to read from the
> committed offset but something is wrong. It might feel weird if it were used
> as the default, because an exception would be thrown when users start new
> jobs with default settings.
>
> Best regards,
> Jing
>
> On Tue, Jun 14, 2022 at 4:15 PM bastien dine <bastien.d...@gmail.com>
> wrote:
>
>> Hello everyone,
>>
>> Does someone know why the starting offset behaviour has changed in the
>> new Kafka Source?
>>
>> It now starts from earliest (code in KafkaSourceBuilder). The doc says:
>> "If offsets initializer is not specified, OffsetsInitializer.earliest() will
>> be used by default." from:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>
>> Before, in the old FlinkKafkaConsumer, it started from the committed offset
>> (i.e. the setStartFromGroupOffsets() method),
>>
>> which matches this behaviour in the new KafkaSource:
>> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
>>
>> This change can lead to big trouble if users pay no attention to this
>> point when migrating from the old FlinkKafkaConsumer to the new KafkaSource.
>>
>> Regards,
>> Bastien
>>
>> ------------------
>>
>> Bastien DINE
>> Data Architect / Software Engineer / Sysadmin
>> bastiendine.io