I have a question: if my service goes down and I want to resume consuming whatever was not yet consumed by a given consumer group,
how do I handle that? The reset option is either the earliest or the latest message; how about consuming all uncommitted messages?

Keep learning keep moving .....

On Wed, Aug 23, 2017 at 10:04 AM, Dan Markhasin <minimi...@gmail.com> wrote:

> Is your storm consumer set to auto.offset.reset="earliest"?
>
> On 22 August 2017 at 10:05, Elyahou Ittah <elyaho...@fiverr.com> wrote:
>
> > I checked the __consumer_offsets topic and here is an extraction from this
> > log for the same consumer group, a specific topic (users) and specific
> > partition (15):
> >
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8327,{topic-partition=users-15, offset=8327, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230031557,ExpirationTime 1503316431557]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8330,{topic-partition=users-15, offset=8330, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230332504,ExpirationTime 1503316732504]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[6512,{topic-partition=users-15, offset=6512, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230748612,ExpirationTime 1503317148612]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8172,{topic-partition=users-15, offset=8172, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230791209,ExpirationTime 1503317191209]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8330,{topic-partition=users-15, offset=8330, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230821337,ExpirationTime 1503317221337]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8333,{topic-partition=users-15, offset=8333, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503231513311,ExpirationTime 1503317913311]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8338,{topic-partition=users-15, offset=8338, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503231603513,ExpirationTime 1503318003513]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8344,{topic-partition=users-15, offset=8344, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503231693829,ExpirationTime 1503318093829]
> >
> > We can see here that the consumer was at offset 8330 at Sunday, August 20,
> > 2017 11:53:51.557 AM and at offset 6512 some minutes after (the Kafka
> > restart occurred at this time).
> >
> > What can explain the consumer group rewinding a partition like this?
> >
> > On Mon, Aug 21, 2017 at 11:10 AM, Elyahou Ittah <elyaho...@fiverr.com> wrote:
> >
> > > attached log file
> > >
> > > On Mon, Aug 21, 2017 at 11:06 AM, Elyahou Ittah <elyaho...@fiverr.com> wrote:
> > >
> > >> I am consuming from Kafka using the KafkaSpout of Storm and also in Ruby
> > >> using the ruby-kafka gem (both use the new consumer API).
> > >>
> > >> I noticed that after a rolling restart of the Kafka cluster, the
> > >> KafkaSpout reconsumed all Kafka messages, ignoring the committed offsets...
> > >>
> > >> What can cause this behavior?
> > >>
> > >> Attached are the Kafka logs from that time (the storm consumers are
> > >> storm_consumer_1 and storm_consumer_2, and the ruby consumer is
> > >> ruby_kafka_consumer).
> > >>
> > >> I see many lines like these for the storm consumers but not for the ruby
> > >> consumer:
> > >>
> > >> [2017-08-20 12:03:54,270] INFO [GroupCoordinator 0]: Group storm_consumer_2 with generation 52 is now empty (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
> > >> [2017-08-20 12:03:54,701] INFO [GroupCoordinator 0]: Group storm_consumer_2 with generation 56 is now empty (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
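
For anyone who wants to check a longer __consumer_offsets extraction for the same symptom, here is a rough Python sketch that parses records of the shape quoted above and flags every commit whose offset is lower than the previous commit for the same group, topic and partition. It assumes each record has been rejoined onto a single line (the mail wrapping splits them), that the records appear in commit order, and that CommitTime is epoch milliseconds. The names `RECORD`, `parse_commits` and `find_rewinds` are made up for this illustration and are not part of Kafka or Storm.

```python
import re
from datetime import datetime, timedelta, timezone

# Record layout as quoted in this thread (an assumption about the dump format):
# [group,topic,partition]::[OffsetMetadata[offset,{...}],CommitTime ms,ExpirationTime ms]
RECORD = re.compile(
    r"\[(?P<group>[^,\]]+),(?P<topic>[^,\]]+),(?P<partition>\d+)\]::"
    r"\[OffsetMetadata\[(?P<offset>\d+),.*?\],"
    r"CommitTime (?P<commit_ms>\d+),ExpirationTime (?P<expire_ms>\d+)\]"
)

# First four records from the thread, rejoined onto one line each.
SAMPLE = """
[storm_kafka_topology,users,15]::[OffsetMetadata[8327,{topic-partition=users-15, offset=8327, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230031557,ExpirationTime 1503316431557]
[storm_kafka_topology,users,15]::[OffsetMetadata[8330,{topic-partition=users-15, offset=8330, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230332504,ExpirationTime 1503316732504]
[storm_kafka_topology,users,15]::[OffsetMetadata[6512,{topic-partition=users-15, offset=6512, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230748612,ExpirationTime 1503317148612]
[storm_kafka_topology,users,15]::[OffsetMetadata[8172,{topic-partition=users-15, offset=8172, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230791209,ExpirationTime 1503317191209]
"""


def to_utc(epoch_ms):
    """Convert epoch milliseconds to an aware UTC datetime using integer math."""
    seconds, millis = divmod(epoch_ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)


def parse_commits(text):
    """Return one dict per offset-commit record found in text, in input order."""
    commits = []
    for m in RECORD.finditer(text):
        commits.append({
            "key": (m["group"], m["topic"], int(m["partition"])),
            "offset": int(m["offset"]),
            "commit_time": to_utc(int(m["commit_ms"])),
        })
    return commits


def find_rewinds(commits):
    """Return (previous, current) pairs where the committed offset went backwards."""
    last_seen = {}
    rewinds = []
    for commit in commits:
        previous = last_seen.get(commit["key"])
        if previous is not None and commit["offset"] < previous["offset"]:
            rewinds.append((previous, commit))
        last_seen[commit["key"]] = commit
    return rewinds


if __name__ == "__main__":
    for previous, current in find_rewinds(parse_commits(SAMPLE)):
        print(
            current["key"],
            "rewound from", previous["offset"], "to", current["offset"],
            "at", current["commit_time"].isoformat(),
        )
```

The script only reports where a rewind happened; it says nothing about the cause. A commit that moves forward again after a rewind (6512 to 8172 here) is not flagged, because only a drop relative to the immediately preceding commit counts.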