[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14361155#comment-14361155 ]

Dmitry Bugaychenko commented on KAFKA-1305:
-------------------------------------------

Yes, data loss is tolerated to some extent. With the small queue the loss was 
less than a second of data, or even none, which was considered fine; with the 
extended queue it is a few minutes' worth. Using acks in that case would 
simply cause producers to crash, deny service, or drop messages (because for 
those few minutes they basically cannot produce).  In the end we decided to 
reduce the queue to the default size and apply three patches:

# Add throttling to preferred replica elections and controlled shutdown 
leadership reassignment
# Add a timeout for adding messages to the queue, to avoid locking up the 
controller (see the sketch after this list)
# Add a separate timeout for sending the controlled shutdown message - in our 
setup it takes about 10 minutes, and that value is meaningless and dangerous 
for other kinds of controller-to-broker communication
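
For the second patch, the idea is roughly the following (a minimal Scala 
sketch, not the actual patch; the method and parameter names are 
illustrative). ControllerChannelManager.sendRequest currently does a blocking 
put() on a LinkedBlockingQueue (visible in the kafka-scheduler-0 trace 
below); a bounded offer() lets the caller give up instead of parking forever 
while holding the controller lock:

    import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

    // Hypothetical wrapper around the controller's per-broker send queue.
    def enqueueWithTimeout(queue: LinkedBlockingQueue[AnyRef],
                           request: AnyRef,
                           timeoutMs: Long): Boolean = {
      // offer() waits at most timeoutMs, unlike put(), which parks
      // indefinitely when the queue is full and nobody drains it.
      val ok = queue.offer(request, timeoutMs, TimeUnit.MILLISECONDS)
      if (!ok)
        // Caller decides what to do: log and drop, or fail the state change.
        println(s"Timed out after $timeoutMs ms enqueueing controller request")
      ok
    }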

Things seem to work and data are not lost, but shutdown and rebalance are 
slow. Instead of throttling, it might be better to wait for the previous 
leader movement to be completed by all participants before moving on to the 
next one. It also looks possible to do leader movements in batches (at least 
the API seems to support that), as in the sketch below.
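
A sketch of the batched variant (hypothetical names throughout, not the 
controller API; electPreferred would map to a single LeaderAndIsrRequest 
covering the whole batch, awaitCompletion to whatever completion signal the 
controller exposes):

    // Move leaders in fixed-size batches, waiting for each batch to
    // finish before starting the next, instead of throttling.
    def rebalanceInBatches(partitions: Seq[String],
                           batchSize: Int,
                           electPreferred: Seq[String] => Unit,
                           awaitCompletion: Seq[String] => Unit): Unit =
      partitions.grouped(batchSize).foreach { batch =>
        electPreferred(batch)
        awaitCompletion(batch)
      }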

> Controller can hang on controlled shutdown with auto leader balance enabled
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-1305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1305
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joel Koshy
>            Assignee: Sriharsha Chintalapani
>            Priority: Blocker
>             Fix For: 0.8.2.0, 0.9.0
>
>         Attachments: KAFKA-1305.patch, KAFKA-1305.patch, 
> KAFKA-1305_2014-10-13_07:30:45.patch
>
>
> This is relatively easy to reproduce especially when doing a rolling bounce.
> What happened here is as follows:
> 1. The previous controller was bounced and broker 265 became the new 
> controller.
> 2. I went on to do a controlled shutdown of broker 265 (the new controller).
> 3. In the meantime, the automatically scheduled preferred replica leader 
> election process starts doing its thing and sending 
> LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers) 
> (t@113 below).
> 4. While that's happening, the controlled shutdown process on 265 succeeds 
> and proceeds to deregister itself from ZooKeeper and shuts down the socket 
> server.
> 5. (ReplicaStateMachine actually removes deregistered brokers from the 
> controller channel manager's list of brokers to send requests to.  However, 
> that removal cannot take place (t@18 below) because the preferred replica 
> leader election task owns the controller lock.)
> 6. So the request thread to broker 265 gets into infinite retries.
> 7. The entire broker shutdown process is blocked on controller shutdown for 
> the same reason (it needs to acquire the controller lock).
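> The blocked state reduces to the following pattern (a self-contained Scala 
> sketch with hypothetical names, not Kafka code; running it hangs by design):
>
>    import java.util.concurrent.LinkedBlockingQueue
>    import java.util.concurrent.locks.ReentrantLock
>
>    object ControllerHangSketch extends App {
>      val controllerLock = new ReentrantLock()
>      // Capacity 1 and no consumer: stands in for the send queue to the
>      // deregistered broker 265, which nothing drains any more.
>      val sendQueue = new LinkedBlockingQueue[String](1)
>      sendQueue.put("pending")
>
>      val scheduler = new Thread(() => {  // plays kafka-scheduler-0 (t@117)
>        controllerLock.lock()
>        try sendQueue.put("LeaderAndIsrRequest") // parks forever, lock held
>        finally controllerLock.unlock()
>      })
>      val shutdown = new Thread(() => {   // plays Thread-4 (t@17) and t@18
>        controllerLock.lock()             // never acquired
>        try println("controller shutdown proceeds")
>        finally controllerLock.unlock()
>      })
>      scheduler.start(); Thread.sleep(100); shutdown.start()
>    }
>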
> Relevant portions from the thread-dump:
> "Controller-265-to-broker-265-send-thread" - Thread t@113
>    java.lang.Thread.State: TIMED_WAITING
>       at java.lang.Thread.sleep(Native Method)
>       at 
> kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
>       at kafka.utils.Utils$.swallow(Utils.scala:167)
>       at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
>       at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
>       at kafka.utils.Logging$class.swallow(Logging.scala:94)
>       at kafka.utils.Utils$.swallow(Utils.scala:46)
>       at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
>       at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>       - locked java.lang.Object@6dbf14a7
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>    Locked ownable synchronizers:
>       - None
> ...
> "Thread-4" - Thread t@17
>    java.lang.Thread.State: WAITING on 
> java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: 
> kafka-scheduler-0
>       at sun.misc.Unsafe.park(Native Method)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>       at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>       at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>       at kafka.utils.Utils$.inLock(Utils.scala:536)
>       at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
>       at 
> kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
>       at kafka.utils.Utils$.swallow(Utils.scala:167)
>       at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
>       at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
>       at kafka.utils.Logging$class.swallow(Logging.scala:94)
>       at kafka.utils.Utils$.swallow(Utils.scala:46)
>       at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
>       at 
> kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
>       at kafka.Kafka$$anon$1.run(Kafka.scala:42)
> ...
> "kafka-scheduler-0" - Thread t@117
>    java.lang.Thread.State: WAITING on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
>       at sun.misc.Unsafe.park(Native Method)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>       at 
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>       at 
> kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
>       - locked java.lang.Object@578b748f
>       at 
> kafka.controller.KafkaController.sendRequest(KafkaController.scala:657)
>       at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
>       at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:282)
>       at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>       at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>       at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>       at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>       at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>       at 
> kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:282)
>       at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:126)
>       at 
> kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:612)
>       at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1119)
>       at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
>       at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
>       at kafka.utils.Utils$.inLock(Utils.scala:538)
>       at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1111)
>       at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109)
>       at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>       at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>       at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>       at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>       at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>       at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1109)
>       at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1088)
>       at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
>       at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
>       at 
> kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1088)
>       at 
> kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:323)
>       at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>       at 
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>       at java.lang.Thread.run(Thread.java:662)
>    Locked ownable synchronizers:
>       - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840
>       - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4918530
> ...
> "ZkClient-EventThread-18-/kafka-shadow" - Thread t@18
>    java.lang.Thread.State: WAITING on 
> java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: 
> kafka-scheduler-0
>       at sun.misc.Unsafe.park(Native Method)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>       at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>       at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>       at kafka.utils.Utils$.inLock(Utils.scala:536)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:328)
>       at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
>       at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
