[ https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13985617#comment-13985617 ]
Michael Noll edited comment on KAFKA-1029 at 4/30/14 3:27 PM:
--------------------------------------------------------------

I could finally reproduce the bug again, though still not consistently. And I think I have an idea what the root cause might be: I was test-driving Kafka in several VMs locally on a MacBook Pro laptop, and the energy saver settings (or something similar) might be responsible for triggering the problem when the laptop was idle. (In the test run below I intentionally let the laptop idle for 2-3 hours after creating a "test" topic and sending/consuming a few test messages.) The timestamp of when the problem was triggered (11:01) correlates with me unlocking the laptop (user screen).

What's strange is that the laptop is configured not to go to sleep when idle, and the hard disk is not asked to sleep either (even if you did enable that setting in Mac OS X, it does not have an effect on SSDs, which are the only kind of disk installed in the laptop I was using) -- only the display is allowed to turn off after 10 minutes. So even though the laptop is configured to be "always on", there apparently is something that throws off Kafka/ZooKeeper. Also, as I said in my earlier comment, I still cannot reproduce the issue consistently, i.e. sometimes Kafka/ZK work correctly after idling/unlocking; still, I think the root cause ultimately has to do with idling on Mac OS X. FWIW I list further log messages and details below, just in case that information may be useful in the future.

@Jun Rao: Yes, I did see ZK expiration in {{controller.log}}:

{code}
INFO [SessionExpirationListener on 0], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
{code}

Below are some further details. Things turn bad at 11:01.

*kafka1 (Kafka broker)*

{{server.log}}

{code}
[2014-04-30 07:18:56,481] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log) [2014-04-30 07:18:56,485] INFO Created log for partition [test,0] in /app/kafka/log with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 1073741824, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 172800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 172800000}. (kafka.log.LogManager) [2014-04-30 07:18:56,486] WARN Partition [test,0] on broker 0: No checkpointed highwatermark is found for partition [test,0] (kafka.cluster.Partition) [2014-04-30 07:19:32,637] INFO Closing socket connection to /127.0.1.1. (kafka.network.Processor) [2014-04-30 07:19:37,328] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor) [2014-04-30 07:19:56,356] INFO Closing socket connection to /127.0.1.1.
(kafka.network.Processor) [2014-04-30 07:20:52,090] ERROR Closing socket for /127.0.1.1 because of error (kafka.network.Processor) java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251) at sun.nio.ch.IOUtil.read(IOUtil.java:224) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254) at kafka.utils.Utils$.read(Utils.scala:375) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:347) at kafka.network.Processor.run(SocketServer.scala:245) at java.lang.Thread.run(Thread.java:701) [2014-04-30 07:21:02,805] INFO Closing socket connection to /127.0.1.1. (kafka.network.Processor) [2014-04-30 07:21:05,803] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor) [2014-04-30 11:01:57,138] INFO Closing socket connection to /127.0.1.1. (kafka.network.Processor) [2014-04-30 11:01:59,561] INFO Closing socket connection to /127.0.1.1. (kafka.network.Processor) [2014-04-30 11:01:59,648] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [2014-04-30 11:01:59,692] INFO conflict in /controller data: {"version":1,"brokerid":0,"timestamp":"1398855719690"} stored data: {"version":1,"brokerid":0,"timestamp":"1398855719646"} (kafka.utils.ZkUtils$) [2014-04-30 11:01:59,699] INFO I wrote this conflicted ephemeral node [{"version":1,"brokerid":0,"timestamp":"1398855719690"}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$) [2014-04-30 11:02:05,704] INFO conflict in /controller data: {"version":1,"brokerid":0,"timestamp":"1398855719690"} stored data: {"version":1,"brokerid":0,"timestamp":"1398855719646"} (kafka.utils.ZkUtils$) [2014-04-30 11:02:05,711] INFO I wrote this conflicted ephemeral node [{"version":1,"brokerid":0,"timestamp":"1398855719690"}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$) {code} {{state-change.log}} {code} [2014-04-30 07:18:56,433] TRACE Controller 0 epoch 1 changed state of replica 0 for partition [test,0] from NewReplica to OnlineReplica (state.change.logger) [2014-04-30 07:18:56,443] TRACE Broker 0 received LeaderAndIsr request (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0) correlation id 7 from controller 0 epoch 1 for partition [test,0] (state.change.logger) [2014-04-30 07:18:56,448] TRACE Broker 0 handling LeaderAndIsr request correlationId 7 from controller 0 epoch 1 starting the become-leader transition for partition [test,0] (state.change.logger) [2014-04-30 07:18:56,455] TRACE Broker 0 stopped fetchers as part of become-leader request from controller 0 epoch 1 with correlation id 7 for partition [test,0] (state.change.logger) [2014-04-30 07:18:56,495] TRACE Broker 0 completed LeaderAndIsr request correlationId 7 from controller 0 epoch 1 for the become-leader transition for partition [test,0] (state.change.logger) [2014-04-30 07:18:56,506] TRACE Controller 0 epoch 1 received response correlationId 7 for a request sent to broker id:0,host:kafka1,port:9092 (state.change.logger) [2014-04-30 07:18:56,525] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0) for 
partition [test,0] in response to UpdateMetadata request sent by controller 0 epoch 1 with correlation id 7 (state.change.logger) [2014-04-30 07:18:56,526] TRACE Controller 0 epoch 1 received response correlationId 7 for a request sent to broker id:0,host:kafka1,port:9092 (state.change.logger) [2014-04-30 11:01:59,564] TRACE Controller 0 epoch 1 changed partition [test,0] state from OnlinePartition to OfflinePartition (state.change.logger) [2014-04-30 11:01:59,569] TRACE Controller 0 epoch 1 started leader election for partition [test,0] (state.change.logger) [2014-04-30 11:01:59,589] ERROR Controller 0 epoch 1 initiated state change for partition [test,0] from OfflinePartition to OnlinePartition failed (state.change.logger) kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)] at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61) at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:336) at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:185) at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99) at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96) at kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:433) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:344) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:329) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:327) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) [2014-04-30 11:01:59,625] TRACE Controller 0 epoch 1 changed state of replica 0 for partition [test,0] from OnlineReplica to OfflineReplica (state.change.logger) [2014-04-30 
11:01:59,676] TRACE Controller 0 epoch 2 started leader election for partition [test,0] (state.change.logger) [2014-04-30 11:01:59,686] ERROR Controller 0 epoch 2 initiated state change for partition [test,0] from OfflinePartition to OnlinePartition failed (state.change.logger) kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)] at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61) {code} {{controller.log}} {code} [2014-04-30 07:18:56,354] DEBUG [TopicChangeListener on Controller 0]: Topic change listener fired for path /brokers/topics with children test (kafka.controller.PartitionStateMachine$TopicChangeListener) [2014-04-30 07:18:56,373] INFO [TopicChangeListener on Controller 0]: New topics: [Set(test)], deleted topics: [Set()], new partition replica assignment [Map([test,0] -> List(0))] (kafka.controller.PartitionStateMachine$TopicChangeListener) [2014-04-30 07:18:56,373] INFO [Controller 0]: New topic creation callback for [test,0] (kafka.controller.KafkaController) [2014-04-30 07:18:56,376] INFO [Controller 0]: New partition creation callback for [test,0] (kafka.controller.KafkaController) [2014-04-30 07:18:56,376] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [test,0] (kafka.controller.PartitionStateMachine) [2014-04-30 07:18:56,391] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-04-30 07:18:56,394] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [test,0] (kafka.controller.PartitionStateMachine) [2014-04-30 07:18:56,395] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [test,0] are: [List(0)] (kafka.controller.PartitionStateMachine) [2014-04-30 07:18:56,398] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [test,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1) (kafka.controller.PartitionStateMachine) [2014-04-30 07:18:56,431] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-04-30 11:01:59,560] INFO [BrokerChangeListener on Controller 0]: Broker change listener fired for path /brokers/ids with children (kafka.controller.ReplicaStateMachine$BrokerChangeListener) [2014-04-30 11:01:59,560] INFO [BrokerChangeListener on Controller 0]: Newly added brokers: , deleted brokers: 0, all live brokers: (kafka.controller.ReplicaStateMachine$BrokerChangeListener) [2014-04-30 11:01:59,561] INFO [Controller-0-to-broker-0-send-thread], Shutting down (kafka.controller.RequestSendThread) [2014-04-30 11:01:59,562] INFO [Controller-0-to-broker-0-send-thread], Stopped (kafka.controller.RequestSendThread) [2014-04-30 11:01:59,562] INFO [Controller-0-to-broker-0-send-thread], Shutdown completed (kafka.controller.RequestSendThread) [2014-04-30 11:01:59,563] INFO [Controller 0]: Broker failure callback for 0 (kafka.controller.KafkaController) [2014-04-30 11:01:59,564] INFO [Controller 0]: Removed ArrayBuffer() from list of shutting down brokers. 
(kafka.controller.KafkaController) [2014-04-30 11:01:59,564] INFO [Partition state machine on Controller 0]: Invoking state change to OfflinePartition for partitions [test,0] (kafka.controller.PartitionStateMachine) [2014-04-30 11:01:59,588] DEBUG [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [test,0]. Pick the leader from the alive assigned replicas: (kafka.controller.OfflinePartitionLeaderSelector) [2014-04-30 11:01:59,595] INFO [Replica state machine on controller 0]: Invoking state change to OfflineReplica for replicas [Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-04-30 11:01:59,597] DEBUG [Controller 0]: Removing replica 0 from ISR 0 for partition [test,0]. (kafka.controller.KafkaController) [2014-04-30 11:01:59,625] INFO [Controller 0]: New leader and ISR for partition [test,0] is {"leader":-1,"leader_epoch":1,"isr":[]} (kafka.controller.KafkaController) [2014-04-30 11:01:59,628] DEBUG The stop replica request (delete = true) sent to broker 0 is (kafka.controller.ControllerBrokerRequestBatch) [2014-04-30 11:01:59,629] DEBUG The stop replica request (delete = false) sent to broker 0 is [Topic=test,Partition=0,Replica=0] (kafka.controller.ControllerBrokerRequestBatch) [2014-04-30 11:01:59,635] WARN [Channel manager on controller 0]: Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 11; ClientId: ; DeletePartitions: false; ControllerId: 0; ControllerEpoch: 1; Partitions: [test,0] to broker 0, since it is offline. (kafka.controller.ControllerChannelManager) [2014-04-30 11:01:59,644] INFO [Controller 0]: Controller shutdown complete (kafka.controller.KafkaController) [2014-04-30 11:01:59,649] INFO [Controller 0]: Broker 0 starting become controller state transition (kafka.controller.KafkaController) [2014-04-30 11:01:59,651] INFO [Controller 0]: Controller 0 incremented epoch to 2 (kafka.controller.KafkaController) [2014-04-30 11:01:59,672] INFO [Controller 0]: Partitions undergoing preferred replica election: (kafka.controller.KafkaController) [2014-04-30 11:01:59,672] INFO [Controller 0]: Partitions that completed preferred replica election: (kafka.controller.KafkaController) [2014-04-30 11:01:59,672] INFO [Controller 0]: Resuming preferred replica election for partitions: (kafka.controller.KafkaController) [2014-04-30 11:01:59,673] INFO [Controller 0]: Partitions being reassigned: Map() (kafka.controller.KafkaController) [2014-04-30 11:01:59,673] INFO [Controller 0]: Partitions already reassigned: List() (kafka.controller.KafkaController) [2014-04-30 11:01:59,673] INFO [Controller 0]: Resuming reassignment of partitions: Map() (kafka.controller.KafkaController) [2014-04-30 11:01:59,675] INFO [Controller 0]: List of topics to be deleted: (kafka.controller.KafkaController) [2014-04-30 11:01:59,675] INFO [Controller 0]: List of topics ineligible for deletion: test (kafka.controller.KafkaController) [2014-04-30 11:01:59,675] INFO [Controller 0]: Currently active brokers in the cluster: Set() (kafka.controller.KafkaController) [2014-04-30 11:01:59,675] INFO [Controller 0]: Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController) [2014-04-30 11:01:59,675] INFO [Controller 0]: Current list of topics in the cluster: Set(test) (kafka.controller.KafkaController) [2014-04-30 11:01:59,676] INFO [Replica state machine on controller 0]: Started replica state machine with initial state -> Map([Topic=test,Partition=0,Replica=0] -> ReplicaDeletionIneligible) (kafka.controller.ReplicaStateMachine) 
[2014-04-30 11:01:59,685] DEBUG [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [test,0]. Pick the leader from the alive assigned replicas: (kafka.controller.OfflinePartitionLeaderSelector) [2014-04-30 11:01:59,686] INFO [Partition state machine on Controller 0]: Started partition state machine with initial state -> Map([test,0] -> OfflinePartition) (kafka.controller.PartitionStateMachine) [2014-04-30 11:01:59,687] INFO [Controller 0]: Broker 0 is ready to serve as the new controller with epoch 2 (kafka.controller.KafkaController) [2014-04-30 11:01:59,688] INFO [Controller 0]: Starting preferred replica leader election for partitions (kafka.controller.KafkaController) [2014-04-30 11:01:59,688] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions (kafka.controller.PartitionStateMachine) [2014-04-30 11:01:59,690] INFO [SessionExpirationListener on 0], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener) [2014-04-30 11:01:59,690] INFO [Controller 0]: Controller shutdown complete (kafka.controller.KafkaController) {code} *zookeeper1 (ZooKeeper server)* {{zookeeper.log}} {code} 2014-04-30 07:32:37,460 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /10.0.0.21:43468 2014-04-30 07:32:37,462 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection request from old client /10.0.0.21:43468; will be dropped if server is in r-o mode 2014-04-30 07:32:37,462 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@832] - Client attempting to renew session 0x145b15f015d0000 at /10.0.0.21:43468 2014-04-30 07:32:37,463 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@595] - Established session 0x145b15f015d0000 with negotiated timeout 6000 for client /10.0.0.21:43468 2014-04-30 11:01:57,832 [myid:] - INFO [SessionTracker:ZooKeeperServer@325] - Expiring session 0x145b15f015d0000, timeout of 6000ms exceeded 2014-04-30 11:01:57,833 [myid:] - INFO [SessionTracker:ZooKeeperServer@325] - Expiring session 0x145b15f015d0009, timeout of 6000ms exceeded 2014-04-30 11:01:57,837 [myid:] - INFO [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@476] - Processed session termination for sessionid: 0x145b15f015d0000 2014-04-30 11:01:57,837 [myid:] - INFO [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@476] - Processed session termination for sessionid: 0x145b15f015d0009 2014-04-30 11:01:57,842 [myid:] - INFO [SyncThread:0:NIOServerCnxn@1001] - Closed socket connection for client /10.0.0.21:43468 which had sessionid 0x145b15f015d0000 2014-04-30 11:01:57,845 [myid:] - INFO [SyncThread:0:NIOServerCnxn@1001] - Closed socket connection for client /10.0.0.21:43467 which had sessionid 0x145b15f015d0009 2014-04-30 11:01:59,001 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /10.0.0.21:43469 2014-04-30 11:01:59,002 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection request from old client /10.0.0.21:43469; will be dropped if server is in r-o mode 2014-04-30 11:01:59,002 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@832] - Client attempting to renew session 0x145b15f015d0009 at /10.0.0.21:43469 2014-04-30 11:01:59,003 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@588] - 
Invalid session 0x145b15f015d0009 for client /10.0.0.21:43469, probably expired 2014-04-30 11:01:59,004 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1001] - Closed socket connection for client /10.0.0.21:43469 which had sessionid 0x145b15f015d0009 2014-04-30 11:01:59,005 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /10.0.0.21:43470 2014-04-30 11:01:59,008 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection request from old client /10.0.0.21:43470; will be dropped if server is in r-o mode 2014-04-30 11:01:59,008 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@839] - Client attempting to establish new session at /10.0.0.21:43470 2014-04-30 11:01:59,012 [myid:] - INFO [SyncThread:0:ZooKeeperServer@595] - Established session 0x145b15f015d000b with negotiated timeout 6000 for client /10.0.0.21:43470 2014-04-30 11:01:59,545 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /10.0.0.21:43471 2014-04-30 11:01:59,545 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection request from old client /10.0.0.21:43471; will be dropped if server is in r-o mode 2014-04-30 11:01:59,545 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@832] - Client attempting to renew session 0x145b15f015d0000 at /10.0.0.21:43471 2014-04-30 11:01:59,546 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@588] - Invalid session 0x145b15f015d0000 for client /10.0.0.21:43471, probably expired 2014-04-30 11:01:59,546 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1001] - Closed socket connection for client /10.0.0.21:43471 which had sessionid 0x145b15f015d0000 2014-04-30 11:01:59,553 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /10.0.0.21:43472 2014-04-30 11:01:59,555 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection request from old client /10.0.0.21:43472; will be dropped if server is in r-o mode 2014-04-30 11:01:59,556 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@839] - Client attempting to establish new session at /10.0.0.21:43472 2014-04-30 11:01:59,557 [myid:] - INFO [SyncThread:0:ZooKeeperServer@595] - Established session 0x145b15f015d000c with negotiated timeout 6000 for client /10.0.0.21:43472 2014-04-30 11:01:59,687 [myid:] - INFO [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x145b15f015d000c type:delete cxid:0x19 zxid:0x5c txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election 2014-04-30 11:01:59,689 [myid:] - INFO [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x145b15f015d000c type:create cxid:0x1a zxid:0x5d txntype:-1 reqpath:n/a Error Path:/controller Error:KeeperErrorCode = NodeExists for /controller 2014-04-30 11:02:05,700 [myid:] - INFO [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x145b15f015d000c type:create cxid:0x1d zxid:0x5e txntype:-1 reqpath:n/a Error Path:/controller Error:KeeperErrorCode = NodeExists for /controller {code} *State of 
topic after the issue triggered*

{code}
$ date;bin/kafka-topics.sh --zookeeper zookeeper1 --describe --topic test
Wed Apr 30 11:02:25 UTC 2014
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: -1    Replicas: 0    Isr:
{code}

*Notes*

Kafka partially recovered at some point. What indicated a recovery:
- {{state-change.log}} reverted to normal messages (the partition went from OfflinePartition to OnlinePartition with leader 0; leader -1 was replaced with leader 0; etc.)
- {{kafka-topics.sh --describe --topic test}} showed normal operations again, i.e. one partition with one replica, one leader, and one ISR.

What speaks against a full recovery:
- {{server.log}} was still showing an endless loop of "I wrote this conflicted ephemeral node [{"version":1,"brokerid":0,"timestamp":"1398860644679"}] at /controller a while back in a different session" messages.
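A side note on the session expiry seen above: {{zookeeper.log}} reports the sessions being expired with a negotiated timeout of 6000 ms, which matches Kafka's default {{zookeeper.session.timeout.ms}}. Any stall longer than that (for example a paused or heavily throttled VM, which seems plausible in the idle-laptop scenario described here) is enough to expire the broker's ZooKeeper session and trigger the controller re-election shown in the logs. Below is a minimal, hypothetical broker-side sketch of how such a test setup might be made more tolerant of short stalls; the property names are real Kafka 0.8 broker settings, but the values are illustrative only and are not a recommendation from this report:

{code}
# server.properties (sketch only; values are illustrative, not a tested recommendation)
zookeeper.connect=zookeeper1:2181

# Default is 6000 ms; a stall longer than this expires the broker's
# ZooKeeper session and forces a controller re-election.
zookeeper.session.timeout.ms=30000

# Allow the same amount of time for (re)connecting to ZooKeeper.
zookeeper.connection.timeout.ms=30000
{code}

Raising the timeout only makes spurious session expiry during pauses less likely; it does not address the conflicted-ephemeral-node retry loop itself, which is what the attached patch (using brokerId instead of leaderId) targets.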
> Zookeeper leader election stuck in ephemeral node retry loop
> ------------------------------------------------------------
>
> Key: KAFKA-1029
> URL: https://issues.apache.org/jira/browse/KAFKA-1029
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 0.8.0
> Reporter: Sam Meder
> Assignee: Sam Meder
> Priority: Blocker
> Fix For: 0.8.0
>
> Attachments: 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch
>
>
> We're seeing the following log statements (over and over):
> [2013-08-27 07:21:49,538] INFO conflict in /controller data: { "brokerid":3, "timestamp":"1377587945206", "version":1 } stored data: { "brokerid":2, "timestamp":"1377587460904", "version":1 } (kafka.utils.ZkUtils$)
> [2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{ "brokerid":3, "timestamp":"1377587945206", "version":1 }] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> where the broker is essentially stuck in the loop that is trying to deal with left-over ephemeral nodes. The code looks a bit racy to me. In particular:
> ZookeeperLeaderElector:
> def elect: Boolean = {
>   controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
>   val timestamp = SystemTime.milliseconds.toString
>   val electString = ...
>   try {
>     createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, leaderId,
>       (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>       controllerContext.zkSessionTimeout)
> leaderChangeListener is registered before the create call (by the way, it looks like a new registration will be added every elect call - shouldn't it register in startup()?) so can update leaderId to the current leader before the call to create. If that happens then we will continuously get node exists exceptions and the checker function will always return true, i.e. we will never get out of the while(true) loop.
> I think the right fix here is to pass brokerId instead of leaderId when calling create, i.e.
> createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
>   (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>   controllerContext.zkSessionTimeout)
> The loop dealing with the ephemeral node bug is now only triggered for the broker that owned the node previously, although I am still not 100% sure if that is sufficient.

--
This message was sent by Atlassian JIRA
(v6.2#6252)