Guozhang,
   Thanks for the advice; however, "max.poll.records" doesn't seem to be 
supported, since it has no effect on how many records come back from the 
consumer.poll requests. I do agree that the likely culprit in rebalancing 
is the delay in processing new records. I'm going to experiment with the max 
buffer size per partition setting to see if I can force the consumer to pause, 
and thus not inject too many records too quickly. It would be awesome if the 
max.poll.records setting were respected by the consumer/broker, so that each 
poll returned at most that many messages. I feel like this used to be supported in the older 
Kafka APIs. This setting would allow more tuning of how much data each of my 
stream job instances receives.
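
A minimal sketch of that buffer experiment, assuming the setting meant here is the Kafka Streams config key "buffered.records.per.partition". The class name, method name, and the value 100 are placeholders for illustration, not recommendations:

```java
import java.util.Properties;

public class StreamsBufferSketch {

    // Builds Streams properties with a cap on buffered records per partition.
    // ASSUMPTION: "buffered.records.per.partition" is the setting referred to
    // above; the cap is whatever the caller wants to try.
    static Properties streamsProps(int bufferedRecordsPerPartition) {
        if (bufferedRecordsPerPartition < 1) {
            throw new IllegalArgumentException("buffer size must be at least 1");
        }
        Properties props = new Properties();
        props.setProperty("buffered.records.per.partition",
                Integer.toString(bufferedRecordsPerPartition));
        return props;
    }

    public static void main(String[] args) {
        // 100 is an arbitrary starting point for the experiment.
        Properties props = streamsProps(100);
        System.out.println(props);
    }
}
```

These properties would be merged into whatever configuration the Streams job already uses.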

Mike
 

    On Wednesday, April 6, 2016 5:55 PM, Guozhang Wang <wangg...@gmail.com> 
wrote:
 

 Hi Michael,
Your issue seems like a more general one with the new Kafka Consumer regarding 
unexpected rebalances: as for Kafka Streams, its committing behavior is 
synchronous, i.e. it triggers "consumer.commitSync" on the underlying new Kafka 
Consumer, which will fail if there is an ongoing rebalance, since the 
partitions being committed may no longer be owned by the consumer.
As for rebalance, there are several cases that can cause it:
1) topic changes, like creation of new topics, partition addition, topic 
deletes, etc. 
If you are not changing topics at the moment then you can exclude this case.
2) consumer failures detected by the heartbeat protocol, which cause partitions 
to be migrated away from the failed consumer.
Note that the heartbeat is wrapped in the poll() call, so if your consumer 
thread (and similarly Kafka Streams) takes a long time to process polled records 
and your configured session.timeout.ms value is not large enough, the consumer 
can be considered failed and a rebalance triggered.
So you can consider 1) increasing the session.timeout.ms value, and 2) setting 
max.poll.records to a reasonably small value, to avoid your consumers being 
falsely considered as failed.
More info about the consumer configs:
http://kafka.apache.org/documentation.html#newconsumerconfigs
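
As a rough sketch of the two suggestions above, the snippet below builds consumer properties with a larger session.timeout.ms and a max.poll.records bound derived from an estimated per-record processing time. Only the two config keys come from this thread; the class and method names, the 30-second timeout, the 50 ms per-record estimate, and the safety factor of one half are illustrative assumptions. Whether max.poll.records takes effect depends on the client version in use.

```java
import java.util.Properties;

public class ConsumerTuningSketch {

    // Largest batch that can be processed within a fraction of the session
    // timeout, so that the next poll() (and its heartbeat) happens before the
    // session expires. All inputs are caller-supplied estimates (ASSUMPTIONS).
    static int maxRecordsPerPoll(long sessionTimeoutMs, long perRecordMs, double safetyFactor) {
        if (perRecordMs <= 0) {
            throw new IllegalArgumentException("perRecordMs must be positive");
        }
        long budgetMs = (long) (sessionTimeoutMs * safetyFactor);
        // Never go below one record per poll, even if the budget is tiny.
        return (int) Math.max(1L, budgetMs / perRecordMs);
    }

    // Sets the two consumer configs discussed above.
    static Properties consumerProps(long sessionTimeoutMs, long perRecordMs) {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", Long.toString(sessionTimeoutMs));
        props.setProperty("max.poll.records",
                Integer.toString(maxRecordsPerPoll(sessionTimeoutMs, perRecordMs, 0.5)));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 30 s session timeout, about 50 ms per record.
        Properties props = consumerProps(30_000L, 50L);
        System.out.println("session.timeout.ms = " + props.getProperty("session.timeout.ms"));
        System.out.println("max.poll.records   = " + props.getProperty("max.poll.records"));
    }
}
```

With these example numbers, half of the 30-second session is budgeted for processing, which allows about 300 records per poll at 50 ms each.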


Guozhang

On Wed, Apr 6, 2016 at 6:26 AM, Michael D. Coon <mdco...@yahoo.com.invalid> 
wrote:

All,
   I'm getting CommitFailedExceptions on a small prototype I built using 
Kafka Streams. I'm not using the DSL, but the TopologyBuilder with several 
processors chained together with a sink in between a few of them. When I try 
committing through the ProcessorContext, I see exceptions being thrown about 
commit failures due to group rebalance (not to mention delays in processing 
during commit attempts). I'm using a single host, with 2 stream threads and a 
5-node Kafka cluster. I wouldn't think rebalancing would be occurring after 
data starts flowing and I'm committing offsets. This was something I saw with 
the new Kafka client APIs as well, and I had to work around by creating static 
partition assignments to my consumers in my data flow...otherwise, any dynamic 
allocation of new consumers to the group caused this exception to be thrown and 
I could never commit my offsets. Are you all not seeing this in your tests? Am 
I not supposed to commit through the ProcessorContext? I'm committing once my 
interim processor writes its output to the sink point in the flow; is that not 
the correct/expected behavior/use?
Mike





-- 
-- Guozhang


  
