[ https://issues.apache.org/jira/browse/KAFKA-702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jay Kreps updated KAFKA-702:
----------------------------

    Attachment: KAFKA-702-v1.patch

TLDR: The attached one-line patch should fix the deadlock, but there is a larger issue (though not a blocker).

I don't like the idea of splitting responses and requests between different threads. The reason our socket server is simple is that each network thread is totally independent of all the others, so there are no threading issues. Mixing these is slow (because of all the locking) and error prone. I have seen this done before and it is a big mess.

Jun's second idea is better, but it is not as simple as described. We have to put the newly attached request somewhere and trigger a second attempt at adding it to the queue. Registering again for reading doesn't really work because there won't be more data to read. Registering for writing doesn't work because sockets are always writable, so we would end up busy waiting. So to make this work we would need some kind of list where we store requests that have been read but didn't fit in the queue. But we would also need something that checks this list periodically, and it is hard to guarantee that that happens with any more frequency than the poll timeout.

But I think we are muddling things a bit. Let's step back and think about this from first principles. Why do queues have limits? The reason is to bound memory usage. So taking data off the socket and putting it in a list is silly; that defeats the original purpose of having the bound (the queue, after all, is just a list).

But think about this more. Why are we blocking on adding responses to the response queue? The reason would be to bound memory usage. But the response queue doesn't actually bound memory usage: things going into the response queue come either directly from processors or from the purgatory, and in either case they are already taking up memory there. Preventing responses from going out isn't helping anything. So the short-term fix is just to remove the bound on the response queue (a rough sketch of this change follows the quoted description below).

The larger problem is that, regardless of this change, in 0.8 *we aren't effectively bounding memory usage*. The reason is the purgatory. The purgatory will accumulate requests any time expiration gets slow, which could be due to a misconfigured client or to a slow broker. So the error is that we are using queue size to indicate "backlog", but the proper measure of backlog is really the total number of requests in flight, including all requests in queues OR in purgatory.

But even once we understand the correct limit, it isn't clear what to do when we hit it. There are two choices: (1) stop taking new requests, or (2) prematurely start responding to requests in the purgatory. Neither of these is great. Consider the case where one broker gets slow and umpteen produce requests pile up in purgatory. If we stop taking new requests, that is like a GC pause, but since the timeout could be 30 seconds away it will be a long one. If we start dumping the purgatory prematurely, we will have to respond with an error because we lack sufficient acknowledgements.

> Deadlock between request handler/processor threads
> --------------------------------------------------
>
>                 Key: KAFKA-702
>                 URL: https://issues.apache.org/jira/browse/KAFKA-702
>             Project: Kafka
>          Issue Type: Bug
>          Components: network
>    Affects Versions: 0.8
>            Reporter: Joel Koshy
>            Assignee: Jay Kreps
>            Priority: Blocker
>              Labels: bugs
>             Fix For: 0.8
>
>         Attachments: KAFKA-702-v1.patch
>
>
> We have seen this a couple of times in the past few days in a test cluster.
> The request handler and processor threads deadlock on the request/response queues, bringing the server to a halt.
>
> "kafka-processor-10251-7" prio=10 tid=0x00007f4a0c3c9800 nid=0x4c39 waiting on condition [0x00007f46f698e000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00007f48c9dd2698> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
>         at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:107)
>         at kafka.network.Processor.read(SocketServer.scala:321)
>         at kafka.network.Processor.run(SocketServer.scala:231)
>         at java.lang.Thread.run(Thread.java:619)
>
> "kafka-request-handler-7" daemon prio=10 tid=0x00007f4a0c57f000 nid=0x4c47 waiting on condition [0x00007f46f5b80000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00007f48c9dd6348> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
>         at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:112)
>         at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:198)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
>         at java.lang.Thread.run(Thread.java:619)
>
> This happens because there is a cycle in the wait-for graph of the processor threads and the request handler threads. If request handling slows down on a busy server, the request queue fills up. All processor threads quickly block on adding incoming requests to the request queue. Because of this, those threads do not process responses, and their response queues fill up. At that point the request handler threads start blocking on adding responses to the respective response queues. This can lead to a deadlock where every thread is holding a lock on one queue and asking for a lock on the other queue. This brings the server to a halt: it accepts connections, but every request times out.
> One way to resolve this is to break the cycle in the wait-for graph of the request handler and processor threads. Instead of having the processor threads dispatch the responses, we can have one or more dedicated response handler threads that dequeue responses from the queue and write them to the socket. One downside of this approach is that access to the selector would then have to be synchronized.
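To make the wait-for cycle in the quoted description concrete, the following is a minimal, self-contained sketch, not broker code: the queue sizes, payload strings, and names (WaitForCycleSketch, spawn) are invented for illustration. A "processor" and a "request handler" each block on put() against a full bounded queue that only the other thread would ever drain, so both end up parked in ArrayBlockingQueue.put just like the stack traces above.

{code:scala}
import java.util.concurrent.ArrayBlockingQueue

object WaitForCycleSketch {
  def main(args: Array[String]): Unit = {
    // Both queues bounded, as in the pre-patch broker; size 1 just makes the demo quick.
    val requestQueue  = new ArrayBlockingQueue[String](1)
    val responseQueue = new ArrayBlockingQueue[String](1)

    // Pre-fill both queues so the next put() on either one blocks.
    requestQueue.put("queued-request")
    responseQueue.put("queued-response")

    // "Processor": tries to enqueue a newly read request before it drains any responses.
    val processor = spawn("processor-sketch") {
      requestQueue.put("new-request")   // blocks: request queue is full
      responseQueue.take()              // never reached, so responses are never drained
    }

    // "Request handler": tries to enqueue a response before it takes another request.
    val handler = spawn("handler-sketch") {
      responseQueue.put("new-response") // blocks: response queue is full
      requestQueue.take()               // never reached, so requests are never drained
    }

    Thread.sleep(2000)
    // Both threads are parked inside ArrayBlockingQueue.put at this point.
    println(s"processor: ${processor.getState}, handler: ${handler.getState}")
    // Threads are daemons, so the JVM exits when main returns.
  }

  private def spawn(name: String)(body: => Unit): Thread = {
    val t = new Thread(new Runnable { def run(): Unit = body }, name)
    t.setDaemon(true)
    t.start()
    t
  }
}
{code}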
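And for concreteness on the proposed short-term fix, here is a rough sketch of the shape such a change could take, assuming the bounded queues live in something like kafka.network.RequestChannel. This is not the attached patch: the class BoundedRequestChannelSketch, its method names, and the AnyRef payloads are simplified stand-ins.

{code:scala}
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue}

// Hypothetical, stripped-down stand-in for the broker's request channel.
class BoundedRequestChannelSketch(numProcessors: Int, queueSize: Int) {
  // The request queue stays bounded: it is what actually limits how much data the
  // processors pull off the sockets.
  private val requestQueue = new ArrayBlockingQueue[AnyRef](queueSize)

  // The per-processor response queues become unbounded. Responses already occupy memory
  // in the handlers or in the purgatory, so the old bound added no protection; removing
  // it means sendResponse() can never block, which breaks the wait-for cycle.
  private val responseQueues: Array[BlockingQueue[AnyRef]] =
    Array.fill[BlockingQueue[AnyRef]](numProcessors)(new LinkedBlockingQueue[AnyRef]())

  def sendRequest(request: AnyRef): Unit = requestQueue.put(request)   // may block, by design
  def receiveRequest(): AnyRef = requestQueue.take()

  def sendResponse(processorId: Int, response: AnyRef): Unit =
    responseQueues(processorId).put(response)                          // never blocks on capacity
  def receiveResponse(processorId: Int): AnyRef =
    responseQueues(processorId).poll()                                 // null if nothing to send
}
{code}

With the bound gone, request handlers always make progress, the processors eventually drain their response queues and return to the selector, and back-pressure on the network side is still applied by the bounded request queue, which, as noted in the comment above, would ultimately need to account for the purgatory as well.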