Github user tzulitai commented on a diff in the pull request:

--- Diff: docs/dev/connectors/ ---
@@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
 The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as an optional "metadata" field that exposes the offset/partition/topic for this message.
+#### Kafka Consumers Start Position Configuration
+
+By default, the Flink Kafka Consumer starts reading partitions from the consumer group's (`` setting in the
+consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8).
+
+This behaviour can be explicitly overridden, as demonstrated below:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
+myConsumer.setStartFromEarliest(); // start from the earliest record possible
+myConsumer.setStartFromLatest(); // start from the latest record
+myConsumer.setStartFromGroupOffsets(); // the default behaviour
+
+DataStream<String> stream = env.addSource(myConsumer);
+...
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val myConsumer = new FlinkKafkaConsumer08[String](...)
+myConsumer.setStartFromEarliest() // start from the earliest record possible
+myConsumer.setStartFromLatest() // start from the latest record
+myConsumer.setStartFromGroupOffsets() // the default behaviour
+
+val stream = env.addSource(myConsumer)
+...
+{% endhighlight %}
+</div>
+</div>
+
+All versions of the Flink Kafka Consumer have the above explicit configuration methods for start position. When
+configured to start from the earliest or latest record by calling either `setStartFromEarliest()` or `setStartFromLatest()`,
+the consumer will ignore any committed group offsets in Kafka when determining the start position for partitions.
--- End diff --

Good point, will add.
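As a sketch of the semantics described in the diff (not Flink's actual implementation), the choice of start offset for a single partition can be modeled as a plain decision function. `StartPositionSketch`, `StartupMode`, and `resolveStartOffset` are hypothetical names. The fallback to the earliest offset when the group has no committed offset is an assumed simplification; in the real consumer that case depends on its Kafka consumer properties.

```java
import java.util.Optional;

/**
 * Hypothetical model (not Flink's API) of how a start offset could be chosen
 * for one partition, following the behaviour described in the docs diff.
 */
public class StartPositionSketch {

    /** Hypothetical stand-in for the three setStartFrom...() options. */
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    /**
     * @param mode            which setStartFrom...() option was chosen
     * @param committedOffset the group's committed offset for this partition, if any
     * @param earliestOffset  first offset still retained in the partition
     * @param latestOffset    offset the next appended record will receive
     */
    static long resolveStartOffset(StartupMode mode,
                                   Optional<Long> committedOffset,
                                   long earliestOffset,
                                   long latestOffset) {
        switch (mode) {
            case EARLIEST:
                // Committed group offsets are ignored entirely.
                return earliestOffset;
            case LATEST:
                // Committed group offsets are ignored; only new records are read.
                return latestOffset;
            case GROUP_OFFSETS:
            default:
                // Default behaviour: use the committed offset.
                // ASSUMPTION: fall back to the earliest offset when none is committed.
                return committedOffset.orElse(earliestOffset);
        }
    }

    public static void main(String[] args) {
        Optional<Long> committed = Optional.of(42L);
        System.out.println(resolveStartOffset(StartupMode.EARLIEST, committed, 0L, 100L));
        System.out.println(resolveStartOffset(StartupMode.LATEST, committed, 0L, 100L));
        System.out.println(resolveStartOffset(StartupMode.GROUP_OFFSETS, committed, 0L, 100L));
        System.out.println(resolveStartOffset(StartupMode.GROUP_OFFSETS, Optional.empty(), 0L, 100L));
    }
}
```

Running `main` prints 0, 100, 42 and 0: only the group-offsets mode ever consults the committed offset, which matches the statement that the earliest/latest settings ignore committed group offsets.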