Hi, I’m hitting a strange problem using Kafka 0.8.2-beta with just a single broker on CentOS 6.5.
A percentage of my topic create attempts are randomly failing and leaving the new topic in a state in which it cannot be used due to “partition doesn’t exist” errors, as seen in server.log below. In controller.log, it looks like the controller fails to send either the “become-leader LeaderAndIsr request” or the “UpdateMetadata request” to the broker (which in fact is the same Kafka instance), due to a socket read failing (for an unknown reason).

My questions:

(1) Is the bad topic state a result of the message not making it from the controller to the broker?
(2) Any idea why the socket read might randomly fail? It can’t be a network issue since we’re running a single instance.
(3) Shouldn’t the controller try to resend the message?

controller.log

[2015-01-06 21:31:10,304] INFO [Controller 0]: New topic creation callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.KafkaController)
[2015-01-06 21:31:10,304] INFO [Controller 0]: New partition creation callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.KafkaController)
[2015-01-06 21:31:10,304] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.PartitionStateMachine)
[2015-01-06 21:31:10,308] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2015-01-06 21:31:10,308] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.PartitionStateMachine)
[2015-01-06 21:31:10,308] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] are: [List(0)] (kafka.controller.PartitionStateMachine)
[2015-01-06 21:31:10,309] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine)
[2015-01-06 21:31:10,501] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2015-01-06 21:31:10,502] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:dev.unifina,port:9092 (kafka.controller.RequestSendThread)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.Utils$.read(Utils.scala:381)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-01-06 21:31:10,505] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:16;ClientId:id_0-host_dev.unifina-port_9092;AliveBrokers:id:0,host:dev.unifina,port:9092;PartitionState:[09b1ebac-7036-49fc-aa61-7852808ca241,0] -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:dev.unifina,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

state-change.log

[2015-01-06 21:31:10,306] TRACE Controller 0 epoch 2 changed partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] state from NonExistentPartition to NewPartition with assigned replicas 0 (state.change.logger)
[2015-01-06 21:31:10,308] TRACE Controller 0 epoch 2 changed state of replica 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NonExistentReplica to NewReplica (state.change.logger)
[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NewPartition to OnlinePartition with leader 0 (state.change.logger)
[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 sending become-leader LeaderAndIsr request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with correlationId 16 to broker 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] (state.change.logger)
[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with correlationId 16 to broker 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] (state.change.logger)
[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed state of replica 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NewReplica to OnlineReplica (state.change.logger)

server.log

[2015-01-06 22:01:48,137] WARN [KafkaApi-0] Offset request with correlation id 0 from client console-consumer-34042-ConsumerFetcherThread-console-consumer-34042_dev.unifina-1420581705389-b3ba0eb6-0-0 on partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] failed due to Partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] doesn't exist on 0 (kafka.server.KafkaApis)
[2015-01-06 22:01:48,140] INFO Closing socket connection to /192.168.10.21. (kafka.network.Processor)
... etc etc

describe topic:

$ bin/kafka-topics.sh --zookeeper dev.unifina:2181 --describe --topic 09b1ebac-7036-49fc-aa61-7852808ca241
Topic:09b1ebac-7036-49fc-aa61-7852808ca241  PartitionCount:1  ReplicationFactor:1  Configs:
        Topic: 09b1ebac-7036-49fc-aa61-7852808ca241  Partition: 0  Leader: 0  Replicas: 0  Isr: 0

attempt to consume from the topic using the console consumer:

[2015-01-06 22:01:47,928] WARN [console-consumer-34042_dev.unifina-1420581705389-b3ba0eb6-leader-finder-thread], Failed to add leader for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.UnknownTopicOrPartitionException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
        at java.lang.Class.newInstance0(Class.java:372)
        at java.lang.Class.newInstance(Class.java:325)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
        at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:168)
        at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
        at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:180)
        at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:175)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:175)
        at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
        at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Thanks for your help!

Best regards
Henri