Hi all,
I have a question about some very suspicious behavior I see while consuming messages using manual synchronous commit with Kafka 0.10.1.0. The code looks something like this:

try (KafkaConsumer<...> consumer = ...) {
  Map<TopicPartition, OffsetAndMetadata> commitMap = Collections.synchronizedMap(...);
  while (!Thread.currentThread().isInterrupted()) {
    ConsumerRecords records = consumer.poll(..);
    for (...) {
      // queue records for asynchronous processing in different thread.
      // when the asynchronous processing finishes, it updates the
      // `commitMap', so it has to be synchronized somehow
    }
    synchronized (commitMap) {
      // commit if we have anything to commit
      if (!commitMap.isEmpty()) {
        consumer.commitSync(commitMap);
        commitMap.clear();
      }
    }
  }
}


Now, what happens from time to time in my case is that the consumer thread gets stuck in the call to `commitSync`. By stracing the PID I found out that it periodically epolls on an *empty* list of file descriptors. On further investigation I found that the response to the `commitSync` is being handled by the kafka-coordinator-heartbeat-thread, which needs to access the `commitMap` while handling the response and therefore blocks, because the lock is held by the application's main thread. As a result, the whole consumption stops and ends in a live-lock. The solution in my case was to clone the map and move the call to `commitSync` out of the synchronized block, like this:

  final Map<TopicPartition, OffsetAndMetadata> clone;
  synchronized (commitMap) {
    if (!commitMap.isEmpty()) {
      clone = new HashMap<>(commitMap);
      commitMap.clear();
    } else {
      clone = null;
    }
  }
  if (clone != null) {
    consumer.commitSync(clone);
  }

which seems to work fine. My question is whether my interpretation of the problem is correct and, if so, should anything be done to avoid this? I see two possibilities: either the call to `commitSync` should clone the map itself, or it should somehow be guaranteed that the same thread that issues a synchronous request also receives the response. Am I right?
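
For reference, the snapshot-and-clear step could also be wrapped in a small helper so that the lock is never held across the commit. This is only a rough sketch: `PendingCommits`, `update` and `drain` are names made up for illustration, and it uses plain generics instead of `TopicPartition`/`OffsetAndMetadata` so that it compiles without the Kafka client on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: collects offsets from worker threads and hands out snapshots. */
final class PendingCommits<K, V> {
  private final Map<K, V> pending = new HashMap<>();

  /** Called by the processing threads once a record has been handled. */
  synchronized void update(K partition, V offset) {
    pending.put(partition, offset);
  }

  /**
   * Returns a private copy of the pending offsets and clears the shared map,
   * or null when there is nothing to commit. The lock is released on return,
   * so the caller can commit the copy without holding it.
   */
  synchronized Map<K, V> drain() {
    if (pending.isEmpty()) {
      return null;
    }
    Map<K, V> snapshot = new HashMap<>(pending);
    pending.clear();
    return snapshot;
  }
}
```

In the poll loop this would be used as `toCommit = pending.drain(); if (toCommit != null) consumer.commitSync(toCommit);`, so whichever thread handles the commit response only ever sees a map that nobody else locks.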

Thanks for comments,
 best,
  Jan
