[ https://issues.apache.org/jira/browse/FLINK-24697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683211#comment-17683211 ]
Qingsheng Ren commented on FLINK-24697:
---------------------------------------

Thanks for the comment [~hejiefang] [~catyee] !

I checked the code and the default behavior did indeed change: if a user specifies a new group id without committed offsets, the reset strategy is NONE instead of LATEST as in previous Flink versions. A change in default behavior needs a discussion or a FLIP because it is part of the public API, so this was a mistake :(

As for the correctness of the behavior itself, I personally prefer the NONE reset strategy when a user sets a new group id, or more accurately, the user should explicitly specify the offset reset strategy if the source is set to start from group offsets. A default strategy should match users' implicit expectations, and I've seen expectations on both sides (EARLIEST and LATEST). As I mentioned above, this requires discussion and consensus in the community.

> Kafka table source cannot change the auto.offset.reset setting for
> 'group-offsets' startup mode
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24697
>                 URL: https://issues.apache.org/jira/browse/FLINK-24697
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0, 1.15.0
>            Reporter: Hang Ruan
>            Assignee: Hang Ruan
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> Because Flink 1.13 SQL does not use the new Source API from FLIP-27,
> starting from group offsets in Flink 1.13 uses the Kafka
> 'auto.offset.reset' default value (latest) when the 'auto.offset.reset'
> configuration is not set in the table options. In Flink 1.13 we could still
> change this behavior by setting 'auto.offset.reset' to another value. See the
> method {{setStartFromGroupOffsets}} in the class {{FlinkKafkaConsumerBase}}.
> Flink 1.14 uses the new Source API, but there is no way to change the default
> 'auto.offset.reset' value when using the 'group-offsets' startup mode. In the
> DataStream API, we can change it with
> `kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy))`.
> So we need a way to change the auto offset reset configuration.
> The design is that when 'auto.offset.reset' is set, the 'group-offsets'
> startup mode uses the provided auto offset reset strategy; otherwise it uses
> the 'none' reset strategy, to be consistent with the DataStream API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
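
The design in the issue description (use the configured 'auto.offset.reset' value for the 'group-offsets' startup mode, otherwise fall back to 'none') can be sketched roughly as below. This is a minimal, self-contained illustration and not the connector's actual code: the class GroupOffsetsResetSketch, the enum ResetStrategy, and the method resolve are hypothetical names made up for this example.

```java
import java.util.Locale;
import java.util.Map;

/** Illustrative sketch only; all names here are hypothetical, not Flink's code. */
final class GroupOffsetsResetSketch {

    /** Mirrors the three values Kafka accepts for 'auto.offset.reset'. */
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    /**
     * Resolves the reset strategy for the 'group-offsets' startup mode:
     * the configured value if one is present, otherwise NONE.
     */
    static ResetStrategy resolve(Map<String, String> kafkaProperties) {
        String configured = kafkaProperties.get(AUTO_OFFSET_RESET);
        if (configured == null) {
            // Nothing configured: fall back to NONE, as the issue proposes.
            return ResetStrategy.NONE;
        }
        // valueOf throws IllegalArgumentException for an unknown value.
        return ResetStrategy.valueOf(configured.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of()));                              // NONE
        System.out.println(resolve(Map.of(AUTO_OFFSET_RESET, "earliest"))); // EARLIEST
    }
}
```

With the NONE fallback, a consumer group that has no committed offset fails at startup instead of silently choosing a position, which is why the comment above suggests that users state the strategy explicitly.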