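Another option is to set enable.auto.commit=false and never commit
offsets. As long as auto.offset.reset=latest and no offsets are already
stored for the group, the consumer should then start from the latest
offset on every restart.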
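
A minimal sketch of the settings for this approach, assuming the "new"
Java consumer. It only builds the java.util.Properties object, which
would then be passed to a KafkaConsumer. The class name, broker address
and group name are placeholders for illustration, not anything from this
thread.

```java
import java.util.Properties;

final class NoCommitConsumerConfig {
    // Builds consumer settings under which no offsets are committed
    // automatically. If the application also never commits manually,
    // each start has no stored offsets and falls back to
    // auto.offset.reset.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Never store offsets automatically.
        props.setProperty("enable.auto.commit", "false");
        // With no stored offsets, start at the end of each partition.
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

For example, NoCommitConsumerConfig.build("localhost:9092", "demo-group")
gives the settings for a hypothetical local broker. This only helps if
the group has no offsets left over from earlier runs.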

On Tue, Jul 12, 2016 at 1:34 PM Michael Noll <mich...@confluent.io> wrote:

> To explain what you're seeing:  After you have run a consumer application
> once, it will have stored its latest consumer offsets in Kafka.  Upon
> restart -- e.g. after a brief shutdown time as you mentioned -- the
> consumer application will continue processing from the point of these
> stored consumer offsets.
>
> The auto.offset.reset setting that Snehal mentioned above takes effect if
> and only if there are *no* consumer offsets stored in Kafka yet (i.e. the
> typical situation where auto.offset.reset does take effect is if you are
> starting a consumer application for the very first time).  This means that
> setting auto.offset.reset=latest won't be sufficient to solve your problem.
>
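
To make the rule above concrete, here is a simplified model of how the
starting position is chosen. It is an illustration only, not Kafka's
actual code: the class and method names are made up, and it ignores the
case where a stored offset is out of range.

```java
import java.util.OptionalLong;

final class StartPositionModel {
    // Returns the offset a consumer would start from for one partition.
    // A stored (committed) offset always wins; auto.offset.reset is
    // consulted only when nothing is stored.
    static long startPosition(OptionalLong committedOffset,
                              String autoOffsetReset,
                              long logStartOffset,
                              long logEndOffset) {
        if (committedOffset.isPresent()) {
            return committedOffset.getAsLong();
        }
        switch (autoOffsetReset) {
            case "latest":
                return logEndOffset;
            case "earliest":
                return logStartOffset;
            default:
                // Covers "none" and any unknown value in this model.
                throw new IllegalStateException(
                        "no stored offset and no usable reset policy: "
                                + autoOffsetReset);
        }
    }
}
```

With a stored offset of 42, the result is 42 regardless of the reset
policy, which matches the behaviour reported after a restart.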
> To solve your problem you also need to do one of the following, in addition
> to setting auto.offset.reset=latest:
>
> 1. Delete the consumer offsets / the group (= group.id) of your consumer
> application and start fresh.  Kafka's `kafka-consumer-groups.sh` command
> allows you to delete the stored consumer offsets / the group (if you are
> using the Confluent Platform, the command is called
> `kafka-consumer-groups`, i.e. it does not have the `.sh` suffix).  This is
> the approach that I would recommend.
>
> 2. Alternatively, as a crude workaround, you could also change the group.id
> setting of your consumer application whenever you restart it.  Changing the
> group.id is, in this case, a way of starting the processing "from scratch",
> because using a new, never-used-before group.id implies that there are no
> stored consumer offsets in Kafka from previous runs.
>
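
One way to sketch workaround 2 is to derive a fresh group.id at every
start. The helper below is hypothetical (the class name, method name and
base name are made up) and uses only the JDK; the returned string would
be used as the consumer's group.id setting.

```java
import java.util.UUID;

final class FreshGroupId {
    // Appends a random UUID so that each application start uses a
    // group.id that has never been used before, and therefore has no
    // stored offsets.
    static String next(String baseName) {
        return baseName + "-" + UUID.randomUUID();
    }
}
```

Every call, e.g. FreshGroupId.next("my-app"), yields a different name, so
every run creates a new consumer group.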
> For your convenience I copy-pasted the help display of
> `kafka-consumer-groups.sh` below.  If your consumer application uses
> Kafka's "new" consumer client, you must set the `--bootstrap-server` CLI
> option.  If you are using the old consumer client, you must set the
> `--zookeeper` CLI option.
>
> Hope this helps,
> Michael
>
>
> $ ./kafka-consumer-groups
> List all consumer groups, describe a consumer group, or delete consumer group info.
> Option                                 Description
> ------                                 -----------
> --bootstrap-server <server to connect  REQUIRED (only when using new-
>   to>                                    consumer): The server to connect to.
> --command-config <command config       Property file containing configs to be
>   property file>                         passed to Admin Client and Consumer.
> --delete                               Pass in groups to delete topic
>                                          partition offsets and ownership
>                                          information over the entire consumer
>                                          group. For instance --group g1
>                                          --group g2
>                                        Pass in groups with a single topic to
>                                          just delete the given topic's
>                                          partition offsets and ownership
>                                          information for the given consumer
>                                          groups. For instance --group g1
>                                          --group g2 --topic t1
>                                        Pass in just a topic to delete the
>                                          given topic's partition offsets and
>                                          ownership information for every
>                                          consumer group. For instance
>                                          --topic t1
>                                        WARNING: Group deletion only works for
>                                          old ZK-based consumer groups, and
>                                          one has to use it carefully to only
>                                          delete groups that are not active.
> --describe                             Describe consumer group and list
>                                          offset lag related to given group.
> --group <consumer group>               The consumer group we wish to act on.
> --list                                 List all consumer groups.
> --new-consumer                         Use new consumer.
> --topic <topic>                        The topic whose consumer group
>                                          information should be deleted.
> --zookeeper <urls>                     REQUIRED (unless new-consumer is
>                                          used): The connection string for the
>                                          zookeeper connection in the form
>                                          host:port. Multiple URLS can be
>                                          given to allow fail-over.
>
>
>
> On Tue, Jul 12, 2016 at 3:40 AM, BYEONG-GI KIM <bg...@bluedigm.com> wrote:
>
> > Thank you for the reply.
> >
> > I thought that was what I was looking for, but unfortunately it wasn't.
> >
> > The previous messages are still consumed when the consumer is restarted
> > after a short shutdown...
> >
> >
> >
> > 2016-07-12 9:54 GMT+09:00 Snehal Nagmote <nagmote.sne...@gmail.com>:
> >
> > > Hello,
> > >
> > > If I understand your question correctly, what you are looking for is a
> > > consumer setting that makes it read only the latest messages:
> > >
> > > auto.offset.reset = latest
> > >
> > > This way, when you start a new consumer group, it will always start
> > > reading from the most recent offset.
> > >
> > >
> > > Thanks,
> > > Snehal
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On 11 July 2016 at 17:38, BYEONG-GI KIM <bg...@bluedigm.com> wrote:
> > >
> > > > Hello.
> > > >
> > > > Generally, a Kafka consumer consumes the messages already stored on the
> > > > Kafka broker(s) when it is started.
> > > >
> > > > For a real-time application, however, I want to create a function that
> > > > only consumes messages arriving after it starts, instead of the
> > > > previously stored messages mentioned above.
> > > >
> > > > Is it doable? If yes, how could it be implemented? Any example code or
> > > > reference would be really helpful.
> > > >
> > > > Thanks in advance!
> > > >
> > > > Best regards
> > > >
> > > > KIM
> > > >
> > >
> >
>
