Hello, I am using Kafka 0.8.0-beta1 with two brokers. After creating a topic named "ead_click" with bin/kafka-create-topic.sh, the output of bin/kafka-list-topic.sh suggests that the topic was created successfully:

[2013-11-01 15:49:35,388] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
topic: ead_click partition: 0 leader: 1 replicas: 1 isr: 1
[2013-11-01 15:49:35,720] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)

But the producer for the topic reports that it cannot find the leader. When I then started a consumer, an exception was thrown:

[2013-11-01 15:50:32,807] INFO [ConsumerFetcherThread-console-consumer-50963_qt103.corp.xxx.com-1383292232223-b874ebd0-0-1], Starting (kafka.consumer.ConsumerFetcherThread)
[2013-11-01 15:50:32,833] ERROR [console-consumer-50963_qt103.corp.xxx.com-1383292232223-b874ebd0-leader-finder-thread], Error due to (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.NotLeaderForPartitionException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
    at java.lang.Class.newInstance0(Class.java:355)
    at java.lang.Class.newInstance(Class.java:308)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
    at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:147)
    at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
    at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:180)
    at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:80)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$7.apply(ConsumerFetcherManager.scala:95)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$7.apply(ConsumerFetcherManager.scala:92)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:92)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

In the state-change.log of broker 0, this exception can be found:

[2013-11-01 15:25:04,608] TRACE Broker 1 received LeaderAndIsr request correlationId 9 from controller 1 epoch 1 starting the become-leader transition for partition [ead_click,0] (state.change.logger)
[2013-11-01 15:25:04,635] ERROR Error on broker 1 while processing LeaderAndIsr request correlationId 9 received from controller 1 epoch 1 for partition (ead_click,0) (state.change.logger)
java.util.NoSuchElementException: key not found: /disk1/test/kafka-data
    at scala.collection.MapLike$class.default(MapLike.scala:223)
    at scala.collection.immutable.Map$Map1.default(Map.scala:93)
    at scala.collection.MapLike$class.apply(MapLike.scala:134)
    at scala.collection.immutable.Map$Map1.apply(Map.scala:93)
    at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:83)
    at kafka.cluster.Partition$$anonfun$1.apply(Partition.scala:149)
    at kafka.cluster.Partition$$anonfun$1.apply(Partition.scala:149)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.immutable.List.map(List.scala:45)
    at kafka.cluster.Partition.makeLeader(Partition.scala:149)
    at kafka.server.ReplicaManager.kafka$server$ReplicaManager$$makeLeader(ReplicaManager.scala:257)
    at kafka.server.ReplicaManager$$anonfun$becomeLeaderOrFollower$3.apply(ReplicaManager.scala:221)
    at kafka.server.ReplicaManager$$anonfun$becomeLeaderOrFollower$3.apply(ReplicaManager.scala:213)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:213)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:87)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
    at java.lang.Thread.run(Thread.java:662)

/disk1/test/kafka-data is the value of the log.dir key in config/server.properties.

The ZooKeeper log says:

2013-11-01 15:27:20,050 [myid:] - INFO [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x142128b8bf10003 type:setData cxid:0x22 zxid:0x2a txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-18048/offsets/ead_click/0 Error:KeeperErrorCode = NoNode for /consumers/console-consumer-18048/offsets/ead_click/0
2013-11-01 15:27:20,052 [myid:] - INFO [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x142128b8bf10003 type:create cxid:0x23 zxid:0x2b txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-18048/offsets Error:KeeperErrorCode = NoNode for /consumers/console-consumer-18048/offsets

Can anyone tell me why this happens and how to solve it? Thank you very much!

--
Best Regards,
Henry Ma