ZkEventThread is blocked with the following stack trace:

"ZkClient-EventThread-18-localhost:2181" daemon prio=5 tid=7fb31b95c000
nid=0x1194a6000 waiting on condition [1194a5000]
   java.lang.Thread.State: WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  <7c2201800> (a
java.util.concurrent.CountDownLatch$Sync)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
  at
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
  at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
  at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
  at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
  at
kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93)
  at
kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340)
  at
kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
  at
kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
  at kafka.utils.Utils$.inLock(Utils.scala:538)
  at
kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337)
 at
kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068)
  at
kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
  at
kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
  at kafka.utils.Utils$.inLock(Utils.scala:538)
  at
kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067)
  at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
  at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

What did I do wrong in my test environment?


On Tue, Mar 25, 2014 at 9:24 AM, Bae, Jae Hyeon <metac...@gmail.com> wrote:

> Nope, linux doesn't work. Let me debug why it's not triggered.
>
>
> On Tue, Mar 25, 2014 at 9:21 AM, Bae, Jae Hyeon <metac...@gmail.com>wrote:
>
>> Hm... I cannot reproduce in my local, I downloaded kafka_2.8.0-0.8.1
>> package but it didn't work. Let me try in my linux machine.
>>
>>
>> On Mon, Mar 24, 2014 at 6:11 PM, Neha Narkhede 
>> <neha.narkh...@gmail.com>wrote:
>>
>>> I think you are trying to introduce a session expiration, then could you
>>> try to do the following and see if you can reproduce the session
>>> expiration?
>>>
>>> ./bin/kafka-server-start.sh config/server.properties
>>> kill -SIGSTOP <kafka_server_pid>
>>> sleep 6s
>>>
>>> At this point, the session will be expired and the node will disappear
>>> from
>>> zookeeper. Then you can do the following -
>>>
>>> kill -SIGCONT <kafka_server_pid>
>>>
>>> At this point, you should see the following log message from inside the
>>> handleNewSession() method -
>>>
>>> INFO re-registering broker info in ZK for broker 0
>>> (kafka.server.KafkaHealthcheck)
>>>
>>> Hope that helps.
>>>
>>> Thanks,
>>> Neha
>>>
>>>
>>>
>>> On Mon, Mar 24, 2014 at 3:19 PM, Bae, Jae Hyeon <metac...@gmail.com>
>>> wrote:
>>>
>>> > Hi
>>> >
>>> > On zookeeper session timeout due to some stopping the world long GC
>>> pause
>>> > or zookeeper server outage, Ephemeral nodes on kafka broker and
>>> consumer
>>> > should be recreated but in my test environment, handleNewSession() is
>>> not
>>> > called.
>>> >
>>> > My test scenario is, starting kafka broker locally and put a breakpoint
>>> > somewhere to simulate long pause, and then, I expected
>>> handleNewSession()
>>> > should be called, but it was not and I saw kafka broker zk
>>> registration is
>>> > gone.
>>> >
>>> > Previously, to avoid this problem, I overrode zkclient implementation
>>> > internally to replace createEphemeral() function call with Apache
>>> Curator's
>>> > PersistentEphemeralNodes and for reinstating watchers, I implemented
>>> > ConnectionStateListener to reinstate all watchers when RECONNECTED
>>> happens.
>>> >
>>> > Do you know why I cannot reproduce handleNewSession()?
>>> >
>>> > Thank you
>>> > Best, Jae
>>> >
>>>
>>
>>
>

Reply via email to