Guozhang,
Thanks for the advice. However, "max.poll.records" doesn't seem to be
supported: setting it has no effect on how many records come back from
consumer.poll(). I do agree that the likely culprit behind the rebalancing
is the delay in processing new records. I'm going to experiment with the max
buffer size per partition setting to see if I can force the consumer to pause,
and thus not take in too many records too quickly. It would be awesome if the
max.poll.records setting were respected by the consumer/broker, so that poll
returned at most that many messages. I feel like this used to be supported in
the older Kafka APIs. This setting would allow more tuning of how much data
each of my stream job instances receives.
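
A minimal sketch of that experiment, assuming the setting in question is the Kafka Streams property "buffered.records.per.partition". The class name, method name, and the value 100 are hypothetical and only for illustration; the resulting Properties would be merged into whatever configuration the streams job already uses.

```java
import java.util.Properties;

// Hypothetical helper; the class and method names are illustrative only.
final class StreamsBufferSketch {

    // Builds properties that cap how many records Streams buffers per partition.
    static Properties withBufferCap(int maxBufferedRecords) {
        if (maxBufferedRecords <= 0) {
            throw new IllegalArgumentException("buffer cap must be positive");
        }
        Properties props = new Properties();
        // Assumption: once a partition's buffer exceeds this size, Streams
        // pauses fetching from that partition until the buffer drains.
        props.setProperty("buffered.records.per.partition",
                Integer.toString(maxBufferedRecords));
        return props;
    }

    public static void main(String[] args) {
        // 100 is an arbitrary illustrative value, not a recommendation.
        System.out.println(withBufferCap(100));
    }
}
```

A smaller cap should make the consumer pause sooner, at the cost of more frequent fetch pauses and resumes.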
Mike
On Wednesday, April 6, 2016 5:55 PM, Guozhang Wang <[email protected]>
wrote:
Hi Michael,
Your issue seems like a more general one with the new Kafka Consumer regarding
unexpected rebalances. As for Kafka Streams, its commit behavior is
synchronous, i.e. it calls "consumer.commitSync" on the underlying new Kafka
Consumer, which will fail if there is an ongoing rebalance, since the
partitions being committed may no longer be owned by the consumer.
As for rebalance, there are several cases that can cause it:
1) topic changes, like creation of new topics, partition addition, topic
deletes, etc.
If you are not changing topics at the moment then you can exclude this case.
2) consumer failures detected by the heartbeat protocol, which cause
partitions to be migrated away from the failed consumer.
Note that the heartbeat is wrapped in the poll() call, so if your consumer
thread (and similarly Kafka Streams) takes a long time to process polled
records and your configured session.timeout.ms value is not large enough, the
consumer will be falsely considered failed and a rebalance will be triggered.
So you can consider 1) increasing the session.timeout.ms value, or 2) setting
max.poll.records to a reasonably small value, to avoid your consumers being
falsely considered as failed.
More info about the consumer configs:
http://kafka.apache.org/documentation.html#newconsumerconfigs
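
As a rough sketch of how the two settings relate: the time spent processing one polled batch has to fit inside the session timeout. The helper below is hypothetical (the names, the 50% safety margin, and the per-record cost are all assumptions). It also assumes a client version that actually honors max.poll.records, and a session.timeout.ms value that falls within the range the broker allows.

```java
import java.util.Properties;

// Hypothetical helper; names and the safety margin are illustrative only.
final class ConsumerTimeoutSketch {

    // Largest batch whose estimated processing time fits within a fraction
    // of the session timeout, so the next poll() happens before it expires.
    static int maxRecordsPerPoll(long sessionTimeoutMs, long perRecordMs,
                                 double safetyFraction) {
        if (sessionTimeoutMs <= 0 || perRecordMs <= 0
                || safetyFraction <= 0 || safetyFraction > 1) {
            throw new IllegalArgumentException("arguments out of range");
        }
        long budgetMs = (long) (sessionTimeoutMs * safetyFraction);
        long records = budgetMs / perRecordMs;
        // Always allow at least one record per poll.
        return (int) Math.max(1, Math.min(Integer.MAX_VALUE, records));
    }

    // Consumer properties with both settings filled in.
    static Properties consumerProps(long sessionTimeoutMs, long perRecordMs) {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", Long.toString(sessionTimeoutMs));
        // Assumption: use half of the timeout as the processing budget.
        props.setProperty("max.poll.records",
                Integer.toString(maxRecordsPerPoll(sessionTimeoutMs, perRecordMs, 0.5)));
        return props;
    }

    public static void main(String[] args) {
        // 30 s timeout and an estimated 20 ms of processing per record.
        System.out.println(consumerProps(30_000L, 20L));
    }
}
```

With a 30000 ms timeout and roughly 20 ms of work per record, half the timeout is a 15000 ms budget, which allows 750 records per poll.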
Guozhang
On Wed, Apr 6, 2016 at 6:26 AM, Michael D. Coon <[email protected]>
wrote:
All,
I'm getting CommitFailedExceptions on a small prototype I built using
Kafka Streams. I'm not using the DSL, but the TopologyBuilder with several
processors chained together with a sink in between a few of them. When I try
committing through the ProcessorContext, I see exceptions being thrown about
commit failures due to group rebalance (not to mention delays in processing
during commit attempts). I'm using a single host, with 2 stream threads and a
5-node Kafka cluster. I wouldn't think rebalancing would be occurring after
data starts flowing and I'm committing offsets. This was something I saw with
the new Kafka client APIs as well, and I had to work around it by assigning
partitions statically to the consumers in my data flow; otherwise, any dynamic
allocation of new consumers to the group caused this exception to be thrown and
I could never commit my offsets. Are you all not seeing this in your tests? Am
I not supposed to commit through the ProcessorContext? I'm committing once my
interim processor writes its output to the sink point in the flow; is that not
the correct/expected behavior/use?
Mike
--
-- Guozhang