sorry, one more thought --

I've realized that the difficulties are also because I'm trying to
guarantee read *exactly* once.  The standard consumer group guarantees
read *at least* once (a message can be read more than once if the
process dies after reading it but before its offset is committed to
zookeeper).  I guess you probably wouldn't want read-exactly-once
without batch processing, or at least without a very low message rate,
to avoid a ton of zookeeper updates.  Hopefully this helps explain the
use case more.
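To make that trade-off concrete, here is a small stand-alone model of
the batch-commit pattern (plain Java, no real Kafka classes: the `log`
list and `committedOffset` field are stand-ins for the partition log
and the ZooKeeper-committed offset):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Model of committing once per batch: the offset is only advanced after
// a whole batch has been processed, so a crash re-reads at most one
// batch (at-least-once) instead of silently losing messages.  This is a
// sketch with in-memory stand-ins, not the Kafka client API.
public class BatchCommitModel {
    private final List<String> log;    // stand-in for the partition log
    private long committedOffset = 0;  // stand-in for the offset in ZK

    public BatchCommitModel(List<String> log) { this.log = log; }

    public long committedOffset() { return committedOffset; }

    // Read up to batchSize messages past the committed offset.
    // Does NOT commit yet -- a crash here just causes a re-read.
    public List<String> readBatch(int batchSize) {
        List<String> batch = new ArrayList<>();
        long pos = committedOffset;
        while (pos < log.size() && batch.size() < batchSize) {
            batch.add(log.get((int) pos++));
        }
        return batch;
    }

    // Commit only after the batch has been fully processed.
    public void commitAfter(List<String> batch) {
        committedOffset += batch.size();
    }

    public static void main(String[] args) {
        BatchCommitModel m = new BatchCommitModel(
            Arrays.asList("m0", "m1", "m2", "m3", "m4"));
        List<String> batch = m.readBatch(3);
        // ... process the batch here ...
        m.commitAfter(batch);  // only now is the position recorded
        System.out.println("committed offset: " + m.committedOffset());
    }
}
```

One zookeeper update per batch instead of per message is the whole
point: the commit rate is batchSize times lower than the read rate.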

thanks

On Thu, Nov 21, 2013 at 12:19 PM, Imran Rashid <im...@therashids.com> wrote:
> Hi,
>
> thanks again for the quick answer on this.  However, though this
> solution works, it is *really* complicated to get the user code
> correct if you are reading data with more than 1 thread.  Before I
> begin processing one batch of records, I have to make sure all of the
> workers reading from kafka streams have stopped.  However, those
> workers could be blocked inside an iterator.hasNext(), and could get
> unblocked at any time in the middle of my batch processing.  This
> makes it really tricky to ensure that the offsets of all workers at
> the end of my batch processing correspond *exactly* to the messages
> that made it into the batch, so that commitOffsets() is accurate.
>
> But, I think the good news is that this would become much simpler with
> a small api change.  Would it be OK to add another version of
> commitOffsets that took the offsets as a parameter, rather than
> using whatever the current position is?  That way workers
> could simply snapshot their current offset & their batch, and continue
> processing.  Another thread could process the batch & commit the
> offsets associated w/ the batch.  The coordination issues mostly
> disappear (and you also might get higher throughput, since workers can
> keep going during batch processing).
>
> Would such a change be reasonable?  I could probably submit a patch for it.
>
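The snapshot-and-commit pattern proposed above could look roughly like
this.  Plain Java stand-ins only: the commitOffsets(Map) overload being
proposed does not exist in 0.8.0-beta1, and the class and method names
here are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the proposal: each worker records its position as it reads;
// sealing a batch takes a snapshot of those positions, which a separate
// thread can later pass to the (hypothetical) commitOffsets(Map) while
// the workers keep reading.  No real Kafka API is used here.
public class OffsetSnapshot {
    // partition -> next offset to read (updated by the reading workers)
    private final Map<Integer, AtomicLong> positions = new ConcurrentHashMap<>();

    // Called by a worker after it reads the message at `offset`.
    public void recordRead(int partition, long offset) {
        positions.computeIfAbsent(partition, p -> new AtomicLong())
                 .set(offset + 1);
    }

    // Called once when a batch is sealed.  Workers may keep advancing
    // afterwards without affecting this snapshot, so the committer
    // thread commits exactly the offsets that produced the batch.
    public Map<Integer, Long> snapshot() {
        Map<Integer, Long> snap = new HashMap<>();
        positions.forEach((p, o) -> snap.put(p, o.get()));
        return snap;
    }

    public static void main(String[] args) {
        OffsetSnapshot s = new OffsetSnapshot();
        s.recordRead(0, 9);
        s.recordRead(1, 4);
        Map<Integer, Long> snap = s.snapshot();  // seal the batch here
        s.recordRead(0, 10);                     // worker keeps going
        System.out.println("offsets to commit: " + snap);
    }
}
```

Because the snapshot is a copy, the coordination problem from the
earlier mail disappears: nothing has to stop the workers between
sealing the batch and committing its offsets.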
> On Wed, Nov 20, 2013 at 9:05 AM, Imran Rashid <im...@therashids.com> wrote:
>> perfect, thank you!
>>
>> On Wed, Nov 20, 2013 at 8:44 AM, Neha Narkhede <neha.narkh...@gmail.com> 
>> wrote:
>>> You can turn off automatic offset commit (auto.commit.enable=false) and use
>>> the commitOffsets() API. Note that this API will commit offsets for all
>>> partitions owned by the consumer.
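A minimal consumer config fragment for what Neha describes; the
zookeeper address and group name below are placeholders, not values
from this thread:

```properties
# disable automatic offset commits so the application decides when the
# consumer group's position is written to zookeeper
auto.commit.enable=false
# placeholders for illustration only:
zookeeper.connect=localhost:2181
group.id=my-batch-group
```

With this set, the application calls commitOffsets() on the
ConsumerConnector after it finishes processing a batch; as Neha notes,
that commits the current position of every partition this consumer
owns, not just the partitions the batch happened to touch.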
>>>
>>> Thanks,
>>> Neha
>>> On Nov 20, 2013 6:39 AM, "Imran Rashid" <im...@therashids.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have an application which reads messages from a kafka queue, builds
>>>> up a batch of messages, and then performs some action on that batch.
>>>> So far I have just used the ConsumerGroup api.  However, I realized
>>>> there is a potential problem -- my app may die sometime in the middle
>>>> of the batch, and then those messages could get completely lost.
>>>> Kafka will think we've already read those messages, and so on restart
>>>> it will skip to the messages after the batch.
>>>>
>>>> 1) Does the ConsumerGroup api have some way to allow you to control
>>>> when the position of the consumer-group is updated?  I couldn't
>>>> find anything relevant in ConsumerGroup, KafkaStream, or
>>>> ConsumerIterator, but perhaps I overlooked something.
>>>>
>>>> 2) Do I need to switch to SimpleConsumer?  It looks like that does not
>>>> use zookeeper at all; I'd rather not have to manage all of the
>>>> zookeeper stuff myself.
>>>>
>>>> This seems like a basic use case, so I'm probably missing something ...
>>>>
>>>> thanks,
>>>> Imran
>>>>
>>>>
>>>> (I'm using 0.8.0-beta1)
>>>>
