While performing some prototyping on 0.10.0.0 using the new client API I
noticed that some clients fail to drain their topic partitions.

The Kafka cluster is comprised of 3 nodes.  The topic in question has 50
partitions and has been preloaded with messages.  The messages were loaded
without a key, so they should be spread in a round-robin fashion.  The
kafka-consumer-groups command shows a log-end-offset of 137 for every
partition, except for one partition at 136.

The worker is a simple single-threaded client.  As mentioned, it uses the
new consumer API.  The consumer is configured to fetch a single record at a
time by setting the max.poll.records config property to 1.  The worker
handles commits itself and sets enable.auto.commit to false.
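
For reference, the consumer is created with settings roughly like the
following (a sketch only; the broker list, group id, topic name, and the
StringDeserializer are placeholders, not the actual values used):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class WorkerConsumerFactory {
        // Builds a consumer matching the configuration described above.
        static KafkaConsumer<String, String> createConsumer() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholder
            props.put("group.id", "worker-group");    // placeholder
            props.put("enable.auto.commit", "false"); // the worker commits manually
            props.put("max.poll.records", "1");       // fetch a single record per poll
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
            return consumer;
        }
    }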

The worker can take substantial time processing a message.  To avoid
timing out the Kafka connection, the worker calls consumer.pause() with the
results of consumer.assignment() when it starts processing the message,
calls consumer.poll(0) at regular intervals while processing the message to
trigger heartbeats to Kafka, and, once it has finished processing the
message and committed its offset with consumer.commitSync(), calls
consumer.resume() with the result of a fresh call to consumer.assignment().
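
In code the worker loop looks roughly like this (a sketch only;
startProcessing() and processingFinished() stand in for the real work, and
it assumes the Collection-based pause()/resume() signatures):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    void runLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // Pause everything currently assigned while this record is processed.
                consumer.pause(consumer.assignment());

                startProcessing(record);          // placeholder for the actual work
                while (!processingFinished()) {   // placeholder
                    // Keep the session alive; with all partitions paused this
                    // returns no records, it only triggers heartbeats.
                    consumer.poll(0);
                }

                // Commit the offset of the record just processed.
                consumer.commitSync(Collections.singletonMap(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)));

                // Resume with a fresh assignment() call, not the set captured at
                // pause time, since the assignment may have changed meanwhile.
                consumer.resume(consumer.assignment());
            }
        }
    }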

Note that when calling consumer.resume() I pass in the results of a fresh
call to consumer.assignment().  Passing in the results of the previous call
to consumer.assignment(), the ones used when calling consumer.pause(),
would result in an exception if partitions were reassigned while the worker
was processing the message, as can happen when workers join the consumer
group.  I presume this means the call to assignment() generates a call to
the consumer coordinator in the cluster to obtain the latest assignments
rather than returning a locally cached copy of assignments.

The test used four worker nodes running four workers each, for sixteen
total workers.

kafka-consumer-groups.sh shows that all partitions have been assigned to a
worker, and that the workers successfully processed most partitions, 29 out
of 50, to completion (lag is 0).

5 partitions appear not to have been processed at all, with unknown shown
for current-offset and lag, and 16 partitions have processed some messages
but not all.  In both cases, the workers believe there are no more
messages to fetch.  When they call poll() with a timeout, it eventually
returns with no messages.  The workers show no errors and continue to run.

That indicates to me that the workers and cluster disagree on partition
assignment.  Thus, the consumer is not asking for messages on partitions
the broker has assigned to it, and messages on those partitions are not
processed.

My guess is that partition assignments are being changed after my calls to
consumer.assignment() and consumer.resume().

Presumably I can solve this issue by implementing a
ConsumerRebalanceListener and updating the assignment I pass to resume()
whenever onPartitionsRevoked() and onPartitionsAssigned() are called.
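
Something along these lines is what I have in mind (a sketch; the class
name and the way it is wired in are mine, not anything from the Kafka
docs):

    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    // Tracks the current assignment so the worker always pauses/resumes the
    // partitions it actually owns, even after a rebalance.
    public class AssignmentTracker implements ConsumerRebalanceListener {
        private final Set<TopicPartition> assigned = new HashSet<>();

        @Override
        public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            assigned.removeAll(partitions);
        }

        @Override
        public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            assigned.addAll(partitions);
        }

        public synchronized Set<TopicPartition> current() {
            return new HashSet<>(assigned);
        }
    }

    // Registered when subscribing, e.g.:
    //   AssignmentTracker tracker = new AssignmentTracker();
    //   consumer.subscribe(Collections.singletonList("my-topic"), tracker);
    //   ...
    //   consumer.resume(tracker.current());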

Ideally, the Consumer interface would allow you to call pause() and
resume() without a list of topic partitions, which would pause and resume
fetching from all assigned partitions, since the client is already keeping
track of them.

Thoughts?  Suggestions?
