ZkEventThread is blocked with the following stack trace: "ZkClient-EventThread-18-localhost:2181" daemon prio=5 tid=7fb31b95c000 nid=0x1194a6000 waiting on condition [1194a5000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <7c2201800> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207) at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36) at kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93) at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340) at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337) at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337) at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068) at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067) at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067) at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
What did I do wrong in my test environment? On Tue, Mar 25, 2014 at 9:24 AM, Bae, Jae Hyeon <metac...@gmail.com> wrote: > Nope, linux doesn't work. Let me debug why it's not triggered. > > > On Tue, Mar 25, 2014 at 9:21 AM, Bae, Jae Hyeon <metac...@gmail.com>wrote: > >> Hm... I cannot reproduce in my local, I downloaded kafka_2.8.0-0.8.1 >> package but it didn't work. Let me try in my linux machine. >> >> >> On Mon, Mar 24, 2014 at 6:11 PM, Neha Narkhede >> <neha.narkh...@gmail.com>wrote: >> >>> I think you are trying to introduce a session expiration, then could you >>> try to do the following and see if you can reproduce the session >>> expiration? >>> >>> ./bin/kafka-server-start.sh config/server.properties >>> kill -SIGSTOP <kafka_server_pid> >>> sleep 6s >>> >>> At this point, the session will be expired and the node will disappear >>> from >>> zookeeper. Then you can do the following - >>> >>> kill -SIGCONT <kafka_server_pid> >>> >>> At this point, you should see the following log message from inside the >>> handleNewSession() method - >>> >>> INFO re-registering broker info in ZK for broker 0 >>> (kafka.server.KafkaHealthcheck) >>> >>> Hope that helps. >>> >>> Thanks, >>> Neha >>> >>> >>> >>> On Mon, Mar 24, 2014 at 3:19 PM, Bae, Jae Hyeon <metac...@gmail.com> >>> wrote: >>> >>> > Hi >>> > >>> > On zookeeper session timeout due to some stopping the world long GC >>> pause >>> > or zookeeper server outage, Ephemeral nodes on kafka broker and >>> consumer >>> > should be recreated but in my test environment, handleNewSession() is >>> not >>> > called. >>> > >>> > My test scenario is, starting kafka broker locally and put a breakpoint >>> > somewhere to simulate long pause, and then, I expected >>> handleNewSession() >>> > should be called, but it was not and I saw kafka broker zk >>> registration is >>> > gone. >>> > >>> > Previously, to avoid this problem, I overrode zkclient implementation >>> > internally to replace createEphemeral() function call with Apache >>> Curator's >>> > PersistentEphemeralNodes and for reinstating watchers, I implemented >>> > ConnectionStateListener to reinstate all watchers when RECONNECTED >>> happens. >>> > >>> > Do you know why I cannot reproduce handleNewSession()? >>> > >>> > Thank you >>> > Best, Jae >>> > >>> >> >> >