Hi Kevin,

Just a re-clarification: for Kafka 0.9 it would be “earliest”, and “smallest” for the older Kafka 0.8.
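To make the version difference concrete, here is a minimal Scala sketch that picks the "start from the beginning" value of auto.offset.reset by consumer generation. The names OffsetResetConfig, KafkaVersion, Kafka08, Kafka09, earliestResetValue and consumerProperties are hypothetical and only for illustration; the property keys and values are the standard Kafka ones. As Max notes further down, auto.offset.reset only takes effect when the group has no committed offset. The Flink 0.8 consumer also needs "zookeeper.connect", which this sketch leaves out.

```scala
import java.util.Properties

// Hypothetical helper: chooses the auto.offset.reset value meaning
// "start from the beginning" for the Kafka consumer generation in use.
object OffsetResetConfig {
  sealed trait KafkaVersion
  case object Kafka08 extends KafkaVersion // old consumer: "smallest" / "largest"
  case object Kafka09 extends KafkaVersion // new consumer: "earliest" / "latest"

  def earliestResetValue(version: KafkaVersion): String = version match {
    case Kafka08 => "smallest"
    case Kafka09 => "earliest"
  }

  // Builds the basic consumer properties; note that the reset value is only
  // consulted when Kafka has no committed offset for this group.id.
  def consumerProperties(bootstrapServers: String,
                         groupId: String,
                         version: KafkaVersion): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)
    props.setProperty("auto.offset.reset", earliestResetValue(version))
    props
  }
}
```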
I’m wondering whether it is reasonable to add a Flink-specific way to set the consumer’s starting position to “earliest” or “latest” without respecting the external Kafka offset store. Perhaps we can turn the current behaviour (using the offsets committed in Kafka as the starting point) into one user option, and add new options that read from “earliest” or “latest” regardless of the group.id and any externally committed offsets. I think this better matches how users usually interpret setting a starting position, while still keeping the “auto.offset.reset” behaviour that frequent Kafka users are used to. It would also make clearer that, in the context of Flink, the external Kafka offset store is used only to expose the consumer’s progress to the outside world, not to control how topics are read.

Just an idea I have in mind; I’m not sure whether it would be a reasonable addition. It’d be great to hear what others think of this.

Regards,
Gordon

On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote:

Thank you Gordon and Max,

Thank you Gordon, that explains the behaviour a bit better to me. I am now adding the timestamp to the group ID and that is a good workaround for now. The "smallest" option is unfortunately not available in this version of the FlinkKafkaConsumer class.

Cheers,
Kevin

On 28.07.2016 10:39, Maximilian Michels wrote:
> Hi Kevin,
>
> You need to use properties.setProperty("auto.offset.reset",
> "smallest") for Kafka 9 to start from the smallest offset. Note, that
> in Kafka 8 you need to use properties.setProperty("auto.offset.reset",
> "earliest") to achieve the same behavior.
>
> Kafka keeps track of the offsets per group id. If you have already
> read from a topic with a certain group id and want to restart from the
> smallest offset available, you need to generate a unique group id.
>
> Cheers,
> Max
>
> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs <kevin.jac...@cern.ch> wrote:
>> Hi,
>>
>> I am currently facing strange behaviour of the FlinkKafkaConsumer09 class. I
>> am using Flink 1.0.3.
>>
>> These are my properties:
>>
>> val properties = new Properties()
>> properties.setProperty("bootstrap.servers", config.urlKafka)
>> properties.setProperty("group.id", COLLECTOR_NAME)
>> properties.setProperty("auto.offset.reset", "earliest")
>>
>> According to the new consumer API of Kafka, this should result in the
>> following:
>>
>> auto.offset.reset: * smallest : automatically reset the offset to the
>> smallest offset (source:
>> https://kafka.apache.org/documentation.html#newconsumerapi)
>>
>> However, it starts from the latest item in my topic. Is this a bug or am I
>> doing something wrong?
>>
>> Regards,
>> Kevin
>>
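For reference, Kevin's properties combined with the unique group.id workaround he describes could look roughly like the Scala sketch below. FreshGroupId, uniqueGroupId and the "base-name plus timestamp" naming scheme are hypothetical; only the property keys and the idea of appending a timestamp come from this thread. Because each run uses a group that has never committed offsets, Kafka falls back to auto.offset.reset, so "earliest" takes effect with the Kafka 0.9 consumer.

```scala
import java.util.Properties

// Sketch of the workaround from this thread: derive a fresh group.id per run
// (base name plus a timestamp) so Kafka has no committed offsets for it, and
// "auto.offset.reset" therefore decides where reading starts.
object FreshGroupId {
  // Hypothetical naming scheme: "<baseName>-<epoch millis>".
  def uniqueGroupId(baseName: String,
                    nowMillis: Long = System.currentTimeMillis()): String =
    s"$baseName-$nowMillis"

  def consumerProperties(bootstrapServers: String,
                         baseName: String,
                         nowMillis: Long = System.currentTimeMillis()): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", uniqueGroupId(baseName, nowMillis))
    // Kafka 0.9 (FlinkKafkaConsumer09) expects "earliest"; Kafka 0.8 would use "smallest".
    props.setProperty("auto.offset.reset", "earliest")
    props
  }
}
```

The resulting Properties would be passed to the FlinkKafkaConsumer09 constructor in the same way as in Kevin's original snippet. The trade-off is that every run starts a brand-new consumer group, so offsets committed under earlier group ids are simply left unused.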