[ https://issues.apache.org/jira/browse/FLINK-24851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17445000#comment-17445000 ]
Arseniy Tashoyan commented on FLINK-24851:
------------------------------------------

[~renqs] thank you for your proposal - looks good to me. I would also suggest to have an overloaded method:
{code:java}
setProperties(Map<String, String> settings)
{code}
This makes coding easier - no need to create the Properties object.

> KafkaSourceBuilder: auto.offset.reset is ignored
> ------------------------------------------------
>
>                 Key: FLINK-24851
>                 URL: https://issues.apache.org/jira/browse/FLINK-24851
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Arseniy Tashoyan
>            Assignee: liwei li
>            Priority: Major
>              Labels: pull-request-available
>
> Creating KafkaSource like this:
> {code:scala}
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092")
> props.put("group.id", "group1")
> props.put("auto.offset.reset", "latest")
> val kafkaSource = KafkaSource.builder[String]()
>   .setProperties(props)
>   .build()
> {code}
> The actually used value for _"auto.offset.reset"_ is *"earliest"* instead of
> configured *"latest"*.
> This occurs because _"auto.offset.reset"_ gets overridden by
> _startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase()_.
> The default value for _startingOffsetsInitializer_ is _"earliest"_.
> This behavior is misleading.
> This behavior imposes an inconvenience on configuring the Kafka connector. We
> cannot use the Kafka setting _"auto.offset.reset"_ as-is. Instead we must
> extract this particular setting from other settings and propagate to
> _KafkaSourceBuilder.setStartingOffsets()_:
> {code:scala}
> val kafkaSource = KafkaSource.builder[String]()
>   .setProperties(props)
>   .setStartingOffsets(
>     OffsetsInitializer.committedOffsets(
>       OffsetResetStrategy.valueOf(
>         props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>           .asInstanceOf[String]
>           .toUpperCase(Locale.ROOT)
>       )
>     )
>   )
>   .build()
> {code}
> The expected behavior is to use the value of _"auto.offset.reset"_ provided
> by _KafkaSourceBuilder.setProperties()_ - unless overridden via
> _KafkaSourceBuilder.setStartingOffsets()_.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
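
To illustrate the two ideas discussed in this message (the suggested Map-based overload and the expected precedence for "auto.offset.reset"), here is a minimal standalone sketch in plain Java. It does not use or reproduce Flink's code: the class PropertiesSketch and the methods toProperties and resolveAutoOffsetReset are hypothetical names introduced only for illustration, and the fallback to "earliest" simply mirrors the default described in the issue.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PropertiesSketch {

    /** Copies a String-to-String map into a java.util.Properties object. */
    public static Properties toProperties(Map<String, String> settings) {
        Properties props = new Properties();
        // Properties rejects null keys and values, so the map must not contain any.
        settings.forEach(props::setProperty);
        return props;
    }

    /**
     * Hypothetical precedence rule (an assumption, not Flink's implementation):
     * an explicitly chosen strategy wins; otherwise the user's
     * "auto.offset.reset" is kept; otherwise fall back to "earliest".
     */
    public static String resolveAutoOffsetReset(Properties props, String explicitStrategy) {
        if (explicitStrategy != null) {
            return explicitStrategy;
        }
        return props.getProperty("auto.offset.reset", "earliest");
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("bootstrap.servers", "localhost:9092");
        settings.put("group.id", "group1");
        settings.put("auto.offset.reset", "latest");

        Properties props = toProperties(settings);
        // No explicit strategy was given, so the configured value is used.
        System.out.println(resolveAutoOffsetReset(props, null));
    }
}
```

With a rule of this shape, the caller would not need to extract "auto.offset.reset" from the properties and pass it to setStartingOffsets() by hand; the configured value would apply unless a starting-offsets strategy is set explicitly.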