Yes, typically, a bad topic state is the result of communication issues between the controller and the broker. You may be hitting KAFKA-1738. Could you try 0.8.2.0 RC1 or the latest 0.8.2 branch? The issue has been fixed there.
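Regarding question (3) in the quoted message: the controller.log shows the send thread logging "Reconnecting to broker." after the failed UpdateMetadata send. However, the broker later reports that the partition doesn't exist, which suggests that the request was dropped rather than resent. Below is a hypothetical Java sketch of a sender that reconnects and then resends the same request. It is not Kafka's ControllerChannelManager code. The names BlockingChannelLike, RetryingSender and FlakyChannel, and the maxAttempts bound, are all invented for illustration.

```java
import java.io.EOFException;
import java.io.IOException;

/**
 * Hypothetical channel abstraction (NOT a Kafka class). send() throws an
 * IOException when the underlying socket has been closed; both
 * java.io.EOFException and java.nio.channels.ClosedChannelException,
 * seen in the controller.log, are IOException subclasses.
 */
interface BlockingChannelLike {
    void send(String request) throws IOException;
    void reconnect();
}

final class RetryingSender {
    private RetryingSender() {}

    /**
     * Tries to deliver one request up to maxAttempts times. After every
     * failed attempt it reconnects and loops around to resend the SAME
     * request, instead of discarding it. Returns true once delivered,
     * false if every attempt failed.
     */
    static boolean sendWithRetry(BlockingChannelLike channel, String request, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                channel.send(request);
                return true;
            } catch (IOException e) {
                // Socket closed underneath us: reconnect, then retry on the next iteration.
                channel.reconnect();
            }
        }
        return false;
    }
}

/** Hypothetical test double: the first `failures` sends fail, later sends succeed. */
final class FlakyChannel implements BlockingChannelLike {
    private int remainingFailures;
    int reconnects = 0;
    int delivered = 0;

    FlakyChannel(int failures) {
        this.remainingFailures = failures;
    }

    @Override
    public void send(String request) throws IOException {
        if (remainingFailures > 0) {
            remainingFailures--;
            throw new EOFException("Received -1 when reading from channel");
        }
        delivered++;
    }

    @Override
    public void reconnect() {
        reconnects++;
    }
}
```

With this shape, a channel that fails twice and then recovers still gets the request delivered, after two reconnects. A sender that gives up after the first failure would instead lose the request, which matches the stuck-topic symptom described in the thread. A real sender would probably also want a backoff between attempts. That is omitted here.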
Thanks,

Jun

On Tue, Jan 6, 2015 at 2:17 PM, Henri Pihkala <henri.pihk...@streamr.com> wrote:

> Hi,
>
> I’m hitting a strange problem using 0.8.2-beta and just a single kafka
> broker on CentOS 6.5.
>
> A percentage of my topic create attempts are randomly failing and leaving
> the new topic in a state in which it can not be used due to “partition
> doesn’t exist” errors as seen in server.log below.
>
> In controller.log, it looks like the controller fails to send either the
> “become-leader LeaderAndIsr request” or the “UpdateMetadata request” to the
> broker (which in fact is the same Kafka instance), due to a socket read
> failing (for unknown reason).
>
> My questions:
>
> (1) Is the bad topic state a result of the message not making it from the
> controller to the broker?
>
> (2) Any idea why the socket read might randomly fail? It can’t be a
> network issue since we’re running a single instance.
>
> (3) Shouldn’t the controller try to resend the message?
>
>
> controller.log
>
> [2015-01-06 21:31:10,304] INFO [Controller 0]: New topic creation callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.KafkaController)
>
> [2015-01-06 21:31:10,304] INFO [Controller 0]: New partition creation callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.KafkaController)
>
> [2015-01-06 21:31:10,304] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.PartitionStateMachine)
>
> [2015-01-06 21:31:10,308] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
>
> [2015-01-06 21:31:10,308] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.PartitionStateMachine)
>
> [2015-01-06 21:31:10,308] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] are: [List(0)] (kafka.controller.PartitionStateMachine)
>
> [2015-01-06 21:31:10,309] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine)
>
> [2015-01-06 21:31:10,501] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
>
> [2015-01-06 21:31:10,502] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:dev.unifina,port:9092 (kafka.controller.RequestSendThread)
> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>     at kafka.utils.Utils$.read(Utils.scala:381)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
>     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> [2015-01-06 21:31:10,505] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:16;ClientId:id_0-host_dev.unifina-port_9092;AliveBrokers:id:0,host:dev.unifina,port:9092;PartitionState:[09b1ebac-7036-49fc-aa61-7852808ca241,0] -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:dev.unifina,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)
> java.nio.channels.ClosedChannelException
>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
>     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
>
> state-change.log
>
> [2015-01-06 21:31:10,306] TRACE Controller 0 epoch 2 changed partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] state from NonExistentPartition to NewPartition with assigned replicas 0 (state.change.logger)
>
> [2015-01-06 21:31:10,308] TRACE Controller 0 epoch 2 changed state of replica 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NonExistentReplica to NewReplica (state.change.logger)
>
> [2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NewPartition to OnlinePartition with leader 0 (state.change.logger)
>
> [2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 sending become-leader LeaderAndIsr request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with correlationId 16 to broker 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] (state.change.logger)
>
> [2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with correlationId 16 to broker 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] (state.change.logger)
>
> [2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed state of replica 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NewReplica to OnlineReplica (state.change.logger)
>
>
> server.log
>
> [2015-01-06 22:01:48,137] WARN [KafkaApi-0] Offset request with correlation id 0 from client console-consumer-34042-ConsumerFetcherThread-console-consumer-34042_dev.unifina-1420581705389-b3ba0eb6-0-0 on partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] failed due to Partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] doesn't exist on 0 (kafka.server.KafkaApis)
>
> [2015-01-06 22:01:48,140] INFO Closing socket connection to /192.168.10.21. (kafka.network.Processor)
>
> ... etc etc
>
>
> describe topic:
>
> $ bin/kafka-topics.sh --zookeeper dev.unifina:2181 --describe --topic 09b1ebac-7036-49fc-aa61-7852808ca241
> Topic:09b1ebac-7036-49fc-aa61-7852808ca241 PartitionCount:1 ReplicationFactor:1 Configs:
>     Topic: 09b1ebac-7036-49fc-aa61-7852808ca241 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
>
>
> attempt to consume from the topic using the console consumer:
>
> [2015-01-06 22:01:47,928] WARN [console-consumer-34042_dev.unifina-1420581705389-b3ba0eb6-leader-finder-thread], Failed to add leader for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> kafka.common.UnknownTopicOrPartitionException
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>     at java.lang.Class.newInstance0(Class.java:372)
>     at java.lang.Class.newInstance(Class.java:325)
>     at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>     at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:168)
>     at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
>     at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:180)
>     at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:175)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:175)
>     at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
>     at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
>     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
>
> Thanks for your help!
>
> Best regards
> Henri