I agree with Jan. KafkaConsumer should take a copy of the offsetMap now that multiple threads access the map. commitAsync already takes a copy. When there was only one thread in the consumer, it was reasonable to avoid cloning for commitSync, but now it makes sense to clone for commitSync too.
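
Until the consumer copies the map itself, the copy can be made on the application side. Below is a minimal sketch of that copy-then-commit pattern with no Kafka dependency. `OffsetBuffer`, `record`, and `drain` are hypothetical names used only for illustration; they are not part of the Kafka client API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: collects offsets from worker threads and hands out private snapshots. */
final class OffsetBuffer<K, V> {
    private final Map<K, V> pending = new HashMap<>();

    /** Called by worker threads when asynchronous processing of a record finishes. */
    public synchronized void record(K partition, V offset) {
        pending.put(partition, offset);
    }

    /**
     * Atomically copies and clears the pending offsets. The returned map is
     * private to the caller, so it can be passed to a blocking commit call
     * after this object's lock has been released.
     */
    public synchronized Map<K, V> drain() {
        if (pending.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<K, V> snapshot = new HashMap<>(pending);
        pending.clear();
        return snapshot;
    }
}
```

The polling thread would call `drain()` and, if the result is not empty, pass it to `consumer.commitSync(...)` outside any lock. Because copy and clear happen under one lock, no update is lost between them, and no other thread ever needs a monitor that the committing thread holds. One caveat of this sketch: if the commit fails, the drained offsets would have to be recorded again by the caller.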
On Thu, Feb 2, 2017 at 12:37 PM, Jan Lukavský <je...@seznam.cz> wrote:

> I'd disagree that I can fix the issue as you suggest, because:
>
> - if I remove the `Collections.synchronizedMap` from the `commitMap`, I
>   get an unsynchronized map, and therefore the asynchronous writes to this
>   map would result in undefined state
>
> - if I remove the manual synchronization, then there is a race condition
>   between the call to `commitSync` and `clear` of the `commitMap` - some
>   other thread could write to the `commitMap` between the calls to
>   `commitSync` and `clear`, and therefore the update to the map would be
>   lost - this is the same reason why I cannot use ConcurrentHashMap,
>   because there would be no synchronization between committing the map
>   and clearing it
>
> It seems to me quite natural to clone the map in the call to synchronous
> commit, if it cannot be guaranteed that synchronous responses are handled
> by the same thread that issued the request (which from my point of view
> would be the best choice, but I still don't understand the details of the
> Kafka network stack well enough).
>
> Jan
>
> On 02/02/2017 01:25 PM, Ismael Juma wrote:
>
>> OK, you can fix this by removing `Collections.synchronizedMap` from the
>> following line or by removing the synchronized blocks.
>>
>>     Map<TopicPartition, OffsetAndMetadata> commitMap =
>>         Collections.synchronizedMap(...);
>>
>> There is no reason to do manual and automatic synchronization at the same
>> time in this case. Because `Collections.synchronizedMap` uses the returned
>> map for synchronization, it means that even calling `get` on it will block
>> in this case. The consumer could copy the map to avoid this scenario as
>> the heartbeat thread is meant to be an implementation detail. Jason, what
>> do you think?
>>
>> Let me know if this fixes your issue.
>>
>> Ismael
>>
>> On Thu, Feb 2, 2017 at 12:17 PM, Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Ismael,
>>>
>>> yes, no problem:
>>>
>>> The following thread is the main thread interacting with the
>>> KafkaConsumer (polling topic and committing offsets):
>>>
>>> "pool-3-thread-1" #14 prio=5 os_prio=0 tid=0x00007f00f4434800 nid=0x32a9 runnable [0x00007f00b6662000]
>>>    java.lang.Thread.State: RUNNABLE
>>>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>     - locked <0x00000005c0abb218> (a sun.nio.ch.Util$3)
>>>     - locked <0x00000005c0abb208> (a java.util.Collections$UnmodifiableSet)
>>>     - locked <0x00000005c0abaa48> (a sun.nio.ch.EPollSelectorImpl)
>>>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>     at org.apache.kafka.common.network.Selector.select(Selector.java:470)
>>>     at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>>>     - locked <0x00000005c0acf630> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104)
>>>     at cz.o2.<package hidden>.KafkaCommitLog.lambda$observePartitions$7(KafkaCommitLog.java:204)
>>>     - locked <0x00000005c0612c88> (a java.util.Collections$SynchronizedMap)
>>>     at cz.o2.<package hidden>.KafkaCommitLog$$Lambda$62/1960388071.run(Unknown Source)
>>>       <- here is the synchronized block that takes the monitor of the `commitMap`
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> This thread just spins around in epoll returning 0. The other thread is
>>> the coordinator:
>>>
>>> "kafka-coordinator-heartbeat-thread | consumer" #15 daemon prio=5 os_prio=0 tid=0x00007f0084067000 nid=0x32aa waiting for monitor entry [0x00007f00b6361000]
>>>    java.lang.Thread.State: BLOCKED (on object monitor)
>>>     at java.util.Collections$SynchronizedMap.get(Collections.java:2584)
>>>     - waiting to lock <0x00000005c0612c88> (a java.util.Collections$SynchronizedMap)
>>>       <- waiting for the `commitMap`, which will never happen
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:635)
>>>       <- handles response to the commitSync request
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:219)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:266)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:865)
>>>     - locked <0x00000005c0acefc8> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
>>>
>>> Hope this helps. If you need any more debug info, I'm here to help. :)
>>>
>>> Cheers,
>>> Jan
>>>
>>> On 02/02/2017 12:48 PM, Ismael Juma wrote:
>>>
>>>> Hi Jan,
>>>>
>>>> Do you have stacktraces showing the issue? That would help. Also, if you
>>>> can test 0.10.1.1, which is the latest stable release, that would be
>>>> even better. From looking at the code briefly, I don't see where the
>>>> consumer is locking on the received offsets map, so I'm not sure what
>>>> would cause it to block in the way you describe. Hopefully a stacktrace
>>>> taken when the consumer is blocked would clarify. You can get a
>>>> stacktrace via the jstack tool.
>>>>
>>>> Ismael
>>>>
>>>> On Thu, Feb 2, 2017 at 10:45 AM, je.ik <je...@seznam.cz> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have a question about a very suspicious behavior I see while
>>>>> consuming messages using manual synchronous commit with Kafka
>>>>> 0.10.1.0. The code looks something like this:
>>>>>
>>>>>     try (KafkaConsumer<...> consumer = ...) {
>>>>>       Map<TopicPartition, OffsetAndMetadata> commitMap =
>>>>>           Collections.synchronizedMap(...);
>>>>>       while (!Thread.currentThread().isInterrupted()) {
>>>>>         ConsumerRecords records = consumer.poll(..);
>>>>>         for (...) {
>>>>>           // queue records for asynchronous processing in a different
>>>>>           // thread. when the asynchronous processing finishes, it
>>>>>           // updates the `commitMap`, so it has to be synchronized
>>>>>           // somehow
>>>>>         }
>>>>>         synchronized (commitMap) {
>>>>>           // commit if we have anything to commit
>>>>>           if (!commitMap.isEmpty()) {
>>>>>             consumer.commitSync(commitMap);
>>>>>             commitMap.clear();
>>>>>           }
>>>>>         }
>>>>>       }
>>>>>     }
>>>>>
>>>>> Now, what happens from time to time in my case is that the consumer
>>>>> thread gets stuck in the call to `commitSync`. By stracing the PID I
>>>>> found out that it periodically epolls on an *empty* list of file
>>>>> descriptors. By further investigation I found out that the response to
>>>>> the `commitSync` is being handled by the
>>>>> kafka-coordinator-heartbeat-thread, which during handling of the
>>>>> response needs to access the `commitMap`, and therefore blocks, because
>>>>> the lock is being held by the application main thread. Therefore, the
>>>>> whole consumption stops and ends in live-lock. The solution in my case
>>>>> was to clone the map and unsynchronize the call to `commitSync` like
>>>>> this:
>>>>>
>>>>>     final Map<TopicPartition, OffsetAndMetadata> clone;
>>>>>     synchronized (commitMap) {
>>>>>       if (!commitMap.isEmpty()) {
>>>>>         clone = new HashMap<>(commitMap);
>>>>>         commitMap.clear();
>>>>>       } else {
>>>>>         clone = null;
>>>>>       }
>>>>>     }
>>>>>     if (clone != null) {
>>>>>       consumer.commitSync(clone);
>>>>>     }
>>>>>
>>>>> which seems to work fine. My question is whether my interpretation of
>>>>> the problem is correct and, if so, should anything be done to avoid
>>>>> this? I see two possibilities - either the call to `commitSync` should
>>>>> clone the map itself, or it should somehow be guaranteed that the same
>>>>> thread that issues synchronous requests receives the response. Am I
>>>>> right?
>>>>>
>>>>> Thanks for comments,
>>>>> best,
>>>>> Jan
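
For anyone who wants to reproduce the blocking behaviour from the stack traces without a Kafka cluster, here is a small sketch. It relies only on the documented fact that a map returned by `Collections.synchronizedMap` synchronizes on itself. The class and method names (`SynchronizedMapBlockingDemo`, `readerStateWhileLocked`) and the "reader" thread are hypothetical; the reader merely plays the role of the heartbeat thread calling `get`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class SynchronizedMapBlockingDemo {

    /** Returns the state of a reader thread observed while the caller holds the map's monitor. */
    static Thread.State readerStateWhileLocked() throws InterruptedException {
        Map<String, Long> commitMap = Collections.synchronizedMap(new HashMap<>());
        commitMap.put("topic-0", 42L);

        // Stands in for the heartbeat thread that reads the map while handling a response.
        Thread reader = new Thread(() -> commitMap.get("topic-0"), "reader");
        reader.setDaemon(true);

        Thread.State observed;
        synchronized (commitMap) { // the same monitor that get() acquires internally
            reader.start();
            long deadline = System.nanoTime() + 5_000_000_000L;
            // Poll until the reader is waiting on the monitor, or give up after 5 seconds.
            while (reader.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
                Thread.sleep(10); // sleeping does not release the monitor
            }
            observed = reader.getState();
        }
        reader.join(5_000); // monitor released, so get() can now complete
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("reader state while locked: " + readerStateWhileLocked());
    }
}
```

The reader is expected to sit in the `BLOCKED` state for as long as the outer `synchronized` block is held, which matches the heartbeat thread's "waiting to lock" entry above. In the real consumer the block is never left, because `commitSync` is waiting for the very response that the blocked thread is trying to handle.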