[ 
https://issues.apache.org/jira/browse/KAFKA-702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-702:
----------------------------

    Attachment: KAFKA-702-v1.patch

TLDR: the attached one-line patch should fix the deadlock, but there is a larger 
issue (though not a blocker).

I don't like the idea of splitting request and response handling between 
different threads. The reason our socket server is simple is that each network 
thread is totally independent of all the others, so there are no threading 
issues. Mixing these is slow (because of all the locking) and error-prone. I 
have seen this done before and it is a big mess.
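
For concreteness, here is a minimal sketch (hypothetical names, not the actual 
SocketServer code) of the property worth preserving: each processor thread owns 
a private Selector and does both the reads and the writes for its own 
connections, so nothing is shared across network threads and nothing needs 
locking.

    import java.nio.channels.{SelectionKey, Selector, SocketChannel}
    import java.util.concurrent.ConcurrentLinkedQueue

    class IndependentProcessor extends Runnable {
      private val selector = Selector.open()                 // owned by this thread only
      private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()

      // The acceptor hands over a connection; registration happens on this thread.
      def accept(channel: SocketChannel): Unit = {
        newConnections.add(channel)
        selector.wakeup()
      }

      override def run(): Unit = {
        while (!Thread.currentThread().isInterrupted) {
          // Register any connections handed to us since the last iteration.
          var channel = newConnections.poll()
          while (channel != null) {
            channel.configureBlocking(false)
            channel.register(selector, SelectionKey.OP_READ)
            channel = newConnections.poll()
          }
          if (selector.select(300) > 0) {
            val keys = selector.selectedKeys().iterator()
            while (keys.hasNext) {
              val key = keys.next(); keys.remove()
              if (key.isReadable) { /* read request bytes */ }
              // OP_WRITE interest would be added once a response is queued for this key
              if (key.isValid && key.isWritable) { /* write pending response bytes */ }
            }
          }
        }
      }
    }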

Jun's second idea is better, but it's not as simple as described. We have to put 
the newly read request somewhere and trigger a second attempt at adding it to 
the queue. Registering again for reading doesn't really work because there 
won't be more data to read. Registering for writing doesn't work because 
sockets are always writable so we would end up busy waiting. So to make this 
work we would need some kind of list where we store requests that have been 
read but didn't fit in the queue. But then we need something that checks this 
list periodically, and it is hard to guarantee that that would happen any more 
frequently than the poll timeout.
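
To make the awkwardness concrete, here is a rough sketch of that overflow-list 
idea (hypothetical names, not proposed code). The overflow can only be retried 
once per trip around the poll loop, so the retry latency is bounded by the 
select timeout, and the overflow list itself quietly reintroduces unbounded 
memory use.

    import java.util.concurrent.ArrayBlockingQueue
    import scala.collection.mutable

    case class Request(payload: Array[Byte])      // stand-in for the real request type

    class ProcessorWithOverflow(requestQueue: ArrayBlockingQueue[Request]) {
      // Requests read off a socket that did not fit in the (full) request queue.
      private val overflow = mutable.Queue[Request]()

      // Called when a complete request has been read from a socket.
      def dispatch(request: Request): Unit = {
        if (!requestQueue.offer(request))
          overflow.enqueue(request)               // memory use is now unbounded again
      }

      // Called once per poll iteration, before the select.
      def retryOverflow(): Unit = {
        while (overflow.nonEmpty && requestQueue.offer(overflow.head))
          overflow.dequeue()
      }

      def pollLoop(selectTimeoutMs: Long): Unit = {
        while (!Thread.currentThread().isInterrupted) {
          retryOverflow()                         // nothing re-checks overflow sooner than this
          Thread.sleep(selectTimeoutMs)           // stands in for selector.select(selectTimeoutMs)
          // ... read sockets, call dispatch(...) for each completed request ...
        }
      }
    }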

But I think we are muddling things a bit. Let's step back and think about this 
from first principles.

Why do queues have limits? The reason is to bound memory usage. So taking data 
off the socket and putting it in a list is silly: it defeats the original 
purpose of having the bound (the queue, after all, is just a list).

But take this a step further. Why are we blocking on adding responses to the 
response queue? The reason would be to bound memory usage. But the response 
queue doesn't actually bound memory usage. Things going into the response queue 
come either directly from processors or from purgatory, and in either case they 
are already taking up memory there. Preventing responses from going out isn't 
helping anything.

So the short-term fix is just to remove the bound on the response queue.
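
The attached patch has the actual change; purely to illustrate its shape 
(assuming the per-processor response queues are bounded ArrayBlockingQueues 
today, as the stack trace suggests), it amounts to something like this sketch:

    import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue}

    case class Req(processor: Int, bytes: Array[Byte])     // placeholders for the real
    case class Resp(processor: Int, bytes: Array[Byte])    // request/response types

    class RequestChannelSketch(numProcessors: Int, queueSize: Int) {
      // The request queue stays bounded: it is what limits memory taken off the network.
      private val requestQueue: BlockingQueue[Req] = new ArrayBlockingQueue[Req](queueSize)

      // The response queues become unbounded: everything placed here already lives in
      // memory (in a handler or in purgatory), so the bound was not buying anything.
      private val responseQueues: Vector[BlockingQueue[Resp]] =
        Vector.fill(numProcessors)(new LinkedBlockingQueue[Resp]())

      def sendRequest(request: Req): Unit = requestQueue.put(request)   // may block, by design

      def sendResponse(response: Resp): Unit =
        responseQueues(response.processor).put(response)                // can no longer block
    }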

The larger problem is that, regardless of this change, in 0.8 *we aren't 
effectively bounding memory usage*. The reason is the purgatory. The purgatory 
will accumulate requests any time expiration gets slow. This could be due to a 
misconfigured client or a slow broker.

So the error is that we are using queue size to indicate "backlog", but the 
proper measure of backlog is the total number of requests in flight, including 
all requests in queues or in purgatory.
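
A hypothetical sketch of that measure (none of these names exist in the 
codebase): count a request from the moment it is read off the socket until its 
response bytes are written back, so time spent in the request queue, in a 
handler thread, in purgatory, or in a response queue all counts against a 
single budget.

    import java.util.concurrent.atomic.AtomicInteger

    class InFlightTracker(maxInFlight: Int) {
      private val inFlight = new AtomicInteger(0)

      // Processor calls this after reading a complete request from a socket.
      def requestRead(): Unit = inFlight.incrementAndGet()

      // Processor calls this after the response bytes are fully written back.
      def responseSent(): Unit = inFlight.decrementAndGet()

      // True once the server has hit its backlog budget; what to do at that point
      // (stop reading new requests, or expire purgatory early) is the open question.
      def overLimit: Boolean = inFlight.get() >= maxInFlight
    }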

But even once we agree on the correct limit, it isn't clear what to do when we 
hit it. There are two choices: (1) stop taking new requests, or (2) prematurely 
start responding to requests in the purgatory. Neither of these is great. 
Consider the case where one broker gets slow and umpteen produce requests pile 
up in purgatory. If we stop taking new requests, that is like a GC pause, but 
since the timeout could be 30 seconds away it will be a long one. If we start 
dumping the purgatory prematurely, we will have to respond with an error 
because we lack sufficient acknowledgements.
                
> Deadlock between request handler/processor threads
> --------------------------------------------------
>
>                 Key: KAFKA-702
>                 URL: https://issues.apache.org/jira/browse/KAFKA-702
>             Project: Kafka
>          Issue Type: Bug
>          Components: network
>    Affects Versions: 0.8
>            Reporter: Joel Koshy
>            Assignee: Jay Kreps
>            Priority: Blocker
>              Labels: bugs
>             Fix For: 0.8
>
>         Attachments: KAFKA-702-v1.patch
>
>
> We have seen this a couple of times in the past few days in a test cluster. 
> The request handler and processor threads deadlock on the request/response 
> queues bringing the server to a halt
> "kafka-processor-10251-7" prio=10 tid=0x00007f4a0c3c9800 nid=0x4c39 waiting 
> on condition [0x00007f46f698e000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f48c9dd2698> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at 
> java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
>         at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:107)
>         at kafka.network.Processor.read(SocketServer.scala:321)
>         at kafka.network.Processor.run(SocketServer.scala:231)
>         at java.lang.Thread.run(Thread.java:619)
> "kafka-request-handler-7" daemon prio=10 tid=0x00007f4a0c57f000 nid=0x4c47 
> waiting on condition [0x00007f46f5b80000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f48c9dd6348> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at 
> java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
>         at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:112)
>         at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:198)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
>         at java.lang.Thread.run(Thread.java:619)
> This is because there is a cycle in the wait-for graph of processor threads 
> and request handler threads. If request handling slows down on a busy 
> server, the request queue fills up. All processor threads quickly block on 
> adding incoming requests to the request queue. As a result, those threads do 
> not process responses, filling up their response queues. At this point, the 
> request handler threads start blocking on adding responses to the respective 
> response queues. This can lead to a deadlock where every thread is blocked on 
> a full queue that only another blocked thread can drain. This brings the 
> server to a halt where it accepts connections but every request times out.
> One way to resolve this is to break the cycle in the wait-for graph of the 
> request handler and processor threads. Instead of having the processor 
> threads dispatch the responses, we can have one or more dedicated response 
> handler threads that dequeue responses from the queue and write them to the 
> socket. One downside of this approach is that access to the selector would 
> then have to be synchronized.
