I think the current behavior is fairly reasonable. Following a
rebalance the entire state of the consumer changes - you may get an
entirely new set of partitions. A common use case for pause is to
let a consumer keep polling without receiving new events while it
retries processing existing ones. After a rebalance, though, another
consumer may own the partition and already be re-processing those
events, so the entire state needs to be reset.
I usually recommend that developers treat a rebalance as a restart (since
you are getting a whole new set of partitions) and just follow
whatever process you'd follow to set up after a restart. Since pauses
don't survive restarts, I wouldn't expect them to survive a rebalance
either.
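
To make the "treat it like a restart" idea concrete, here is a rough
sketch of the logic I have in mind for onPartitionsAssigned in a
ConsumerRebalanceListener. It is only an illustration under some
assumptions: Pausable is a hypothetical stand-in for the consumer's
pause call (so the sketch compiles without the Kafka client jar),
partitions are plain strings rather than TopicPartition, and
RestartStyleAssignmentHandler and the writable check are made-up names
modelled on the isWritable() call in your snippet. Also note that the
0.9 consumer takes pause(TopicPartition...) as varargs, while newer
clients take a Collection.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the consumer's pause call (not the real Kafka API).
interface Pausable {
    void pause(Collection<String> partitions);
}

// Sketch of logic intended for ConsumerRebalanceListener.onPartitionsAssigned.
class RestartStyleAssignmentHandler {
    private final Pausable consumer;
    private final BooleanSupplier writable; // assumed application-level check

    RestartStyleAssignmentHandler(Pausable consumer, BooleanSupplier writable) {
        this.consumer = consumer;
        this.writable = writable;
    }

    // Make the same decision we would make at startup: no per-partition
    // pause state is carried over from before the rebalance.
    void onPartitionsAssigned(Collection<String> partitions) {
        if (!writable.getAsBoolean() && !partitions.isEmpty()) {
            consumer.pause(new ArrayList<>(partitions));
        }
    }
}
```

The point of the design is that the pause decision comes from the
application's current state (can it accept more work right now?) and
not from remembering which partitions were paused before the rebalance.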

I hope this helps explain the behavior?

On Mon, Nov 7, 2016 at 9:53 AM, Paul Mackles <pmack...@adobe.com> wrote:
> Using the v0.9.0.1 consumer API, I recently learned that paused partitions 
> can unexpectedly become unpaused during a rebalance. I also found an 
> old thread from the mailing list which corroborates this behavior:
>
>
> http://grokbase.com/t/kafka/users/161wgzckze/new-consumer-pause-reset-behaviour
>
>
> While I can maintain the partition state myself, it seems like it would be a lot 
> easier if this were handled internally by the consumer API (i.e. pause 
> the partitions that were previously paused before resuming) and/or if the 
> partition state were made available to the RebalanceListener.
>
>
> I did not find any existing tickets in JIRA related to this so I am wondering 
> if this is a valid bug/enhancement or if someone found a decent workaround. 
> All of the consumer API examples that I have found do not appear to handle 
> this scenario.
>
>
> Here is the code snippet from the client I have been working on:
>
>
> consumer.pause(consumer.assignment().toArray(EMPTYTPARRAY));
>
> while (!isWritable()) {
>   // WARNING: if there is a rebalance, this call may return some records!!!
>   consumer.poll(0);
>   Uninterruptibles.sleepUninterruptibly(pauseWait, TimeUnit.MILLISECONDS);
> }
>
> consumer.resume(consumer.assignment().toArray(EMPTYTPARRAY));
>
>
> Thanks,
>
> Paul
>
>
>



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog
