Hi all,
I have a question about some suspicious behavior I see when consuming
messages with manual synchronous commits on Kafka 0.10.1.0. The code
looks something like this:
  try (KafkaConsumer<...> consumer = ...) {
    Map<TopicPartition, OffsetAndMetadata> commitMap =
        Collections.synchronizedMap(...);
    while (!Thread.currentThread().isInterrupted()) {
      ConsumerRecords records = consumer.poll(..);
      for (...) {
        // queue records for asynchronous processing in a different thread.
        // when the asynchronous processing finishes, it updates
        // `commitMap`, so access to it has to be synchronized somehow
      }
      synchronized (commitMap) {
        // commit if we have anything to commit
        if (!commitMap.isEmpty()) {
          consumer.commitSync(commitMap);
          commitMap.clear();
        }
      }
    }
  }
Now, what happens from time to time in my case is that the consumer
thread gets stuck in the call to `commitSync`. By stracing the process I
found that it periodically epolls on an *empty* list of file
descriptors. On further investigation I found that the response to
`commitSync` is being handled by the kafka-coordinator-heartbeat-thread,
which needs to access `commitMap` while handling the response and
therefore blocks, because the lock is held by the application's main
thread. As a result, the whole consumption stops and ends in a
live-lock. The solution in my case was to clone the map and move the
call to `commitSync` out of the synchronized block, like this:
  final Map<TopicPartition, OffsetAndMetadata> clone;
  synchronized (commitMap) {
    if (!commitMap.isEmpty()) {
      clone = new HashMap<>(commitMap);
      commitMap.clear();
    } else {
      clone = null;
    }
  }
  if (clone != null) {
    consumer.commitSync(clone);
  }
which seems to work fine. My question is whether my interpretation of
the problem is correct and, if so, whether anything should be done to
avoid this. I see two possibilities: either the call to `commitSync`
should clone the map itself, or it should somehow be guaranteed that the
same thread that issues a synchronous request also receives the
response. Am I right?
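
To make the suspected mechanism concrete, here is a minimal sketch that
reproduces only the locking pattern, without Kafka. It relies on the
fact that a map returned by `Collections.synchronizedMap` uses itself as
its lock, so any other thread calling `get` on it blocks while the map's
monitor is held. The class, method, and thread names are hypothetical,
and the "handler" thread is just a stand-in for whatever thread processes
the commit response; that Kafka's heartbeat thread really touches the
map is my assumption, not something this sketch proves.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch; no Kafka classes are involved.
class LockContentionSketch {

    // Holds the map's monitor (like the original code does around commitSync)
    // and reports whether a second thread managed to read the map in time.
    static boolean readableWhileLocked(Map<String, Long> map, long timeoutMs)
            throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        Thread handler = new Thread(() -> {
            map.get("topic-0"); // needs the map's monitor, so it blocks
            done.countDown();
        }, "stand-in-response-handler");
        handler.setDaemon(true);
        synchronized (map) {
            handler.start();
            // Stand-in for the blocking commitSync call waiting for its response.
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    // The workaround: copy and clear under the lock, then hand only the
    // private copy to the other thread, with no lock held while waiting.
    static boolean readableWithSnapshot(Map<String, Long> map, long timeoutMs)
            throws InterruptedException {
        final Map<String, Long> snapshot;
        synchronized (map) {
            snapshot = new HashMap<>(map);
            map.clear();
        }
        CountDownLatch done = new CountDownLatch(1);
        Thread handler = new Thread(() -> {
            snapshot.get("topic-0"); // plain HashMap copy, no shared lock
            done.countDown();
        }, "stand-in-response-handler");
        handler.setDaemon(true);
        handler.start();
        return done.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Long> commitMap = Collections.synchronizedMap(new HashMap<>());
        commitMap.put("topic-0", 42L);
        System.out.println("read while locked:  " + readableWhileLocked(commitMap, 200));
        System.out.println("read from snapshot: " + readableWithSnapshot(commitMap, 2000));
    }
}
```

In the first case the wait can only end by timing out, because the
reader cannot get the monitor until the waiting thread releases it,
which matches what I see with `commitSync`. In the second case nothing
is locked during the wait, so the reader is free to finish.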
Thanks for any comments,
best,
Jan