To explain what you're seeing:  After you have run a consumer application
once, it will have stored its latest consumer offsets in Kafka.  Upon
restart -- e.g. after a brief shutdown time as you mentioned -- the
consumer application will continue processing from the point of these
stored consumer offsets.

The auto.offset.reset setting that Snehal mentioned above takes effect if
and only if there are *no* consumer offsets stored in Kafka yet (i.e. the
typical situation where auto.offset.reset does take effect is if you are
starting a consumer application for the very first time).  This means that
setting auto.offset.reset=latest won't be sufficient to solve your problem.

To solve your problem you also need to do one of the following, in addition
to setting auto.offset.reset=latest:

1. Delete the consumer offsets / the group (= group.id) of your consumer
application and start fresh.  Kafka's `kafka-consumer-groups.sh` command
allows you to delete the stored consumer offsets / the group (if you are
using the Confluent Platform, the command is called `kafka-consumer-group`,
i.e. it does not have the `.sh` suffix).  This is the approach that I would
recommend.

2. Alternatively, as a crude workaround, you could also change the group.id
setting of your consumer application whenever you restart it.  Changing the
group.id is, in this case, a workaround to starting the processing "from
scratch", because using a new, never-used-before group.id implies that
there are no stored consumer offsets in Kafka from previous runs.

For your convenience I copy-pasted the help display of
`kafka-consumer-groups.sh` below.  If your consumer application uses
Kafka's "new" consumer client, you must set the `--bootstrap-server` CLI
option.  If you are using the old consumer client, you must set the
`--zookeeper` CLI option.

Hope this helps,
Michael


$ ./kafka-consumer-groups
List all consumer groups, describe a consumer group, or delete consumer
group info.
Option                                 Description
------                                 -----------
--bootstrap-server <server to connect  REQUIRED (only when using new-
  to>                                    consumer): The server to connect
to.
--command-config <command config       Property file containing configs to
be
  property file>                         passed to Admin Client and
Consumer.
--delete                               Pass in groups to delete topic
                                         partition offsets and ownership
                                         information over the entire
consumer
                                         group. For instance --group g1 --
                                         group g2
                                       Pass in groups with a single topic to
                                         just delete the given topic's
                                         partition offsets and ownership
                                         information for the given consumer
                                         groups. For instance --group g1 --
                                         group g2 --topic t1
                                       Pass in just a topic to delete the
                                         given topic's partition offsets and
                                         ownership information for every
                                         consumer group. For instance
--topic
                                         t1
                                       WARNING: Group deletion only works
for
                                         old ZK-based consumer groups, and
                                         one has to use it carefully to only
                                         delete groups that are not active.
--describe                             Describe consumer group and list
                                         offset lag related to given group.
--group <consumer group>               The consumer group we wish to act on.
--list                                 List all consumer groups.
--new-consumer                         Use new consumer.
--topic <topic>                        The topic whose consumer group
                                         information should be deleted.
--zookeeper <urls>                     REQUIRED (unless new-consumer is
                                         used): The connection string for
the
                                         zookeeper connection in the form
                                         host:port. Multiple URLS can be
                                         given to allow fail-over.



On Tue, Jul 12, 2016 at 3:40 AM, BYEONG-GI KIM <bg...@bluedigm.com> wrote:

> Thank you for the reply.
>
> I thought that was what I found, but unfortunately wasn't.
>
> The previous messages still be consumed while the consumer has been
> re-executed with a few shutdown time...
>
>
>
> 2016-07-12 9:54 GMT+09:00 Snehal Nagmote <nagmote.sne...@gmail.com>:
>
> > Hello *,*
> >
> > If I understand your question correctly , what you are looking for is a
> > setting in consumer which will only read latest messages .
> >
> > auto.offset.reset = latest
> >
> > This way , when you start new consumer group , it will always start
> reading
> > from the recent offset,
> >
> >
> > Thanks,
> > Snehal
> >
> >
> >
> >
> >
> >
> >
> > On 11 July 2016 at 17:38, BYEONG-GI KIM <bg...@bluedigm.com> wrote:
> >
> > > Hello.
> > >
> > > Generally, a Kafka Consumer consumes stored messages from Kafka
> Broker(s)
> > > when the Consumer has been executed.
> > >
> > > I, however, want to create a function that only consumes incoming
> > messages
> > > after executing, instead of consuming the previously stored messages
> as I
> > > mentioned above, for real-time application.
> > >
> > > Is it doable? If yes, how could it be implemented? Any example code or
> > > reference would really be helpful for me.
> > >
> > > Thanks in advance!
> > >
> > > Best regards
> > >
> > > KIM
> > >
> >
>

Reply via email to