I think the current behavior is fairly reasonable. Following a rebalance the entire state of the consumer changes - you may get an entirely new set of partitions. A common use-case for pause is to allow a consumer to keep polling and avoid getting new events while it is retrying to process existing events - well, following a rebalance, it is possible that another consumer owns the partition, is already re-processing these events and the entire state needs to be reset.
I usually recommend developers to treat rebalance as a restart (since you are getting a whole new set of partitions) and just follow whatever process you'd follow to set up after a restart. Since pauses don't survive restarts, I wouldn't expect them to survive a rebalance either. I hope this helps explain the behavior? On Mon, Nov 7, 2016 at 9:53 AM, Paul Mackles <pmack...@adobe.com> wrote: > Using the v0.9.0.1 consumer API, I recently learned that paused partitions > can unexpectedly become become unpaused during a rebalance. I also found an > old thread from the mailing list which corroborates this behavior: > > > http://grokbase.com/t/kafka/users/161wgzckze/new-consumer-pause-reset-behaviour > > > <http://grokbase.com/t/kafka/users/161wgzckze/new-consumer-pause-reset-behaviour>While > I can maintain the partition state myself, it seems like it would be a lot > easier if this were either handled internally by the consumer API (i.e. pause > the partitions that were previously paused before resuming) and/or make the > partition state available to the RebalanceListener. > > > I did not find any existing tickets in JIRA related to this so I am wondering > if this is a valid bug/enhancement or if someone found a decent workaround. > All of the consumer API examples that I have found do not appear to handle > this scenario. > > > Here is the code snippet from he client I have been working on: > > > consumer.pause(consumer.assignment().toArray(EMPTYTPARRAY)); > > while (!isWritable()) { > // WARNING: if there is a rebalance, this call may return some records!!! > consumer.poll(0); > Uninterruptibles.sleepUninterruptibly(pauseWait, TimeUnit.MILLISECONDS); > } > > consumer.resume(consumer.assignment().toArray(EMPTYTPARRAY)); > > > Thanks, > > Paul > > > -- Gwen Shapira Product Manager | Confluent 650.450.2760 | @gwenshap Follow us: Twitter | blog