In order to unsubscribe, please send an email (any content) to user-unsubscr...@flink.apache.org
Thanks

Best regards,
Jing

From: liangzai <liangzai...@163.com>
Date: Sun, Jun 19, 2022 at 4:37 AM
Subject: Re: New KafkaSource API: Change in default behavior regarding starting offset
To: bastien dine <bastien.d...@gmail.com>
Cc: Martijn Visser <martijnvis...@apache.org>, Jing Ge <j...@ververica.com>, user <user@flink.apache.org>

How do I unsubscribe from this mailing list?

---- Replied Message ----
From: bastien dine <bastien.d...@gmail.com>
Date: 06/15/2022 17:50
To: Martijn Visser <martijnvis...@apache.org>
Cc: Jing Ge <j...@ververica.com>, user <user@flink.apache.org>
Subject: Re: New KafkaSource API: Change in default behavior regarding starting offset

Hello Martijn,

Thanks for the link to the release notes, especially:
"When resuming from the savepoint, please use setStartingOffsets(OffsetsInitializer.committedOffsets()) in the new KafkaSourceBuilder to transfer the offsets to the new source."

So earliest is the new default.
We definitely use .committedOffsets(): it is set by default in our custom KafkaSource builder to make sure we do not re-read all the previous data (earliest).
What bothers me is just this change in the default starting-offset behavior from FlinkKafkaConsumer to KafkaSource (it can lead to mistakes).
In fact, we sometimes drop part of our Kafka source state so that the job reads again from the committed Kafka offsets, but maybe nobody else does that ^^

Anyway, thanks for pointing out the release notes!

Best regards,
------------------
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io

On Wed, Jun 15, 2022 at 10:58, Martijn Visser <martijnvis...@apache.org> wrote:

> Hi Bastien,
>
> When the FlinkKafkaConsumer was deprecated in 1.14.0, the release notes
> included instructions on how to migrate from FlinkKafkaConsumer to
> KafkaSource [1]. Looking at the Kafka documentation [2], there is a
> section on how to upgrade to the latest connector version that I think is
> outdated.
> I'm leaning towards copying the migration instructions to the
> generic documentation. Do you think that would have sufficed?
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version
>
> On Wed, Jun 15, 2022 at 09:22, bastien dine <bastien.d...@gmail.com> wrote:
>
>> Hello Jing,
>>
>> This was the method in the old Kafka consumer API; it was removed in
>> 1.15, so its source code is no longer in master.
>> Yes, I know that with the new OffsetsInitializer, committed offsets with
>> earliest as the fallback can be used to get the same behavior as before.
>> I just wanted to know whether this is a behavior change or whether I am
>> missing something.
>>
>> Bastien DINE
>> Freelance
>> Data Architect / Software Engineer / Sysadmin
>> http://bastiendine.io
>>
>> On Tue, Jun 14, 2022 at 23:08, Jing Ge <j...@ververica.com> wrote:
>>
>>> Hi Bastien,
>>>
>>> Thanks for asking. I didn't find any call of setStartFromGroupOffsets()
>>> within Flink in the master branch. Could you please point out the code
>>> where the committed offset is used as the default?
>>>
>>> W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
>>> is used, an exception will be thrown at runtime if there is no
>>> committed offset, which is useful if the user intends to read from the
>>> committed offset but something is wrong. It might feel weird if it were
>>> used as the default, because an exception would be thrown when users
>>> start new jobs with default settings.
>>>
>>> Best regards,
>>> Jing
>>>
>>> On Tue, Jun 14, 2022 at 4:15 PM bastien dine <bastien.d...@gmail.com>
>>> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> Does anyone know why the starting offset behavior has changed in the
>>>> new KafkaSource?
>>>>
>>>> It now starts from earliest (code in KafkaSourceBuilder); the docs say:
>>>> "If offsets initializer is not specified, OffsetsInitializer.earliest()
>>>> will be used by default." from:
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>>>
>>>> Before, in the old FlinkKafkaConsumer, it started from the committed
>>>> offsets (i.e. the setStartFromGroupOffsets() method),
>>>>
>>>> which matches this behavior in the new KafkaSource:
>>>> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
>>>>
>>>> This change can lead to big trouble if users pay no attention to this
>>>> point when migrating from the old FlinkKafkaConsumer to the new
>>>> KafkaSource.
>>>>
>>>> Regards,
>>>> Bastien
>>>>
>>>> ------------------
>>>>
>>>> Bastien DINE
>>>> Data Architect / Software Engineer / Sysadmin
>>>> bastiendine.io
>>>>
>>>
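To make the three starting-offset choices discussed above concrete, here is a toy model in plain Java with no Flink dependency. It is a sketch, not Flink's implementation: the enum `Initializer` and the method `startingOffset` are hypothetical names that only mirror `OffsetsInitializer.earliest()` (the KafkaSource default), `OffsetsInitializer.committedOffsets()` (which, per Jing, throws at runtime when nothing is committed) and `OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)`. It also assumes that offsets restored from source state take precedence, so the initializer only matters for partitions without state, such as a new job or after the source state is dropped as Bastien describes.

```java
import java.util.OptionalLong;

// Toy model (hypothetical names, NOT Flink code) of how a starting offset
// could be chosen for a single Kafka partition.
public class StartingOffsetModel {

    // Hypothetical enum mirroring the initializers discussed in the thread.
    enum Initializer {
        EARLIEST,                // like OffsetsInitializer.earliest()
        COMMITTED,               // like OffsetsInitializer.committedOffsets(): fails if nothing committed
        COMMITTED_ELSE_EARLIEST  // like committedOffsets(OffsetResetStrategy.EARLIEST)
    }

    /**
     * @param restored  offset restored from checkpoint/savepoint state, if any
     * @param committed offset committed by the consumer group, if any
     * @param earliest  earliest offset still available in the partition
     */
    static long startingOffset(Initializer init, OptionalLong restored,
                               OptionalLong committed, long earliest) {
        // Assumption: restored source state wins; the initializer is only
        // consulted when the partition has no state.
        if (restored.isPresent()) {
            return restored.getAsLong();
        }
        switch (init) {
            case EARLIEST:
                return earliest;
            case COMMITTED:
                if (committed.isPresent()) {
                    return committed.getAsLong();
                }
                // Models the runtime failure Jing describes.
                throw new IllegalStateException("no committed offset for partition");
            case COMMITTED_ELSE_EARLIEST:
                return committed.orElse(earliest);
            default:
                throw new AssertionError("unknown initializer: " + init);
        }
    }

    public static void main(String[] args) {
        OptionalLong noState = OptionalLong.empty();
        OptionalLong committed = OptionalLong.of(5_000L);
        long earliest = 0L;

        // Source state dropped, the group has committed offset 5000:
        System.out.println(startingOffset(Initializer.EARLIEST, noState, committed, earliest));                 // 0 (re-reads everything)
        System.out.println(startingOffset(Initializer.COMMITTED_ELSE_EARLIEST, noState, committed, earliest));  // 5000

        // Brand-new consumer group with nothing committed:
        System.out.println(startingOffset(Initializer.COMMITTED_ELSE_EARLIEST, noState, OptionalLong.empty(), earliest)); // 0
        try {
            startingOffset(Initializer.COMMITTED, noState, OptionalLong.empty(), earliest);
        } catch (IllegalStateException e) {
            System.out.println("COMMITTED failed: " + e.getMessage());
        }
    }
}
```

In a real job the equivalent choice is made on the builder, e.g. `KafkaSource.<String>builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))`, with the other required settings (brokers, topics, group id, deserializer) omitted here. The model only shows why leaving the default in place matters most when source state is absent: `EARLIEST` silently re-reads the whole partition, while the committed-with-fallback variant resumes from the group's position.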