Hi Max,

Sure, I was planning to do so, but wanted to see if it was a reasonable
feature to add before opening a JIRA :)
Here's the new JIRA: https://issues.apache.org/jira/browse/FLINK-4280

Regards,
Gordon

On Fri, Jul 29, 2016 at 4:03 PM, Maximilian Michels <m...@apache.org> wrote:

> Hi Tai,
>
> Should definitely be possible. Would you mind opening a JIRA issue
> with the description you posted?
>
> Thanks,
> Max
>
> On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon <tzuli...@gmail.com> wrote:
> > Hi Kevin,
> >
> > Just a re-clarification: for Kafka 0.9 it would be “earliest”, and
> > “smallest” for the older Kafka 0.8.
> >
> > I’m wondering whether it would be reasonable to add a Flink-specific way
> > to set the consumer’s starting position to “earliest” or “latest”
> > without respecting the external Kafka offset store. Perhaps we can make
> > the current behaviour (checking committed offsets in Kafka as the
> > starting point) a user option, and add new options to read from
> > “earliest” and “latest” regardless of the groupId and externally
> > committed offsets. I think this better matches how users usually
> > interpret setting a starting position, while also keeping the
> > “auto.offset.reset” behaviour that frequent Kafka users are used to. It
> > would also define more clearly that, in the context of Flink, the
> > external Kafka offset store is used only to expose the consumer’s
> > progress to the outside world, and not to manipulate how topics are read.
> >
> > Just an idea I have in mind; I’m not sure whether it would be a
> > reasonable addition. It’d be great to hear what others think of this.
> >
> > Regards,
> > Gordon
> >
> >
> > On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote:
> >
> > Thank you Gordon and Max,
> >
> > Thank you Gordon, that explains the behaviour a bit better to me. I am
> > now adding the timestamp to the group ID and that is a good workaround
> > for now. The "smallest" option is unfortunately not available in this
> > version of the FlinkKafkaConsumer class.
> >
> > Cheers,
> > Kevin
> >
> >
> > On 28.07.2016 10:39, Maximilian Michels wrote:
> >> Hi Kevin,
> >>
> >> You need to use properties.setProperty("auto.offset.reset",
> >> "earliest") for Kafka 0.9 to start from the smallest offset. Note that
> >> in Kafka 0.8 you need to use properties.setProperty("auto.offset.reset",
> >> "smallest") to achieve the same behavior.
> >>
> >> Kafka keeps track of the offsets per group id. If you have already
> >> read from a topic with a certain group id and want to restart from the
> >> smallest offset available, you need to generate a unique group id.
> >>
> >> Cheers,
> >> Max
> >>
> >> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs <kevin.jac...@cern.ch> wrote:
> >>> Hi,
> >>>
> >>> I am currently facing strange behaviour of the FlinkKafkaConsumer09
> >>> class. I am using Flink 1.0.3.
> >>>
> >>> These are my properties:
> >>>
> >>> import java.util.Properties
> >>>
> >>> // Consumer settings passed to FlinkKafkaConsumer09
> >>> val properties = new Properties()
> >>> properties.setProperty("bootstrap.servers", config.urlKafka)
> >>> properties.setProperty("group.id", COLLECTOR_NAME)
> >>> properties.setProperty("auto.offset.reset", "earliest")
> >>>
> >>> According to the new consumer API of Kafka, this should result in the
> >>> following:
> >>>
> >>> /auto.offset.reset: * smallest : automatically reset the offset to the
> >>> smallest offset/ (source:
> >>> https://kafka.apache.org/documentation.html#newconsumerapi)
> >>>
> >>> However, it starts from the latest item in my topic. Is this a bug or
> >>> am I doing something wrong?
> >>>
> >>> Regards,
> >>> Kevin
> >>>
>
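
For reference, here is a minimal sketch of the workaround discussed in this thread: a fresh group id (so Kafka has no committed offsets for it) plus the version-specific "auto.offset.reset" value. The object name KafkaOffsetProps, the build helper, and its parameters are hypothetical and made up for illustration; only the property keys and values come from the thread. The Kafka 0.8 consumer may need further properties that are not shown here.

```scala
import java.util.Properties

// Hypothetical helper, not part of Flink or Kafka.
object KafkaOffsetProps {
  // Builds consumer properties that make a consumer start from the
  // earliest available offset.
  //   kafka09 = true  -> Kafka 0.9 consumer: "earliest"
  //   kafka09 = false -> Kafka 0.8 consumer: "smallest"
  def build(bootstrapServers: String,
            baseGroupId: String,
            kafka09: Boolean,
            nowMillis: Long = System.currentTimeMillis()): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // Append a timestamp so the group id is new and has no committed
    // offsets; auto.offset.reset only applies in that case.
    props.setProperty("group.id", s"$baseGroupId-$nowMillis")
    props.setProperty("auto.offset.reset",
      if (kafka09) "earliest" else "smallest")
    props
  }
}
```

The resulting Properties object would then be passed to the consumer constructor as in Kevin's original snippet. Note that every restart creates a new group id, so this is only a stopgap until something like FLINK-4280 is available.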



-- 
Tzu-Li (Gordon) Tai
