Shikhar,

Thanks for pointing me to KIP-62. Once implemented, it will make workers that take a long time to process messages much simpler to write. Until then, we have to keep using the pause/poll/resume pattern. That said, as far as I can tell, this pattern has not been well documented.
It appears the issue I observed is the result of consumer rebalancing. When a consumer with paused partitions calls poll to trigger a heartbeat, the client will process any pending consumer rebalance. The rebalance may add newly assigned partitions, which arrive unpaused. Worse, partitions that were already assigned and paused, and that remain assigned to the client after the rebalance, will become unpaused. I consider this a bug in the client. Paused partitions should not be unpaused during a rebalance if they continue to be assigned to the client.

So pause/poll/resume is not sufficient for a worker that handles messages with long processing times. One must also implement a ConsumerRebalanceListener that pauses all assigned partitions if the consumer is in the middle of processing a message.

On Fri, Jul 1, 2016 at 11:52 AM, Shikhar Bhushan <shik...@confluent.io> wrote:

> Hi Elias,
>
> KIP-62
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread>
> has a discussion of current options, and the improvements that are coming.
>
> Best,
>
> Shikhar
>
> On Thu, Jun 30, 2016 at 6:02 PM Elias Levy <fearsome.lucid...@gmail.com>
> wrote:
>
> > What is the officially recommended method to heartbeat using the new Java
> > consumer during long message processing times?
> >
> > I thought I could accomplish this by setting max.poll.records to 1 in the
> > client, calling consumer.pause(consumer.assignment()) when starting to
> > process a record, calling consumer.resume(consumer.paused()) when done
> > processing a record and committing its offset, and calling consumer.poll(0)
> > intermittently while processing the record.
> >
> > The testing shows that consumer.poll(0) will return records, rather than
> > returning nil or an empty ConsumerRecords.
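
For anyone finding this thread later, here is a rough sketch of the rebalance-listener workaround I describe at the top of this message. To keep it runnable without a broker, it does not use the kafka-clients classes: ToyConsumer and RebalanceListener are hypothetical stand-ins that model only the behaviour I observed (a rebalance discards all paused state), and partitions are plain strings. In real code the worker would implement org.apache.kafka.clients.consumer.ConsumerRebalanceListener and call pause() on the actual consumer. Treat it as an illustration of the idea, not a tested implementation against a live cluster.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

public class PauseAcrossRebalance {

    // Stand-in for Kafka's ConsumerRebalanceListener (same method names,
    // but partitions are Strings instead of TopicPartition).
    interface RebalanceListener {
        void onPartitionsRevoked(Collection<String> partitions);
        void onPartitionsAssigned(Collection<String> partitions);
    }

    // Toy consumer (assumption): models only the observed behaviour that a
    // rebalance replaces the assignment and forgets which partitions were paused.
    static class ToyConsumer {
        private final Set<String> assignment = new HashSet<>();
        private final Set<String> paused = new HashSet<>();
        private RebalanceListener listener;

        void subscribe(RebalanceListener l) { listener = l; }
        Set<String> assignment() { return new HashSet<>(assignment); }
        Set<String> paused() { return new HashSet<>(paused); }

        void pause(Collection<String> partitions) {
            paused.addAll(partitions);
            paused.retainAll(assignment); // only assigned partitions can be paused
        }

        void resume(Collection<String> partitions) { paused.removeAll(partitions); }

        // Simulates what poll() does when a rebalance is pending.
        void rebalance(Collection<String> newAssignment) {
            if (listener != null) listener.onPartitionsRevoked(assignment());
            assignment.clear();
            paused.clear(); // paused state is lost, even for retained partitions
            assignment.addAll(newAssignment);
            if (listener != null) listener.onPartitionsAssigned(assignment());
        }
    }

    // Worker that re-pauses everything it is assigned while a record is in flight.
    static class Worker implements RebalanceListener {
        private final ToyConsumer consumer;
        private boolean processing;

        Worker(ToyConsumer consumer) {
            this.consumer = consumer;
            consumer.subscribe(this);
        }

        void startProcessing() {
            processing = true;
            consumer.pause(consumer.assignment());
        }

        void finishProcessing() {
            processing = false;
            consumer.resume(consumer.paused());
        }

        @Override
        public void onPartitionsRevoked(Collection<String> partitions) {
            // Nothing to do in this sketch; real code might commit offsets here.
        }

        @Override
        public void onPartitionsAssigned(Collection<String> partitions) {
            // Mid-record: pause the whole new assignment so the heartbeat
            // poll does not hand back more records.
            if (processing) consumer.pause(partitions);
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        Worker worker = new Worker(consumer);
        consumer.rebalance(Arrays.asList("t-0", "t-1"));

        worker.startProcessing();
        consumer.rebalance(Arrays.asList("t-1", "t-2")); // rebalance mid-record

        // Every assigned partition is paused again, including the new one.
        System.out.println(consumer.paused().equals(consumer.assignment())); // prints "true"

        worker.finishProcessing();
        System.out.println(consumer.paused().isEmpty()); // prints "true"
    }
}
```

Without the pause in onPartitionsAssigned, the second rebalance would leave both t-1 and t-2 unpaused while the worker is still busy, which matches the case where poll(0) returned records unexpectedly.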