Hi all,

My problem: if the consumer fetches too much data and processing the
records is not fast enough, commit() fails because the group has
rebalanced in the meantime.

I cannot reduce 'max.partition.fetch.bytes' because there might be large
messages.

I don't want to increase 'session.timeout.ms' either, because real failures
would then take too long to detect.

I understand that the new consumer API sends heartbeats and handles
rebalances only during the call to poll(). But if I call poll(0), there
is still a chance it will return even more data. So I keep the heartbeats
going, but I may accumulate too much data, eventually leading to an OOM.

I would like something like this:
for (ConsumerRecord<K, V> record : consumer.poll(timeout)) {
  process(record);
  // wished-for method, does not exist in the consumer API
  consumer.doHeartBeatsAndRebalanceSoKeepMeStillAlive();
}

Is this possible?
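
The closest workaround I can think of is to pause every assigned partition
before processing a batch and to keep calling poll() while paused, so that
heartbeats continue but no new data should come back. Below is a minimal
sketch of that idea, not a tested solution. PausableConsumer, pauseAll(),
resumeAll() and pollAndProcess() are hypothetical names I made up so the
loop can be run without a broker; with the real client I assume they would
map to KafkaConsumer's poll(), pause() and resume() on the assigned
partitions. It also ignores what happens if the assignment changes during
one of the paused polls.

```java
import java.util.List;
import java.util.function.Consumer;

public class PausedPollSketch {

    /**
     * Hypothetical stand-in for the few consumer operations the loop needs.
     * With the real client these would presumably map to KafkaConsumer's
     * poll(), pause() and resume() on the currently assigned partitions.
     */
    public interface PausableConsumer<R> {
        List<R> poll();    // fetches records; heartbeats and rebalances happen here
        void pauseAll();   // stop fetching from every assigned partition
        void resumeAll();  // start fetching again
    }

    /** Polls one batch, then processes it while polling in paused mode. Returns the batch size. */
    public static <R> int pollAndProcess(PausableConsumer<R> consumer, Consumer<R> processor) {
        List<R> batch = consumer.poll();
        if (batch.isEmpty()) {
            return 0;
        }
        consumer.pauseAll();
        try {
            for (R record : batch) {
                processor.accept(record);
                // Paused poll: expected to keep the session alive without returning data.
                List<R> extra = consumer.poll();
                if (!extra.isEmpty()) {
                    throw new IllegalStateException("paused poll returned " + extra.size() + " records");
                }
            }
        } finally {
            // Always resume, even if processing threw, so the next call fetches again.
            consumer.resumeAll();
        }
        return batch.size();
    }
}
```

This bounds memory to one fetched batch at a time, at the cost of one extra
poll() call per record. If a single record can take longer than the session
timeout to process, it would not help.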
