Hi Michael,

Your issue seems like a more general one with the new Kafka Consumer
regarding unexpected rebalances: Kafka Streams' commit behavior is
synchronous, i.e. it triggers "consumer.commitSync" on the underlying
new Kafka Consumer, and that call will fail if there is an ongoing
rebalance, since the partitions being committed may no longer be owned
by the consumer.
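
Just to illustrate the failure mode (a minimal sketch against the plain
new consumer, not the actual Streams internals; the broker address,
group id and topic name are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CommitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder
            props.put("group.id", "my-group");                 // placeholder
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder
            try {
                consumer.poll(1000);     // heartbeats are sent as part of poll()
                consumer.commitSync();   // synchronous commit of current offsets
            } catch (CommitFailedException e) {
                // Thrown when a rebalance has already revoked the partitions
                // being committed -- the same failure Streams surfaces to you.
            } finally {
                consumer.close();
            }
        }
    }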

As for rebalance, there are several cases that can cause it:

1) topic changes, like creation of new topics, partition additions, topic
deletions, etc.

If you are not changing topics at the moment then you can exclude this case.

2) consumer failures detected by the heartbeat protocol, which cause the
failed consumer's partitions to be migrated to other members of the group.

Note that the heartbeat is wrapped inside the poll() call, so if your
consumer thread (and similarly a Kafka Streams thread) takes a long time
to process the polled records and your configured session.timeout.ms
value is not large enough, the coordinator will consider the consumer
dead and trigger a rebalance.

So you can consider 1) increasing the session.timeout.ms value, and 2)
setting max.poll.records to a reasonably small value, to avoid your
consumers being falsely considered failed.
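
For example, in the properties you pass to the consumer (or, if I recall
correctly, in the Streams config, which forwards consumer configs to the
underlying consumer) -- the numbers are placeholders to tune for your
processing latency, not recommendations:

    import java.util.Properties;

    Properties props = new Properties();
    props.put("session.timeout.ms", "60000"); // more headroom before the
                                              // consumer is considered dead
    props.put("max.poll.records", "500");     // fewer records per poll(), so
                                              // the time between polls stays
                                              // below the session timeout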

More info about the consumer configs:

http://kafka.apache.org/documentation.html#newconsumerconfigs


Guozhang

On Wed, Apr 6, 2016 at 6:26 AM, Michael D. Coon <mdco...@yahoo.com.invalid>
wrote:

> All,
>    I'm getting CommitFailedExceptions on a small prototype I built using
> kafkaStreams. I'm not using the DSL, but the TopologyBuilder with several
> processors chained together with a sink in between a few of them. When I
> try committing through the ProcessorContext, I see exceptions being thrown
> about commit failures due to group rebalance (not to mention delays in
> processing during commit attempts). I'm using a single host, with 2 stream
> threads and a 5-node Kafka cluster. I wouldn't think rebalancing would be
> occurring after data starts flowing and I'm committing offsets. This was
> something I saw with the new Kafka client APIs as well, and I had to work
> around by creating static partition assignments to my consumers in my data
> flow...otherwise, any dynamic allocation of new consumers to the group
> caused this exception to be thrown and I could never commit my offsets. Are
> you all not seeing this in your tests? Am I not supposed to commit through
> the ProcessorContext? I'm committing once my interim processor writes its
> output to the sink point in the flow; is that not the correct/expected
> behavior/use?
> Mike
>
>


-- 
-- Guozhang
