Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r101075570

    --- Diff: docs/dev/connectors/kafka.md ---
    @@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
     The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as an optional "metadata" field that exposes the offset/partition/topic for this message.
    +#### Kafka Consumers Start Position Configuration
    +
    +By default, the Flink Kafka Consumer starts reading partitions from the consumer group's (`group.id` setting in the
    +consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8).
    +
    +This behaviour can be explicitly overriden, as demonstrated below:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
    +myConsumer.setStartFromEarliest(); // start from the earliest record possible
    +myConsumer.setStartFromLatest(); // start from the latest record
    +myConsumer.setStartFromGroupOffsets(); // the default behaviour
    --- End diff --

    Okay, cool. Can you add that to the docs as well?
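For readers following the thread, here is a rough, stdlib-only sketch of which offset each of the three start modes in the diff would select for a single partition. It is not Flink code: `StartPositionSketch`, `StartupMode`, and `startOffset` are hypothetical names, the committed offset is taken to mean "the next record to read", and the fallback used when the group has no committed offset is an assumption modeled loosely on Kafka's `auto.offset.reset` property.

```java
import java.util.OptionalLong;

// Hypothetical model, NOT Flink's implementation: shows which offset each
// start-position mode would pick for one partition.
public class StartPositionSketch {

    // Mirrors the three setters in the diff (setStartFromEarliest,
    // setStartFromLatest, setStartFromGroupOffsets).
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    /**
     * @param earliest      first offset still retained in the partition
     * @param latest        offset the next produced record will receive
     * @param committed     offset committed for the consumer group, if any
     * @param resetEarliest assumed fallback when nothing is committed
     *                      (true = earliest, false = latest)
     */
    static long startOffset(StartupMode mode, long earliest, long latest,
                            OptionalLong committed, boolean resetEarliest) {
        if (mode == StartupMode.EARLIEST) {
            return earliest;
        }
        if (mode == StartupMode.LATEST) {
            return latest;
        }
        // GROUP_OFFSETS (the default): resume from the group's committed offset.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // Assumed fallback when the group has never committed for this partition.
        return resetEarliest ? earliest : latest;
    }

    public static void main(String[] args) {
        long earliest = 100;
        long latest = 250;
        OptionalLong committed = OptionalLong.of(180);
        for (StartupMode mode : StartupMode.values()) {
            System.out.println(mode + " -> "
                + startOffset(mode, earliest, latest, committed, true));
        }
        System.out.println("GROUP_OFFSETS, nothing committed -> "
            + startOffset(StartupMode.GROUP_OFFSETS, earliest, latest,
                          OptionalLong.empty(), false));
    }
}
```

Running `main` prints one line per mode (100, 250, 180) followed by the fallback case (250), which makes it easy to see that only the group-offsets mode depends on what the consumer group has committed.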