My reading of the documentation is that poll should return no records for a
partition while that partition is paused. I have this consumer loop, which is
meant to keep the heartbeat going while processing is busy:
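    while (!closed.get) {
      val records = client.poll(timeout)
      // Hand the batch to the processor; if it is still busy after 100 ms, pause.
      if (records.count() > 0 && !work.offer(records, 100, TimeUnit.MILLISECONDS)) {
        client.pause(Seq(partition).asJava)
        do {
          // Poll only to keep the heartbeat alive; no records are expected here.
          val empty = client.poll(timeout)
          assert(empty == null || empty.isEmpty)
          doCommits()
        } while (!work.offer(records, 100, TimeUnit.MILLISECONDS))
        // The pending batch has been accepted, so fetching can resume.
        client.resume(Seq(partition).asJava)
      }
      doCommits()
    }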
When the main processing is backed up, work.offer returns false and I
enter the inner loop, pausing the partition. However, the assertion
in the inner loop fails because poll is still returning records.
How can I keep the heartbeat going without consuming any data?
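
For context, the back-pressure signal itself can be reproduced without the consumer. This is only a minimal sketch, assuming work is a bounded java.util.concurrent.BlockingQueue; the object name, the capacity of 1, and the handOff helper are made up for illustration and are not part of my real code. It shows that offer with a timeout returns false while the queue stays full, which is the condition that sends the loop above into the pause branch:

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object OfferBackpressureSketch {
  // Hypothetical stand-in for `work`: a bounded queue with room for one batch.
  val work = new ArrayBlockingQueue[Seq[String]](1)

  // True if the batch was accepted within 100 ms, false if the queue stayed full.
  def handOff(batch: Seq[String]): Boolean =
    work.offer(batch, 100, TimeUnit.MILLISECONDS)

  def main(args: Array[String]): Unit = {
    println(handOff(Seq("a"))) // true: the queue has room
    println(handOff(Seq("b"))) // false: the queue is full and nothing drains it
    work.take()                // simulate the processor catching up
    println(handOff(Seq("b"))) // true: there is room again
  }
}
```

In the real loop, the false result is what triggers pause, and the later true result is what triggers resume.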