While performing some prototyping on 0.10.0.0 using the new client API, I noticed that some clients fail to drain their topic partitions.
The Kafka cluster is comprised of 3 nodes. The topic in question has 50 partitions and has been preloaded with messages. The messages were produced without a key, so they should be spread across the partitions in a round-robin fashion. The kafka-consumer-groups command shows that each partition has a log-end-offset of 137, except for one partition at 136.

The worker is a simple single-threaded client that, as mentioned, uses the new consumer API. The consumer is configured to fetch a single record at a time by setting the max.poll.records config property to 1. The worker handles commits itself and sets enable.auto.commit to false. The worker can take substantial time processing a message. To avoid timing out the Kafka connection, the worker calls consumer.pause() with the result of consumer.assignment() when it starts processing the message, calls consumer.poll(0) at regular intervals while processing the message to trigger heartbeats to Kafka, and calls consumer.resume() with the result of a call to consumer.assignment() when it is done processing the message and has committed the offset for the message using consumer.commitSync().

Note that when calling consumer.resume() I pass in the result of a fresh call to consumer.assignment(). Passing in the result of the previous call to consumer.assignment(), the one used when calling consumer.pause(), would result in an exception if partitions were reassigned while the worker was processing the message, as may happen when workers join the consumer group. I presume this means that a call to assignment() generates a call to the consumer coordinator in the cluster to obtain the latest assignments rather than returning a locally cached copy.

The test used four worker nodes running four workers each, for sixteen workers in total. kafka-consumer-groups.sh shows that all partitions have been assigned to a worker and that the workers successfully processed most partitions, 29 out of 50, to completion (lag is 0). 5 partitions appear not to have been processed at all, with unknown shown for current-offset and lag, and 16 partitions have had some but not all of their messages processed. In either case, the workers believe there are no more messages to fetch: when they call poll() with a timeout, it eventually returns with no messages. The workers show no errors and continue to run.

This indicates to me that the workers and the cluster disagree on partition assignment. The consumer is not asking for messages on partitions the broker has assigned to it, so messages on those partitions are not processed. My guess is that partition assignments are being changed after my calls to consumer.assignment() and consumer.resume().

Presumably I can solve this issue by implementing a ConsumerRebalanceListener and updating the assignments I call resume() with whenever onPartitionsRevoked and onPartitionsAssigned are called. Ideally, the Consumer interface would allow you to call pause() and resume() without a list of topic partitions, which would pause and resume fetching from all assigned partitions, which the client is already keeping track of.

Thoughts? Suggestions?
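For reference, here is a simplified sketch of the worker loop described above. The broker addresses, group id, topic name, and the processOneStep() helper are placeholders, and error handling and the real processing logic are omitted:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class Worker {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholder
            props.put("group.id", "worker-group");                                    // placeholder
            props.put("enable.auto.commit", "false"); // the worker commits manually
            props.put("max.poll.records", "1");       // fetch a single record per poll
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // Stop fetching while the (potentially long) processing runs.
                    consumer.pause(consumer.assignment());

                    // Long-running processing, calling poll(0) at intervals so the
                    // consumer keeps heartbeating; with all partitions paused these
                    // polls return no records.
                    while (!processOneStep(record)) {
                        consumer.poll(0);
                    }

                    // Commit the offset of the record we just finished.
                    consumer.commitSync();

                    // Resume with a fresh assignment() result, since partitions may
                    // have been reassigned while we were processing.
                    consumer.resume(consumer.assignment());
                }
            }
        }

        // Placeholder: does one slice of work, returns true when the record is done.
        private static boolean processOneStep(ConsumerRecord<String, String> record) {
            return true;
        }
    }

And here is a rough, untested sketch of the rebalance-listener workaround I have in mind: track the current assignment in onPartitionsRevoked and onPartitionsAssigned, and use that set when calling pause() and resume() instead of consumer.assignment():

    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    // Keeps a local view of the current assignment up to date across rebalances so
    // that pause()/resume() are only called with partitions the consumer still owns.
    public class TrackingRebalanceListener implements ConsumerRebalanceListener {
        private final Set<TopicPartition> current = new HashSet<>();

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            current.removeAll(partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            current.addAll(partitions);
        }

        public Set<TopicPartition> currentAssignment() {
            return Collections.unmodifiableSet(current);
        }
    }

The worker would then subscribe with consumer.subscribe(Collections.singletonList("my-topic"), listener) and pass listener.currentAssignment() to pause() and resume(). Since the listener callbacks run inside poll(), the periodic poll(0) calls made while processing would keep the tracked set current.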