To explain what you're seeing: once your consumer application has run, it will have stored its latest consumer offsets in Kafka. Upon restart -- e.g. after the brief shutdown you mentioned -- the consumer application will continue processing from the point of these stored consumer offsets.
The auto.offset.reset setting that Snehal mentioned above takes effect if and only if there are *no* consumer offsets stored in Kafka yet (i.e. the typical situation where auto.offset.reset does take effect is when you start a consumer application for the very first time). This means that setting auto.offset.reset=latest alone won't be sufficient to solve your problem. In addition to setting auto.offset.reset=latest, you also need to do one of the following:

1. Delete the consumer offsets / the group (= group.id) of your consumer application and start fresh. Kafka's `kafka-consumer-groups.sh` command allows you to delete the stored consumer offsets / the group (if you are using the Confluent Platform, the command is called `kafka-consumer-groups`, i.e. without the `.sh` suffix). This is the approach I would recommend.

2. Alternatively, as a crude workaround, you could change the group.id setting of your consumer application whenever you restart it. Changing the group.id works here because using a new, never-used-before group.id implies that there are no stored consumer offsets in Kafka from previous runs, so processing starts "from scratch".

For your convenience I copy-pasted the help display of `kafka-consumer-groups.sh` below. If your consumer application uses Kafka's "new" consumer client, you must set the `--bootstrap-server` CLI option; if you are using the old consumer client, you must set the `--zookeeper` CLI option.

Hope this helps,
Michael

$ ./kafka-consumer-groups
List all consumer groups, describe a consumer group, or delete consumer group info.

Option                                            Description
------                                            -----------
--bootstrap-server <server to connect to>         REQUIRED (only when using new-consumer): The server to connect to.
--command-config <command config property file>   Property file containing configs to be passed to Admin Client and Consumer.
--delete                                          Pass in groups to delete topic partition offsets and ownership information over the entire consumer group. For instance --group g1 --group g2
                                                  Pass in groups with a single topic to just delete the given topic's partition offsets and ownership information for the given consumer groups. For instance --group g1 --group g2 --topic t1
                                                  Pass in just a topic to delete the given topic's partition offsets and ownership information for every consumer group. For instance --topic t1
                                                  WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active.
--describe                                        Describe consumer group and list offset lag related to given group.
--group <consumer group>                          The consumer group we wish to act on.
--list                                            List all consumer groups.
--new-consumer                                    Use new consumer.
--topic <topic>                                   The topic whose consumer group information should be deleted.
--zookeeper <urls>                                REQUIRED (unless new-consumer is used): The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.

On Tue, Jul 12, 2016 at 3:40 AM, BYEONG-GI KIM <bg...@bluedigm.com> wrote:
> Thank you for the reply.
>
> I thought that was what I found, but unfortunately wasn't.
>
> The previous messages still be consumed while the consumer has been
> re-executed with a few shutdown time...
>
> 2016-07-12 9:54 GMT+09:00 Snehal Nagmote <nagmote.sne...@gmail.com>:
> > Hello *,*
> >
> > If I understand your question correctly, what you are looking for is a
> > setting in consumer which will only read latest messages.
> >
> > auto.offset.reset = latest
> >
> > This way, when you start a new consumer group, it will always start
> > reading from the recent offset.
> >
> > Thanks,
> > Snehal
> >
> > On 11 July 2016 at 17:38, BYEONG-GI KIM <bg...@bluedigm.com> wrote:
> > > Hello.
> > > Generally, a Kafka Consumer consumes stored messages from Kafka
> > > Broker(s) when the Consumer has been executed.
> > >
> > > I, however, want to create a function that only consumes incoming
> > > messages after executing, instead of consuming the previously stored
> > > messages as I mentioned above, for real-time application.
> > >
> > > Is it doable? If yes, how could it be implemented? Any example code or
> > > reference would really be helpful for me.
> > >
> > > Thanks in advance!
> > >
> > > Best regards
> > >
> > > KIM
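P.S. To make the two options above concrete, here is a small shell sketch. The group name `my-consumer-app` and the ZooKeeper address are hypothetical placeholders, and the offset-deletion command needs a running cluster, so it is shown as a comment rather than executed:

```shell
# Option 1 (sketch; requires a running cluster, old ZK-based consumer groups,
# and a group that is not active -- group name and host are placeholders):
#
#   ./kafka-consumer-groups.sh --zookeeper localhost:2181 --delete --group my-consumer-app
#
# Option 2 (crude workaround): derive a never-used-before group.id on every
# restart, so no stored offsets exist and auto.offset.reset=latest applies.
GROUP_ID="my-consumer-app-$(date +%s)"
echo "group.id=${GROUP_ID}"
```

With option 2 every restart creates a brand-new group, which also means you lose any delivery guarantees tied to committed offsets -- hence option 1 being the cleaner choice.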
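And for completeness, a minimal consumer configuration sketch with the setting discussed above; the bootstrap server address and group name are placeholder assumptions, not values from your setup:

```shell
# Write a minimal consumer config file (all values below are placeholders).
cat > /tmp/consumer.properties <<'EOF'
bootstrap.servers=localhost:9092
group.id=my-consumer-app
auto.offset.reset=latest
EOF

# Show the setting that controls where a group with no stored offsets starts.
grep 'auto.offset.reset' /tmp/consumer.properties
# prints: auto.offset.reset=latest
```

Remember: this line only matters for a group with no committed offsets; an existing group resumes from its stored offsets regardless.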