Hi Max,

Sure, I was planning to do so, but wanted to see if it was a reasonable
feature to add before opening a JIRA :)

Here's the new JIRA: https://issues.apache.org/jira/browse/FLINK-4280

Regards,
Gordon

On Fri, Jul 29, 2016 at 4:03 PM, Maximilian Michels <m...@apache.org> wrote:

> Hi Tai,
>
> Should definitely be possible. Would you mind opening a JIRA issue
> with the description you posted?
>
> Thanks,
> Max
>
> On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon <tzuli...@gmail.com> wrote:
> > Hi Kevin,
> >
> > Just a re-clarification: for Kafka 0.9 it would be “earliest”, &
> > “smallest” for the older Kafka 0.8.
> >
> > I’m wondering whether or not it is reasonable to add a Flink-specific
> > way to set the consumer’s starting position to “earliest” and “latest”,
> > without respecting the external Kafka offset store. Perhaps we can
> > change the current behaviour (checking committed offsets in Kafka as
> > starting point) as a user option, and add new options to read from
> > “earliest” and “latest” regardless of the groupId and externally
> > committed offsets. I think this better matches how users usually
> > interpret the functionality of setting starting positions, while also
> > keeping the “auto.offset.reset” behaviour that frequent Kafka users are
> > used to. This would also more clearly define that under the context of
> > Flink, the external Kafka offset store is used only to expose the
> > consumer’s progress to the outside world, and not used to manipulate
> > how topics are read.
> >
> > Just an idea I have in mind, not sure if it would be a reasonable add.
> > It’d be great to hear what others think of this.
> >
> > Regards,
> > Gordon
> >
> >
> > On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote:
> >
> > Thank you Gordon and Max,
> >
> > Thank you Gordon, that explains the behaviour a bit better to me. I am
> > now adding the timestamp to the group ID and that is a good workaround
> > for now. The "smallest" option is unfortunately not available in this
> > version of the FlinkKafkaConsumer class.
> >
> > Cheers,
> > Kevin
> >
> >
> > On 28.07.2016 10:39, Maximilian Michels wrote:
> >> Hi Kevin,
> >>
> >> You need to use properties.setProperty("auto.offset.reset",
> >> "smallest") for Kafka 9 to start from the smallest offset. Note, that
> >> in Kafka 8 you need to use properties.setProperty("auto.offset.reset",
> >> "earliest") to achieve the same behavior.
> >>
> >> Kafka keeps track of the offsets per group id. If you have already
> >> read from a topic with a certain group id and want to restart from the
> >> smallest offset available, you need to generate a unique group id.
> >>
> >> Cheers,
> >> Max
> >>
> >> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs <kevin.jac...@cern.ch> wrote:
> >>> Hi,
> >>>
> >>> I am currently facing strange behaviour of the FlinkKafkaConsumer09
> >>> class. I am using Flink 1.0.3.
> >>>
> >>> These are my properties:
> >>>
> >>> val properties = new Properties()
> >>> properties.setProperty("bootstrap.servers", config.urlKafka)
> >>> properties.setProperty("group.id", COLLECTOR_NAME)
> >>> properties.setProperty("auto.offset.reset", *"earliest"*)
> >>>
> >>> According to the new consumer API of Kafka, this should result in the
> >>> following:
> >>>
> >>> /auto.offset.reset: * smallest : automatically reset the offset to the
> >>> smallest offset/ (source:
> >>> https://kafka.apache.org/documentation.html#newconsumerapi)
> >>>
> >>> However, it starts from the latest item in my topic. Is this a bug or
> >>> am I doing something wrong?
> >>>
> >>> Regards,
> >>> Kevin
> >>>

--
Tzu-Li (Gordon) Tai
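
For reference, the workaround discussed in this thread (a fresh group id plus
the version-appropriate "auto.offset.reset" value) could be sketched roughly
as below. This is only a minimal sketch: the object and method names
(KafkaOffsetProps, earliestValue, freshGroupId, consumerProps) are
hypothetical and not part of Flink or Kafka, and it only builds the
java.util.Properties object that would then be handed to the consumer.

```scala
import java.util.Properties

// Hypothetical helper object, for illustration only (not a Flink or Kafka API).
object KafkaOffsetProps {

  // "auto.offset.reset" value meaning "start from the oldest available record":
  // "earliest" for Kafka 0.9 consumers, "smallest" for the older Kafka 0.8.
  def earliestValue(kafka09: Boolean): String =
    if (kafka09) "earliest" else "smallest"

  // Kafka tracks committed offsets per group id, so appending a timestamp
  // yields a group id that has no committed offsets yet.
  def freshGroupId(base: String, nowMillis: Long = System.currentTimeMillis()): String =
    s"$base-$nowMillis"

  // Builds consumer properties that should make a new consumer start
  // from the oldest available offset.
  def consumerProps(bootstrapServers: String, baseGroupId: String, kafka09: Boolean): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", freshGroupId(baseGroupId))
    props.setProperty("auto.offset.reset", earliestValue(kafka09))
    props
  }
}
```

With the names from the original question, the call would look something like
KafkaOffsetProps.consumerProps(config.urlKafka, COLLECTOR_NAME, kafka09 = true).
Since the group id changes on every start, offsets committed by earlier runs
are ignored, which is the point of the workaround.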