[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15690827#comment-15690827 ]
ASF GitHub Bot commented on FLINK-4280: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r89365136 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java --- @@ -139,24 +143,65 @@ public void runFetchLoop() throws Exception { PeriodicOffsetCommitter periodicCommitter = null; try { - // read offsets from ZooKeeper for partitions that did not restore offsets - { - List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>(); - for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) { - if (!partition.isOffsetDefined()) { - partitionsWithNoOffset.add(partition.getKafkaTopicPartition()); - } + List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>(); + for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) { + if (!partition.isOffsetDefined()) { + partitionsWithNoOffset.add(partition.getKafkaTopicPartition()); } + } + + if (partitionsWithNoOffset.size() == subscribedPartitions().length) { + // if all partitions have no initial offsets, that means we're starting fresh without any restored state + switch (startupMode) { + case EARLIEST: + LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset); + + for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) { + partition.setOffset(OffsetRequest.EarliestTime()); + } + break; + case LATEST: + LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset); + + for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) { + partition.setOffset(OffsetRequest.LatestTime()); + } + break; + default: + case GROUP_OFFSETS: + LOG.info("Using group offsets in Zookeeper of group.id {} as starting point for partitions {}", + kafkaConfig.getProperty("group.id"), partitionsWithNoOffset); + + Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset); + for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) { + Long offset = zkOffsets.get(partition.getKafkaTopicPartition()); + if (offset != null) { + // the committed offset in ZK represents the next record to process, + // so we subtract it by 1 to correctly represent internal state + partition.setOffset(offset - 1); + } else { + // if we can't find an offset for a partition in ZK when using GROUP_OFFSETS, + // we default to "auto.offset.reset" like the Kafka high-level consumer + LOG.warn("No group offset can be found for partition {} in Zookeeper;" + + " resetting starting offset to 'auto.offset.reset'", partition); + + partition.setOffset(invalidOffsetBehavior); + } + } + } + } else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) { --- End diff -- I was adding this as a preparation for the kafka partition discovery task. But it'd probably make sense to remove it for this PR to avoid confusion. > New Flink-specific option to set starting position of Kafka consumer without > respecting external offsets in ZK / Broker > ----------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-4280 > URL: https://issues.apache.org/jira/browse/FLINK-4280 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Currently, to start reading from the "earliest" and "latest" position in > topics for the Flink Kafka consumer, users set the Kafka config > {{auto.offset.reset}} in the provided properties configuration. > However, the way this config actually works might be a bit misleading if > users were trying to find a way to "read topics from a starting position". > The way the {{auto.offset.reset}} config works in the Flink Kafka consumer > resembles Kafka's original intent for the setting: first, existing external > offsets committed to the ZK / brokers will be checked; if none exists, then > will {{auto.offset.reset}} be respected. > I propose to add Flink-specific ways to define the starting position, without > taking into account the external offsets. The original behaviour (reference > external offsets first) can be changed to be a user option, so that the > behaviour can be retained for frequent Kafka users that may need some > collaboration with existing non-Flink Kafka consumer applications. > How users will interact with the Flink Kafka consumer after this is added, > with a newly introduced {{flink.starting-position}} config: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "earliest/latest"); > props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a > warning) > props.setProperty("group.id", "...") // this won't have effect on the > starting position anymore (may still be used in external offset committing) > ... > {code} > Or, reference external offsets in ZK / broker: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "external-offsets"); > props.setProperty("auto.offset.reset", "earliest/latest"); // default will be > latest > props.setProperty("group.id", "..."); // will be used to lookup external > offsets in ZK / broker on startup > ... > {code} > A thing we would need to decide on is what would the default value be for > {{flink.starting-position}}. > Two merits I see in adding this: > 1. This compensates the way users generally interpret "read from a starting > position". As the Flink Kafka connector is somewhat essentially a > "high-level" Kafka consumer for Flink users, I think it is reasonable to add > Flink-specific functionality that users will find useful, although it wasn't > supported in Kafka's original consumer designs. > 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is > used only to expose progress to the outside world, and not used to manipulate > how Kafka topics are read in Flink (unless users opt to do so)" is even more > definite and solid. There was some discussion in this PR > (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I > think adding this "decouples" more Flink's internal offset checkpointing from > the external Kafka's offset store. -- This message was sent by Atlassian JIRA (v6.3.4#6332)