Hi, thanks again for the quick answer on this. This solution works, but it is *really* hard to get the user code correct when reading data with more than one thread. Before I begin processing one batch of records, I have to make sure all of the workers reading from the kafka streams have stopped. However, those workers could be blocked inside an iterator.hasNext(), and could get unblocked at any time in the middle of my batch processing. This makes it really tricky to guarantee that the messages that made it into the batch correspond *exactly* to the offsets of all workers at the end of my batch processing, so that commitOffsets() is accurate.
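To make the coordination concrete, here is a rough sketch of the quiesce-then-commit dance the current API seems to force. This is plain Java with no Kafka classes; the worker body and the commit step are hypothetical stand-ins (the real workers would be blocked in the stream iterator, which is exactly the problem):

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Sketch: with the current commitOffsets() (which commits each consumer's
// current position), every worker must fully stop before the batch is
// processed and offsets are committed.
public class StopTheWorldSketch {
    public static void main(String[] args) throws Exception {
        final int workers = 2;
        // parties = all workers + the batch-processing thread
        final CyclicBarrier allStopped = new CyclicBarrier(workers + 1);

        for (int i = 0; i < workers; i++) {
            new Thread(new Runnable() {
                public void run() {
                    // ... read from the KafkaStream iterator here ...
                    // Danger: in real code hasNext() can block, so there is
                    // no guarantee the worker reaches this barrier promptly.
                    try {
                        allStopped.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }).start();
        }

        allStopped.await(); // wait until every worker has quiesced
        // processBatch(); consumer.commitOffsets();  // only now is it safe
        System.out.println("all workers quiesced; safe to commit");
    }
}
```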
But I think the good news is that this would become much simpler with a small API change. Would it be OK to add another version of commitOffsets() that takes the offsets as a parameter, rather than using whatever the current position is? That way workers could simply snapshot their current offset along with their batch and continue processing. Another thread could process the batch and commit the offsets associated with the batch. The coordination issues mostly disappear (and you might also get higher throughput, since workers can keep going during batch processing). Would such a change be reasonable? I could probably submit a patch for it.

On Wed, Nov 20, 2013 at 9:05 AM, Imran Rashid <im...@therashids.com> wrote:
> perfect, thank you!
>
> On Wed, Nov 20, 2013 at 8:44 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>> You can turn off automatic offset commit (auto.commit.enable=false) and use
>> the commitOffsets() API. Note that this API will commit offsets for all
>> partitions owned by the consumer.
>>
>> Thanks,
>> Neha
>>
>> On Nov 20, 2013 6:39 AM, "Imran Rashid" <im...@therashids.com> wrote:
>>
>>> Hi,
>>>
>>> I have an application which reads messages from a kafka queue, builds
>>> up a batch of messages, and then performs some action on that batch.
>>> So far I have just used the ConsumerGroup api. However, I realized
>>> there is a potential problem -- my app may die sometime in the middle
>>> of the batch, and then those messages could get completely lost.
>>> Kafka will think we've already read those messages, and so on restart
>>> it will skip to messages post batch.
>>>
>>> 1) Does the ConsumerGroup api have some way to allow you to control
>>> when the position of the consumer-group is updated? I couldn't
>>> find anything relevant in ConsumerGroup, KafkaStream, or
>>> ConsumerIterator, but perhaps I overlooked something.
>>>
>>> 2) Do I need to switch to SimpleConsumer? It looks like that does not
>>> use zookeeper at all; I'd rather not have to manage all of the
>>> zookeeper stuff myself.
>>>
>>> This seems like a basic use case, so I'm probably missing something ...
>>>
>>> thanks,
>>> Imran
>>>
>>> (I'm using 0.8.0-beta1)
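For what it's worth, here is a rough sketch of the pattern I have in mind, again in plain Java with no Kafka classes. Record, Batch, and the commitOffsets(Map) stand-in are all hypothetical names, just to show workers snapshotting offsets together with their batch so a separate thread can commit exactly those offsets:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetSnapshotSketch {

    // A message tagged with the partition/offset it was read from.
    static final class Record {
        final int partition;
        final long offset;
        final String payload;
        Record(int partition, long offset, String payload) {
            this.partition = partition;
            this.offset = offset;
            this.payload = payload;
        }
    }

    // A batch pairs the messages with the highest offset seen per partition.
    static final class Batch {
        final List<Record> records = new ArrayList<Record>();
        final Map<Integer, Long> offsets = new HashMap<Integer, Long>();
        void add(Record r) {
            records.add(r);
            Long prev = offsets.get(r.partition);
            if (prev == null || r.offset > prev) {
                offsets.put(r.partition, r.offset);
            }
        }
    }

    // Stand-in for the proposed commitOffsets(offsets) overload.
    static void commitOffsets(Map<Integer, Long> offsets) {
        System.out.println("committing " + offsets);
    }

    public static void main(String[] args) {
        Batch batch = new Batch();
        // A worker appends each message together with the offset it was
        // read at; no cross-thread coordination is needed for this.
        batch.add(new Record(0, 10L, "a"));
        batch.add(new Record(1, 7L, "b"));
        batch.add(new Record(0, 11L, "c"));

        // The worker can now hand `batch` to a processing thread and keep
        // reading into a fresh Batch; the processor commits exactly the
        // offsets that made it into this batch.
        commitOffsets(batch.offsets);
    }
}
```

The key point is that the offsets travel with the batch, so the commit is always consistent with what was actually processed, no matter what the readers are doing in the meantime.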