[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697189#comment-15697189 ]
ASF GitHub Bot commented on FLINK-4280:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89664876

    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -218,26 +221,57 @@ public void run() {
                 }
             }
    
    -        // seek the consumer to the initial offsets
    +        List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
             for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
                 if (partition.isOffsetDefined()) {
                     LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
                         "to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
    
                     consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
                 } else {
    -                // for partitions that do not have offsets restored from a checkpoint/savepoint,
    -                // we need to define our internal offset state for them using the initial offsets retrieved from Kafka
    -                // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
    +                partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
    +            }
    +        }
    
    -                long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
    +        if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
    +            // if all partitions have no initial offsets, that means we're starting fresh
    +            switch (startupMode) {
    +                case EARLIEST:
    +                    LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
    +
    +                    seekPartitionsToBeginning(consumer, convertKafkaPartitions(subscribedPartitions()));
    --- End diff --

The problem with this one is that the `seekToBeginning` method broke compatibility from 0.8 to 0.9+.
In 0.8, it's `seekToBeginning(TopicPartition...)`, while in 0.9+ it's `seekToBeginning(Collection<TopicPartition>)`. I'll integrate these seek methods into the `KafkaConsumerCallBridge` introduced in a recent PR. It'll be inevitable that we redundantly do the Array -> List conversion, because our `subscribedPartitions` is an Array while the 0.9+ methods take a `Collection`. For the 0.8 methods, instead of converting the list back to an array, I'll just iterate over the list and call `seekPartitionsToBeginning` for each one.

> New Flink-specific option to set starting position of Kafka consumer without
> respecting external offsets in ZK / Broker
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4280
>                 URL: https://issues.apache.org/jira/browse/FLINK-4280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" or "latest" position in
> topics for the Flink Kafka consumer, users set the Kafka config
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be a bit misleading if
> users were trying to find a way to "read topics from a starting position".
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer
> resembles Kafka's original intent for the setting: first, existing external
> offsets committed to ZK / the brokers are checked; only if none exist will
> {{auto.offset.reset}} be respected.
> I propose to add Flink-specific ways to define the starting position without
> taking the external offsets into account.
> The original behaviour (reference external offsets first) can be exposed as
> a user option, so that it can be retained for Kafka users who need to
> interoperate with existing non-Flink Kafka consumer applications.
> Here is how users would interact with the Flink Kafka consumer after this is
> added, with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a warning)
> props.setProperty("group.id", "...") // this won't have an effect on the starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, to reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be latest
> props.setProperty("group.id", "..."); // will be used to look up external offsets in ZK / broker on startup
> ...
> {code}
> One thing we would need to decide on is what the default value for
> {{flink.starting-position}} should be.
> Two merits I see in adding this:
> 1. It matches the way users generally interpret "read from a starting
> position". As the Flink Kafka connector is essentially a "high-level" Kafka
> consumer for Flink users, I think it is reasonable to add Flink-specific
> functionality that users will find useful, even though it wasn't supported
> in Kafka's original consumer designs.
> 2. It makes the idea that "the Kafka offset store (ZK / brokers) is used
> only to expose progress to the outside world, and not to manipulate how
> Kafka topics are read in Flink (unless users opt to do so)" even more
> definite and solid.
> There was some discussion on this aspect in a previous PR
> (https://github.com/apache/flink/pull/1690, FLINK-3398). I think adding this
> further decouples Flink's internal offset checkpointing from the external
> Kafka offset store.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
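The call-bridge idea from the review comment above can be sketched as follows. This is a hypothetical, simplified illustration, not Flink's actual code: the `Consumer08`/`Consumer09` classes below are fakes standing in for the real 0.8 and 0.9+ Kafka clients, and `SeekBridge` stands in for the `KafkaConsumerCallBridge`. The point is that the fetcher always hands over a `List`, and each version-specific bridge adapts to its client's incompatible `seekToBeginning` signature (varargs in 0.8, `Collection` in 0.9+).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class CallBridgeSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition.
    static class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Fake 0.8-style consumer: seekToBeginning(TopicPartition...)
    static class Consumer08 {
        final List<TopicPartition> seeked = new ArrayList<>();
        void seekToBeginning(TopicPartition... partitions) {
            for (TopicPartition tp : partitions) {
                seeked.add(tp);
            }
        }
    }

    // Fake 0.9+-style consumer: seekToBeginning(Collection<TopicPartition>)
    static class Consumer09 {
        final List<TopicPartition> seeked = new ArrayList<>();
        void seekToBeginning(Collection<TopicPartition> partitions) {
            seeked.addAll(partitions);
        }
    }

    // The bridge: fetcher code always passes a List; each version-specific
    // implementation adapts it to the client's seekToBeginning signature.
    interface SeekBridge<C> {
        void seekPartitionsToBeginning(C consumer, List<TopicPartition> partitions);
    }

    // 0.8: iterate and seek one partition at a time instead of converting
    // the List back to an array (as described in the comment above).
    static final SeekBridge<Consumer08> BRIDGE_08 =
        (consumer, partitions) -> {
            for (TopicPartition tp : partitions) {
                consumer.seekToBeginning(tp);
            }
        };

    // 0.9+: pass the List straight through as a Collection.
    static final SeekBridge<Consumer09> BRIDGE_09 =
        (consumer, partitions) -> consumer.seekToBeginning(partitions);
}
```

Either way the fetcher only ever deals with the bridge's uniform `seekPartitionsToBeginning` method, so the version split stays out of `Kafka09Fetcher`.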
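The property handling proposed in the issue description above could be resolved roughly as sketched below. This is only an illustration of the proposal, assuming `external-offsets` as the default (retaining the original behaviour); the `StartupMode` enum and `resolveStartupMode` helper are hypothetical names, not Flink's actual API.

```java
import java.util.Properties;

public class StartingPositionSketch {

    // Hypothetical startup modes mirroring the proposed property values.
    enum StartupMode { EARLIEST, LATEST, EXTERNAL_OFFSETS }

    static StartupMode resolveStartupMode(Properties props) {
        // Assumed default: external-offsets, i.e. the pre-existing behaviour.
        String value = props.getProperty("flink.starting-position", "external-offsets");
        switch (value) {
            case "earliest":
            case "latest":
                // auto.offset.reset is ignored in these modes; log a warning if set.
                if (props.containsKey("auto.offset.reset")) {
                    System.err.println("WARN: 'auto.offset.reset' is ignored because "
                            + "'flink.starting-position' is set to '" + value + "'");
                }
                return value.equals("earliest") ? StartupMode.EARLIEST : StartupMode.LATEST;
            case "external-offsets":
                // group.id and auto.offset.reset keep their Kafka semantics here.
                return StartupMode.EXTERNAL_OFFSETS;
            default:
                throw new IllegalArgumentException(
                        "Unknown flink.starting-position: " + value);
        }
    }
}
```

Whatever the final default, the key property of this resolution step is that the external offset store only influences the starting position when the user explicitly opts into `external-offsets`.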