Hi,

Please use English on the user mailing list. To unsubscribe from the mailing list, send an email to user-unsubscr...@flink.apache.org <user-zh-unsubscr...@flink.apache.org>.

Best,
Shengkai

On Sun, Jun 19, 2022 at 10:36, liangzai <liangzai...@163.com> wrote:

> How do I unsubscribe from this mailing list?
>
> ---- Replied Message ----
> From: bastien dine <bastien.d...@gmail.com>
> Date: 06/15/2022 17:50
> To: Martijn Visser <martijnvis...@apache.org>
> Cc: Jing Ge <j...@ververica.com>, user <user@flink.apache.org>
> Subject: Re: New KafkaSource API : Change in default behavior regarding starting offset
>
> Hello Martijn,
>
> Thanks for the link to the release notes, especially:
> "When resuming from the savepoint, please use
> setStartingOffsets(OffsetsInitializer.committedOffsets()) in the new
> KafkaSourceBuilder to transfer the offsets to the new source."
> So earliest is the new default.
> We definitely use .committedOffsets; we set it by default in our custom
> KafkaSource builder to be sure we do not read all the previous data
> (earliest).
>
> What bothers me is just this change in the default starting offset behavior
> from FlinkKafkaConsumer to KafkaSource (this can lead to mistakes).
> In fact, we sometimes drop some of our Kafka source state to read
> again from the Kafka committed offset, but maybe nobody else does that ^^
>
> Anyway, thanks for pointing out the release notes!
>
> Best Regards,
>
> ------------------
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>
>
> On Wed, Jun 15, 2022 at 10:58, Martijn Visser <martijnvis...@apache.org> wrote:
>
>> Hi Bastien,
>>
>> When the FlinkKafkaConsumer was deprecated in 1.14.0, the release notes
>> included instructions on how to migrate from FlinkKafkaConsumer to
>> KafkaSource [1]. Looking at the Kafka connector documentation [2], there is a
>> section on how to upgrade to the latest connector version that I think is
>> outdated. I'm leaning towards copying the migration instructions to the
>> generic documentation. Do you think that would have sufficed?
>>
>> Best regards,
>>
>> Martijn
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version
>>
>> On Wed, Jun 15, 2022 at 09:22, bastien dine <bastien.d...@gmail.com> wrote:
>>
>>> Hello Jing,
>>>
>>> This was the previous method in the old Kafka consumer API. It was
>>> removed in 1.15, so the source code is no longer in master.
>>> Yes, I know that with the new offsets initializer, committed offsets with
>>> earliest as fallback can be used to get the same behavior as before.
>>> I just wanted to know whether this is a change in behavior or whether I am
>>> missing something.
>>>
>>>
>>> Bastien DINE
>>> Freelance
>>> Data Architect / Software Engineer / Sysadmin
>>> http://bastiendine.io
>>>
>>>
>>> On Tue, Jun 14, 2022 at 23:08, Jing Ge <j...@ververica.com> wrote:
>>>
>>>> Hi Bastien,
>>>>
>>>> Thanks for asking. I didn't find any call to setStartFromGroupOffsets()
>>>> within Flink in the master branch. Could you please point out the code
>>>> where the committed offset is used as the default?
>>>>
>>>> W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
>>>> is used, an exception will be thrown at runtime if there is no
>>>> committed offset, which is useful if the user intends to read from the
>>>> committed offset but something is wrong. It might feel weird if it were
>>>> used as the default, because an exception would be thrown when users
>>>> start new jobs with default settings.
>>>>
>>>> Best regards,
>>>> Jing
>>>>
>>>> On Tue, Jun 14, 2022 at 4:15 PM bastien dine <bastien.d...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello everyone,
>>>>>
>>>>> Does someone know why the starting offset behaviour has changed in the
>>>>> new KafkaSource?
>>>>>
>>>>> It now starts from earliest (code in KafkaSourceBuilder); the doc says:
>>>>> "If offsets initializer is not specified,
>>>>> OffsetsInitializer.earliest() will be used by default." from:
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>>>>
>>>>> Previously, in the old FlinkKafkaConsumer, it started from the committed
>>>>> offset (i.e. the setStartFromGroupOffsets() method),
>>>>>
>>>>> which matches this behaviour in the new KafkaSource:
>>>>> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
>>>>>
>>>>> This change can lead to big trouble if users pay no attention to this
>>>>> point when migrating from the old FlinkKafkaConsumer to the new KafkaSource.
>>>>>
>>>>> Regards,
>>>>> Bastien
>>>>>
>>>>> ------------------
>>>>>
>>>>> Bastien DINE
>>>>> Data Architect / Software Engineer / Sysadmin
>>>>> bastiendine.io
>>>>>
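
For readers skimming the thread, the three starting-offset behaviours it discusses can be sketched as a small plain-Java model. This is only an illustration under stated assumptions, not Flink's implementation: the class `StartOffsetSketch`, the enum `Fallback`, and both methods are hypothetical names, and the offsets are made-up numbers. It models (a) the new builder default as described in the quoted docs (earliest, ignoring committed offsets), (b) committed offsets with no fallback, which per Jing's reply fails at runtime when nothing has been committed, and (c) committed offsets with earliest as fallback, which Bastien describes as matching the old consumer's behaviour.

```java
import java.util.OptionalLong;

// Toy model only: class, enum, and method names are hypothetical, not Flink's API.
final class StartOffsetSketch {

    // What to do when the consumer group has no committed offset for a partition.
    enum Fallback { NONE, EARLIEST, LATEST }

    // Models the new default described in the thread: always start at the
    // beginning of the partition, ignoring any committed offset.
    static long fromEarliest(long earliest) {
        return earliest;
    }

    // Models "start from committed offsets": prefer the committed offset,
    // otherwise apply the fallback. Fallback.NONE models the no-fallback case,
    // which fails when nothing has been committed.
    static long fromCommitted(OptionalLong committed, Fallback fallback, long earliest, long latest) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (fallback) {
            case EARLIEST:
                return earliest;
            case LATEST:
                return latest;
            default:
                throw new IllegalStateException("no committed offset and no fallback strategy");
        }
    }

    public static void main(String[] args) {
        long earliest = 0L;   // assumed log start offset
        long latest = 100L;   // assumed log end offset
        OptionalLong committed = OptionalLong.of(42L);

        // New default: replays the partition from the start even though 42 was committed.
        System.out.println(fromEarliest(earliest));
        // Committed offset with earliest fallback: resumes where the group left off.
        System.out.println(fromCommitted(committed, Fallback.EARLIEST, earliest, latest));
        // Brand-new group with earliest fallback: starts from the beginning instead of failing.
        System.out.println(fromCommitted(OptionalLong.empty(), Fallback.EARLIEST, earliest, latest));
    }
}
```

In the model, a job that migrates without setting a starting-offsets initializer lands in the first case and re-reads everything, which is the migration risk raised in the original message. In the real connector, the thread's own suggestion is to pass `OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)` to `setStartingOffsets(...)` on the `KafkaSourceBuilder`.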