Hi Michael,

Your issue seems to be a more general one with the new Kafka Consumer regarding unexpected rebalances. Kafka Streams' commit behavior is synchronous, i.e. it calls "consumer.commitSync" on the underlying new Kafka Consumer, and that call will fail if there is an ongoing rebalance, since the partitions being committed may no longer be owned by the consumer.
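To make that failure mode concrete, here is a rough, self-contained sketch using the plain new consumer (not the Kafka Streams internals); the broker address, group id, topic name and the per-record work are placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitDuringRebalanceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder
        props.put("group.id", "my-streams-prototype");    // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));  // placeholder
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // expensive per-record work goes here; if one batch takes longer
                    // than session.timeout.ms, the coordinator assumes this member
                    // has died and rebalances its partitions to other members
                }
                try {
                    consumer.commitSync();  // synchronous commit, as Kafka Streams does
                } catch (CommitFailedException e) {
                    // thrown once the group has rebalanced and the partitions being
                    // committed are no longer owned by this consumer
                    System.err.println("Commit failed after rebalance: " + e.getMessage());
                }
            }
        }
    }
}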
As for rebalances, several things can cause them:

1) Topic changes, such as creation of new topics, partition additions, topic deletions, etc. If you are not changing topics at the moment, you can exclude this case.

2) Consumer failures detected by the heartbeat protocol, which cause partitions to be migrated off the consumer that is considered failed. Note that the heartbeat is tied to the poll() call, so if your consumer thread (and likewise a Kafka Streams thread) takes a long time to process the polled records and your configured session.timeout.ms is not large enough, the consumer will be falsely treated as failed and a rebalance will be triggered.

So you can consider 1) increasing the session.timeout.ms value, and 2) setting max.poll.records to a reasonably small value, to avoid your consumers being falsely considered failed; a rough config sketch is appended after the quoted message below.

More info about the consumer configs: http://kafka.apache.org/documentation.html#newconsumerconfigs

Guozhang

On Wed, Apr 6, 2016 at 6:26 AM, Michael D. Coon <mdco...@yahoo.com.invalid> wrote:

> All,
> I'm getting CommitFailedExceptions on a small prototype I built using
> kafkaStreams. I'm not using the DSL, but the TopologyBuilder with several
> processors chained together with a sink in between a few of them. When I
> try committing through the ProcessorContext, I see exceptions being thrown
> about commit failures due to group rebalance (not to mention delays in
> processing during commit attempts). I'm using a single host, with 2 stream
> threads and a 5-node Kafka cluster. I wouldn't think rebalancing would be
> occurring after data starts flowing and I'm committing offsets. This was
> something I saw with the new Kafka client APIs as well, and I had to work
> around by creating static partition assignments to my consumers in my data
> flow...otherwise, any dynamic allocation of new consumers to the group
> caused this exception to be thrown and I could never commit my offsets. Are
> you all not seeing this in your tests? Am I not supposed to commit through
> the ProcessorContext? I'm committing once my interim processor writes its
> output to the sink point in the flow; is that not the correct/expected
> behavior/use?
> Mike
>
>

--
-- Guozhang
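P.S. The config sketch mentioned above, as a minimal example; the numbers are placeholders and should be sized to how long one polled batch actually takes to process. These go into the Properties you build your consumer from (and, I believe, can be passed through the properties you hand to StreamsConfig as well):

import java.util.Properties;

Properties props = new Properties();
// give a slow poll loop more headroom before the consumer is declared dead
props.put("session.timeout.ms", "30000");   // placeholder value
// bound the work handed out per poll() so the next poll(), which carries the
// heartbeat, happens sooner
props.put("max.poll.records", "100");       // placeholder value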