My read of the documentation is that no records should be returned when the partition is paused. I have this consumer loop which is meant to keep the heartbeat going while the processing is busy:

    while (!closed.get) {
      val records = client.poll(timeout)
      // Hand the batch to the worker; if the queue is full, fall into the pause loop.
      if (records.count() > 0 && !work.offer(records, 100, TimeUnit.MILLISECONDS)) {
        client.pause(Seq(partition).asJava)
        do {
          // Poll only to keep the heartbeat alive; no records are expected here.
          val empty = client.poll(timeout)
          assert(empty == null || empty.isEmpty)
          doCommits()
        } while (!work.offer(records, 100, TimeUnit.MILLISECONDS))
        client.resume(Seq(partition).asJava)
      }
      doCommits()
    }

When the main processing is backed up, `work.offer` returns false and I enter the inner loop, pausing the partition. However, the assertion in the inner loop fails because `poll` is still returning records. How can I keep the heartbeat going without consuming any data?