Another option is to set enable.auto.commit=false and never commit offsets. That way no offsets are ever stored for the group, so the consumer should always fall back to auto.offset.reset=latest on restart.
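The suggestion above can be combined with Snehal's auto.offset.reset=latest into one consumer configuration. A minimal sketch, written against kafka-python's keyword-argument spellings (an assumption; confluent-kafka and the Java client spell these keys differently, and the broker address is illustrative):

```python
def latest_only_config(group_id):
    """Config for a consumer that never commits offsets, so every
    restart falls back to the auto.offset.reset policy."""
    return {
        "bootstrap_servers": "localhost:9092",  # assumed broker address
        "group_id": group_id,
        "auto_offset_reset": "latest",    # applies only when no offsets are stored
        "enable_auto_commit": False,      # never store offsets automatically
    }

# Usage (requires a running broker and kafka-python installed):
# from kafka import KafkaConsumer
# consumer = KafkaConsumer("my-topic", **latest_only_config("realtime-app"))
```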
On Tue, Jul 12, 2016 at 1:34 PM Michael Noll <mich...@confluent.io> wrote:

> To explain what you're seeing: After you have run a consumer application
> once, it will have stored its latest consumer offsets in Kafka. Upon
> restart -- e.g. after a brief shutdown time as you mentioned -- the
> consumer application will continue processing from the point of these
> stored consumer offsets.
>
> The auto.offset.reset setting that Snehal mentioned above takes effect if
> and only if there are *no* consumer offsets stored in Kafka yet (i.e. the
> typical situation where auto.offset.reset does take effect is if you are
> starting a consumer application for the very first time). This means that
> setting auto.offset.reset=latest won't be sufficient to solve your
> problem.
>
> To solve your problem you also need to do one of the following, in
> addition to setting auto.offset.reset=latest:
>
> 1. Delete the consumer offsets / the group (= group.id) of your consumer
> application and start fresh. Kafka's `kafka-consumer-groups.sh` command
> allows you to delete the stored consumer offsets / the group (if you are
> using the Confluent Platform, the command is called
> `kafka-consumer-groups`, i.e. it does not have the `.sh` suffix). This is
> the approach that I would recommend.
>
> 2. Alternatively, as a crude workaround, you could also change the
> group.id setting of your consumer application whenever you restart it.
> Changing the group.id is, in this case, a workaround to start the
> processing "from scratch", because using a new, never-used-before
> group.id implies that there are no stored consumer offsets in Kafka from
> previous runs.
>
> For your convenience I have copy-pasted the help display of
> `kafka-consumer-groups.sh` below. If your consumer application uses
> Kafka's "new" consumer client, you must set the `--bootstrap-server` CLI
> option. If you are using the old consumer client, you must set the
> `--zookeeper` CLI option.
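Michael's rule -- a stored offset always wins, and auto.offset.reset is consulted only when the group has none -- can be sketched as a toy simulation. The helper and its arguments are illustrative, not part of any Kafka API:

```python
def starting_offset(committed, group_id, auto_offset_reset, log_end_offset):
    """committed: dict mapping group.id -> last committed offset for a partition."""
    if group_id in committed:
        return committed[group_id]       # resume from the stored offset
    if auto_offset_reset == "latest":
        return log_end_offset            # start at the end of the log
    return 0                             # "earliest": start at the beginning

committed = {"my-app": 1200}             # offsets stored from earlier runs

# Restarting the same group resumes at 1200 despite auto.offset.reset=latest.
resumed = starting_offset(committed, "my-app", "latest", 5000)

# A never-used group.id (workaround 2) has no stored offset, so "latest" applies
# and the consumer starts at the log end offset, 5000.
fresh = starting_offset(committed, "my-app-v2", "latest", 5000)
```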
>
> Hope this helps,
> Michael
>
>
> $ ./kafka-consumer-groups
> List all consumer groups, describe a consumer group, or delete consumer
> group info.
> Option                                  Description
> ------                                  -----------
> --bootstrap-server <server to connect   REQUIRED (only when using new-
>   to>                                     consumer): The server to
>                                           connect to.
> --command-config <command config        Property file containing configs
>   property file>                          to be passed to Admin Client
>                                           and Consumer.
> --delete                                Pass in groups to delete topic
>                                           partition offsets and ownership
>                                           information over the entire
>                                           consumer group. For instance
>                                           --group g1 --group g2
>                                         Pass in groups with a single
>                                           topic to just delete the given
>                                           topic's partition offsets and
>                                           ownership information for the
>                                           given consumer groups. For
>                                           instance --group g1 --group g2
>                                           --topic t1
>                                         Pass in just a topic to delete
>                                           the given topic's partition
>                                           offsets and ownership
>                                           information for every consumer
>                                           group. For instance --topic t1
>                                         WARNING: Group deletion only
>                                           works for old ZK-based consumer
>                                           groups, and one has to use it
>                                           carefully to only delete groups
>                                           that are not active.
> --describe                              Describe consumer group and list
>                                           offset lag related to given
>                                           group.
> --group <consumer group>                The consumer group we wish to
>                                           act on.
> --list                                  List all consumer groups.
> --new-consumer                          Use new consumer.
> --topic <topic>                         The topic whose consumer group
>                                           information should be deleted.
> --zookeeper <urls>                      REQUIRED (unless new-consumer is
>                                           used): The connection string
>                                           for the zookeeper connection in
>                                           the form host:port. Multiple
>                                           URLS can be given to allow
>                                           fail-over.
>
>
> On Tue, Jul 12, 2016 at 3:40 AM, BYEONG-GI KIM <bg...@bluedigm.com> wrote:
>
> > Thank you for the reply.
> >
> > I thought that was what I was looking for, but unfortunately it wasn't.
> >
> > The previous messages are still consumed when the consumer is
> > re-executed after a brief shutdown time...
> >
> >
> > 2016-07-12 9:54 GMT+09:00 Snehal Nagmote <nagmote.sne...@gmail.com>:
> >
> > > Hello *,*
> > >
> > > If I understand your question correctly, what you are looking for is
> > > a consumer setting which will only read the latest messages:
> > >
> > > auto.offset.reset = latest
> > >
> > > This way, when you start a new consumer group, it will always start
> > > reading from the most recent offset.
> > >
> > > Thanks,
> > > Snehal
> > >
> > > On 11 July 2016 at 17:38, BYEONG-GI KIM <bg...@bluedigm.com> wrote:
> > >
> > > > Hello.
> > > >
> > > > Generally, a Kafka consumer consumes stored messages from the Kafka
> > > > broker(s) when the consumer is executed.
> > > >
> > > > I, however, want to create a function that only consumes messages
> > > > arriving after startup, instead of consuming the previously stored
> > > > messages as mentioned above, for a real-time application.
> > > >
> > > > Is it doable? If yes, how could it be implemented? Any example code
> > > > or reference would be really helpful.
> > > >
> > > > Thanks in advance!
> > > >
> > > > Best regards
> > > >
> > > > KIM
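A further route to what KIM asks for, independent of group.id and offset bookkeeping, is to seek every assigned partition to its end offset at startup (in kafka-python that call is `seek_to_end()`, in the Java client `seekToEnd()`). The sketch below only models that positioning step with plain lists; the helper name and offsets are illustrative:

```python
def records_after_startup(log, start_position):
    """Return only the records at or beyond the startup position."""
    return [r for r in log if r["offset"] >= start_position]

# Five messages already stored in the partition before the consumer starts.
log = [{"offset": i, "value": f"msg-{i}"} for i in range(5)]

# Seeking to the end positions the consumer at the next offset to be written,
# so every pre-existing message falls below the start position.
start = len(log)

# A message produced after startup is the only one polling would return.
log.append({"offset": 5, "value": "msg-5"})
new = records_after_startup(log, start)
```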