One more thing I'm noticing in the logs. I see periodic node disconnection messages due to "timeout". I set my metadata.fetch.timeout.ms to 60000, request.timeout.ms to 30000 and timeout.ms to 30000 and those should be more than enough time waiting for metadata responses. I also set my offset commit period to 60000 and these disconnected messages seem to overlap with offset commit threshold...meaning it seems to be happening when the offset commit attempts are being made. The "api_key" in the failed request is "1"...I'd have to dig into the code to know what the corresponds to.
On Thursday, April 7, 2016 7:35 AM, Michael D. Coon <mdco...@yahoo.com.INVALID> wrote: Guozhang, Thanks for the advice; however, "max.poll.records" doesn't seem to be supported since it's not affecting how many records are coming back from the consumer.poll requests. However, I agree that the likely culprit in rebalancing is the delay in processing new records. I'm going to try and play with the max buffer size per partition setting to see if I can force the consumer to pause, and thus not inject too many records too quickly. It would be awesome if the max.poll.records setting was respected by the consumer/broker and it returned a max number of messages. I feel like this used to be supported in the older Kafka APIs. This setting would allow more tuning of how much data each of my stream job instances receives. Mike On Wednesday, April 6, 2016 5:55 PM, Guozhang Wang <wangg...@gmail.com> wrote: Hi Michael, Your issue seems like a more general one with the new Kafka Consumer regarding unexpected rebalances: as for Kafka Streams, it's committing behavior is synchronous, i.e. triggering "consumer.commitSync" of the underlying new Kafka Consumer, which will fail if there is an ongoing rebalance, since the partitions being committed on may not be owned by the consumer anymore. As for rebalance, there are several cases that can cause it: 1) topic changes, like creation of new topics, partition addition, topic deletes, etc. If you are not changing topics at the moment then you can exclude this case. 2) consumer failures detected by the heart beat protocol, and hence migrating partitions out of the failed consumer. Note that the heartbeat is wrapped in the poll() protocol, so if your consumer thread (and similarly Kafka Streams) takes long time to process polled records while your configured session.timeout.ms value is not large enough. So you can consider 1) increase session.timeout.ms value, 2) set max.poll.records to a reasonably small values to avoid your consumers being falsely considered as failed. More info about the consumer configs: http://kafka.apache.org/documentation.html#newconsumerconfigs Guozhang On Wed, Apr 6, 2016 at 6:26 AM, Michael D. Coon <mdco...@yahoo.com.invalid> wrote: All, I'm getting CommitFailedExceptions on a small prototype I built using kafkaStreams. I'm not using the DSL, but the TopologyBuilder with several processors chained together with a sink in between a few of them. When I try committing through the ProcessorContext, I see exceptions being thrown about commit failures due to group rebalance (not to mention delays in processing during commit attempts). I'm using a single host, with 2 stream threads and a 5-node Kafka cluster. I wouldn't think rebalancing would be occurring after data starts flowing and I'm committing offsets. This was something I saw with the new Kafka client APIs as well, and I had to work around by creating static partition assignments to my consumers in my data flow...otherwise, any dynamic allocation of new consumers to the group caused this exception to be thrown and I could never commit my offsets. Are you all not seeing this in your tests? Am I not supposed to commit through the ProcessorContext? I'm committing once my interim processor writes its output to the sink point in the flow; is that not the correct/expected behavior/use? Mike -- -- Guozhang