Hi Jan,

Do you have stack traces showing the issue? That would help. Also, if you can test 0.10.1.1, which is the latest stable release, that would be even better.

From looking at the code briefly, I don't see where the consumer locks on the received offsets map, so I'm not sure what would cause it to block in the way you describe. Hopefully a stack trace taken while the consumer is blocked would clarify. You can get one with the jstack tool.
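jstack reports this from outside the process: a blocked thread shows a "waiting to lock <0x...>" line, and its owner shows a "locked <0x...>" line with the same address. As an in-process complement, here is a minimal sketch using the standard `java.lang.management.ThreadMXBean` API. It reproduces the suspected pattern with a plain monitor. The class `LockDump`, its methods, and the thread name `heartbeat-sim` are hypothetical. `Collections.synchronizedMap(m)` uses the returned map itself as its lock, so a plain `Object` is enough to stand in for `commitMap`. If the interpretation in the quoted message holds, a dump of the stuck consumer should similarly show kafka-coordinator-heartbeat-thread BLOCKED on a monitor owned by the main thread.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

public class LockDump {
    /** One line per BLOCKED thread: who waits, on which monitor, held by whom. */
    static List<String> blockedThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> out = new ArrayList<>();
        // (true, true): also collect the monitors and ownable synchronizers each thread holds
        for (ThreadInfo info : mx.dumpAllThreads(true, true)) {
            if (info.getThreadState() == Thread.State.BLOCKED) {
                out.add(info.getThreadName() + " waits for " + info.getLockName()
                        + " held by " + info.getLockOwnerName());
            }
        }
        return out;
    }

    /** Reproduces the suspected pattern: this thread holds a monitor another thread needs. */
    static List<String> demo() throws InterruptedException {
        Object commitMap = new Object(); // stand-in for the synchronized commitMap's monitor
        Thread heartbeat = new Thread(() -> {
            synchronized (commitMap) {
                // the real heartbeat thread would read the offsets here
            }
        }, "heartbeat-sim"); // hypothetical name, not Kafka's own thread
        List<String> report;
        synchronized (commitMap) {
            heartbeat.start();
            while (heartbeat.getState() != Thread.State.BLOCKED) {
                Thread.sleep(10); // wait until heartbeat-sim is stuck on the monitor
            }
            report = blockedThreads();
        } // releasing the monitor lets heartbeat-sim finish
        heartbeat.join();
        return report;
    }

    public static void main(String[] args) throws InterruptedException {
        demo().forEach(System.out::println);
    }
}
```

Running it should print a line naming `heartbeat-sim`, the monitor's identity string (`java.lang.Object@...`), and the name of the thread that called `demo()`.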
Ismael

On Thu, Feb 2, 2017 at 10:45 AM, je.ik <je...@seznam.cz> wrote:

> Hi all,
> I have a question about some very suspicious behavior I see while consuming
> messages using manual synchronous commits with Kafka 0.10.1.0. The code
> looks something like this:
>
>   try (KafkaConsumer<...> consumer = ...) {
>     Map<TopicPartition, OffsetAndMetadata> commitMap =
>         Collections.synchronizedMap(...);
>     while (!Thread.currentThread().isInterrupted()) {
>       ConsumerRecords records = consumer.poll(..);
>       for (...) {
>         // queue records for asynchronous processing in a different thread.
>         // when the asynchronous processing finishes, it updates
>         // `commitMap`, so it has to be synchronized somehow
>       }
>       synchronized (commitMap) {
>         // commit if we have anything to commit
>         if (!commitMap.isEmpty()) {
>           consumer.commitSync(commitMap);
>           commitMap.clear();
>         }
>       }
>     }
>   }
>
> Now, what happens from time to time is that the consumer thread gets
> stuck in the call to `commitSync`. By stracing the PID I found out that it
> periodically epolls on an *empty* list of file descriptors. On further
> investigation I found out that the response to the `commitSync` is being
> handled by the kafka-coordinator-heartbeat-thread, which needs to access
> `commitMap` while handling the response, and therefore blocks, because
> the lock is being held by the application's main thread. As a result, the
> whole consumption stops and ends up in a live-lock. The solution in my case
> was to clone the map and move the call to `commitSync` out of the
> synchronized block, like this:
>
>   final Map<TopicPartition, OffsetAndMetadata> clone;
>   synchronized (commitMap) {
>     if (!commitMap.isEmpty()) {
>       clone = new HashMap<>(commitMap);
>       commitMap.clear();
>     } else {
>       clone = null;
>     }
>   }
>   if (clone != null) {
>     consumer.commitSync(clone);
>   }
>
> which seems to work fine. My question is whether my interpretation of the
> problem is correct, and if so, should anything be done to avoid this? I see
> two possibilities: either the call to `commitSync` should clone the map
> itself, or it should somehow be guaranteed that the same thread that
> issues synchronous requests receives the response. Am I right?
>
> Thanks for comments,
> best,
> Jan
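The clone-then-commit workaround from the quoted message can be reduced to a small, dependency-free sketch. It assumes a hypothetical `OffsetCommitter` interface in place of `KafkaConsumer.commitSync(Map)`. It also uses `String` partition names and `Long` offsets instead of `TopicPartition`/`OffsetAndMetadata`, so it runs without the Kafka client. The only point it illustrates is that the map handed to the commit is a private copy and no application lock is held during the call. `SnapshotCommit`, `recordProcessed`, and `commitPending` are likewise made-up names.

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotCommit {
    /** Hypothetical stand-in for consumer.commitSync(Map); NOT a Kafka API. */
    interface OffsetCommitter {
        void commitSync(Map<String, Long> offsets);
    }

    private final Object lock = new Object();
    // Offsets finished by worker threads but not yet committed; guarded by lock.
    private final Map<String, Long> pending = new HashMap<>();

    /** Called by worker threads when processing of a record has finished. */
    void recordProcessed(String partition, long nextOffset) {
        synchronized (lock) {
            pending.put(partition, nextOffset);
        }
    }

    /**
     * Called by the consumer thread. Copies and clears the pending offsets under
     * the lock, then commits the private copy with the lock released, so any other
     * thread that needs the lock during the commit is not blocked by this thread.
     */
    boolean commitPending(OffsetCommitter committer) {
        Map<String, Long> snapshot;
        synchronized (lock) {
            if (pending.isEmpty()) {
                return false;
            }
            snapshot = new HashMap<>(pending);
            pending.clear();
        }
        committer.commitSync(snapshot); // lock is NOT held here
        return true;
    }

    public static void main(String[] args) {
        SnapshotCommit sc = new SnapshotCommit();
        sc.recordProcessed("my-topic-0", 42L);
        sc.commitPending(offsets -> System.out.println("committing " + offsets)); // committing {my-topic-0=42}
        System.out.println(sc.commitPending(offsets -> {})); // false: nothing pending
    }
}
```

Because the pending offsets are cleared before the commit, offsets from a commit that throws are not re-queued by this sketch. That matches the quoted workaround; re-adding them on failure would be a separate design decision.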