Sorry, one more thought -- I've realized that the difficulties also come from trying to guarantee that each message is read *exactly* once. The standard consumer group only guarantees that each message is read *at least* once: a message is read more than once when it has been read but the process dies before the offset is committed to ZooKeeper. I guess you probably wouldn't want exactly-once reads without batch processing, or at least without a very low message rate, since committing after every message would mean a ton of ZooKeeper updates. Hopefully this helps explain the use case more.
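To make the at-least-once behaviour concrete, here is a rough sketch in plain Python. It is only a toy model and makes no Kafka calls: the list `log` stands in for a single partition, the dict `committed` stands in for the offset stored in ZooKeeper, and `run_consumer` and `crash_after_batches` are made-up names for illustration. Each batch is paired with a snapshot of the offset that matches it, and that snapshot is committed only after the batch has been processed.

```python
# Toy model only: an in-memory list stands in for a Kafka partition and a
# plain dict stands in for the offset stored in ZooKeeper. None of these
# names are Kafka APIs.

def run_consumer(log, committed, batch_size, crash_after_batches=None):
    """Process `log` in batches, committing the offset after each batch.

    Returns the messages processed in this run. If `crash_after_batches`
    is set, the run stops after processing that many batches but *before*
    committing the last one, simulating a process that dies between
    processing a batch and committing its offset.
    """
    processed = []
    position = committed["offset"]  # resume from the last committed offset
    batches_done = 0
    while position < len(log):
        batch = log[position:position + batch_size]
        snapshot = position + len(batch)  # offset that matches this batch
        processed.extend(batch)           # "process" the batch
        batches_done += 1
        if crash_after_batches is not None and batches_done == crash_after_batches:
            return processed              # died before the commit
        committed["offset"] = snapshot    # explicit commit of the snapshot
        position = snapshot
    return processed


log = ["m0", "m1", "m2", "m3", "m4", "m5"]
committed = {"offset": 0}

# First run dies after processing its second batch, before committing it.
first_run = run_consumer(log, committed, batch_size=2, crash_after_batches=2)
# The restart resumes from the last committed offset and replays that batch.
second_run = run_consumer(log, committed, batch_size=2)
```

In this run the batch that was in flight at the crash (`m2`, `m3`) is processed twice and nothing is skipped, which is the at-least-once case. Getting to exactly once would still require the batch action and the offset commit to succeed or fail together, which this sketch does not attempt.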
thanks

On Thu, Nov 21, 2013 at 12:19 PM, Imran Rashid <im...@therashids.com> wrote:
> Hi,
>
> thanks again for the quick answer on this. However, though this
> solution works, it is *really* complicated to get the user code
> correct if you are reading data with more than 1 thread. Before I
> begin processing one batch of records, I have to make sure all of the
> workers reading from kafka streams have stopped. However, those
> workers could be blocked inside an iterator.hasNext(), and could get
> unblocked at any time in the middle of my batch processing. This
> makes it really tricky to ensure that the messages that made it into
> the batch are *exactly* the same as the offsets of all workers at the
> end of my batch processing, so that commitOffsets() is accurate.
>
> But, I think the good news is that this would become much simpler with
> a small api change. Would it be OK to add another version of
> commitOffsets, but instead that took the offsets as a parameter,
> rather than using whatever the current position is? That way workers
> could simply snapshot their current offset & their batch, and continue
> processing. Another thread could process the batch & commit the
> offsets associated w/ the batch. The coordination issues mostly
> disappear (and you also might get higher throughput, since workers can
> keep going during batch processing).
>
> Would such a change be reasonable? I could probably submit a patch for it.
>
> On Wed, Nov 20, 2013 at 9:05 AM, Imran Rashid <im...@therashids.com> wrote:
>> perfect, thank you!
>>
>> On Wed, Nov 20, 2013 at 8:44 AM, Neha Narkhede <neha.narkh...@gmail.com>
>> wrote:
>>> You can turn off automatic offset commit (auto.commit.enable=false) and use
>>> the commitOffsets() API. Note that this API will commit offsets for all
>>> partitions owned by the consumer.
>>>
>>> Thanks,
>>> Neha
>>> On Nov 20, 2013 6:39 AM, "Imran Rashid" <im...@therashids.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have an application which reads messages from a kafka queue, builds
>>>> up a batch of messages, and then performs some action on that batch.
>>>> So far I have just used the ConsumerGroup api. However, I realized
>>>> there is a potential problem -- my app may die sometime in the middle
>>>> of the batch, and then those messages could get completely lost.
>>>> Kafka will think we've already read those messages, and so on restart
>>>> it will skip to messages post batch.
>>>>
>>>> 1) Does the ConsumerGroup api have some way to allow you to control
>>>> when the the position of the consumer-group is updated? I couldn't
>>>> find anything relevant in ConsumerGroup, KafkaStream, or
>>>> ConsumerIterator, but perhaps I overlooked something.
>>>>
>>>> 2) Do I need to switch to SimpleConsumer? It looks like that does not
>>>> use zookeeper at all, I'd rather not have to manage all of the
>>>> zookeeper stuff myself.
>>>>
>>>> This seems like a basic use case, so I'm probably missing something ...
>>>>
>>>> thanks,
>>>> Imran
>>>>
>>>>
>>>> (I'm using 0.8.0-beta1)
>>>>