My read of the documentation is that no records should be returned when the
partition is paused.  I have this consumer loop which is meant to keep the
heartbeat going while the processing is busy:

while (!closed.get) {
  val records = client.poll(timeout)
  if (records.count() > 0 && !work.offer(records, 100, TimeUnit.MILLISECONDS)) {
    client.pause(Seq(partition).asJava)
    do {
      val empty = client.poll(timeout)
      assert(empty == null || empty.isEmpty)
      doCommits()
    } while (!work.offer(records, 100, TimeUnit.MILLISECONDS))
    client.resume(Seq(partition).asJava)
  }
  doCommits()
}

When the main processing is backed up, work.offer returns false and I
enter the inner loop, pausing the partition.  However, the assertion
in the inner loop fails b/c poll is still returning records.

How can I keep the heartbeat going without consuming any data?

Reply via email to