How do I unsubscribe from this mailing list?


---- Replied Message ----
| From | bastien dine<bastien.d...@gmail.com> |
| Date | 06/15/2022 17:50 |
| To | Martijn Visser<martijnvis...@apache.org> |
| Cc | Jing Ge<j...@ververica.com>, user <user@flink.apache.org> |
| Subject | Re: New KafkaSource API : Change in default behavior regarding starting offset |
Hello Martijn,


Thanks for the link to the release note, especially:
"When resuming from the savepoint, please use setStartingOffsets(OffsetsInitializer.committedOffsets()) in the new KafkaSourceBuilder to transfer the offsets to the new source."
So earliest is the new default.
We do use .committedOffsets - we set it by default in our custom KafkaSource builder to be sure we do not re-read all the previous data (earliest).
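For illustration, a minimal sketch of such a builder (bootstrap servers, topic, and group id are placeholder values, not our real configuration):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Resume from the consumer group's committed offsets instead of the
// new "earliest" default of KafkaSourceBuilder.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")   // placeholder
        .setTopics("my-topic")                   // placeholder
        .setGroupId("my-group")                  // placeholder
        .setStartingOffsets(OffsetsInitializer.committedOffsets())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();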


What bothers me is just this change in the default starting offset behaviour from FlinkKafkaConsumer to KafkaSource (it can lead to mistakes).
In fact it does happen that we drop some of our Kafka source state to read again from the Kafka committed offsets, but maybe nobody else does that ^^


Anyway, thanks for pointing me to the release note!


Best Regards,


------------------


Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io





On Wed, Jun 15, 2022 at 10:58, Martijn Visser <martijnvis...@apache.org> wrote:

Hi Bastien,


When the FlinkKafkaConsumer was deprecated in 1.14.0, the release notes included instructions on how to migrate from FlinkKafkaConsumer to KafkaSource [1]. Looking at the Kafka documentation [2], there is a section on how to upgrade to the latest connector version that I think is outdated. I'm leaning towards copying the migration instructions to the generic documentation. Do you think that would have sufficed?


Best regards,


Martijn


[1] 
https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version


On Wed, Jun 15, 2022 at 09:22, bastien dine <bastien.d...@gmail.com> wrote:

Hello Jing,


This was the previous method in the old Kafka consumer API; it has been removed in 1.15, so the source code is no longer in master.
Yes, I know that with the new OffsetsInitializer, committed offsets with earliest as a fallback can be used to get the same behaviour as before.
I just wanted to know whether this is a behaviour change or whether I am missing something.






Bastien DINE
Freelance
Data Architect / Software Engineer / Sysadmin
http://bastiendine.io

   


On Tue, Jun 14, 2022 at 23:08, Jing Ge <j...@ververica.com> wrote:

Hi Bastien,


Thanks for asking. I didn't find any call of setStartFromGroupOffsets() within Flink in the master branch. Could you please point to the code where the committed offset is used as the default?


W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets() is used, an exception will be thrown at runtime in case there are no committed offsets, which is useful if the user intends to read from the committed offsets but something is wrong. It might feel weird as a default, because an exception would be thrown whenever users start new jobs with default settings.
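For illustration, a minimal sketch of the two variants (an illustrative snippet, not taken from the docs):

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Strict: throws at runtime if the consumer group has no committed offsets yet.
OffsetsInitializer strict = OffsetsInitializer.committedOffsets();

// With fallback: starts from EARLIEST when no committed offsets exist,
// which mirrors the old FlinkKafkaConsumer default behaviour.
OffsetsInitializer withFallback =
        OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST);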


Best regards,
Jing


On Tue, Jun 14, 2022 at 4:15 PM bastien dine <bastien.d...@gmail.com> wrote:

Hello everyone,


Does someone know why the starting offset behaviour has changed in the new KafkaSource?


It now starts from earliest (see the code in KafkaSourceBuilder); the doc says:
"If offsets initializer is not specified, OffsetsInitializer.earliest() will be used by default." from:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset


Before, in the old FlinkKafkaConsumer, it started from the committed offsets (i.e. the setStartFromGroupOffsets() method),


which matches this behaviour in the new KafkaSource: OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
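A rough side-by-side sketch (placeholder topic/group/servers; the old API shown as it was before its removal in 1.15):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Old API: group offsets were the implicit default start position.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
props.setProperty("group.id", "my-group");                // placeholder
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
consumer.setStartFromGroupOffsets(); // was already the default

// New API: earliest is the default, so the old behaviour must be requested explicitly.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")   // placeholder
        .setTopics("my-topic")
        .setGroupId("my-group")
        .setStartingOffsets(
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();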


This change can lead to big trouble if users pay no attention to this point when migrating from the old FlinkKafkaConsumer to the new KafkaSource,


Regards,
Bastien


------------------


Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
