Yes, typically, a bad topic state is the result of communication issues
between the controller and the broker. You may be hitting KAFKA-1738. Could
you try 0.8.2.0 RC1 or the latest 0.8.2 branch? The issue has been fixed
there.
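
To check how often the symptom occurs before and after upgrading, the
two log signatures quoted below can be counted with a small script. This
is only a rough sketch, not part of Kafka: the function names are
hypothetical, and the patterns are copied from the log excerpts in this
thread, so they assume the 0.8.2-beta message wording.

```python
import re

# Patterns copied from the log excerpts quoted in this thread (assumption:
# the wording matches 0.8.2-beta; other versions may phrase these differently).
SEND_FAILURE = re.compile(r"Controller (\d+) fails to send a request to broker")
MISSING_PARTITION = re.compile(
    r"Partition \[([^,\]]+),(\d+)\] doesn't exist on (\d+)"
)


def _unwrap(text):
    """Join hard-wrapped lines so a pattern can match across line breaks."""
    return " ".join(line.strip() for line in text.splitlines())


def count_send_failures(controller_log):
    """Count controller-to-broker send failures in controller.log text."""
    return len(SEND_FAILURE.findall(_unwrap(controller_log)))


def missing_partitions(server_log):
    """Return the set of (topic, partition, broker_id) tuples that the broker
    reported as nonexistent in server.log text."""
    matches = MISSING_PARTITION.findall(_unwrap(server_log))
    return {(topic, int(part), int(broker)) for topic, part, broker in matches}
```

Passing the contents of controller.log to count_send_failures and of
server.log to missing_partitions gives a quick view of which topics ended
up in the bad state after a failed send.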

Thanks,

Jun

On Tue, Jan 6, 2015 at 2:17 PM, Henri Pihkala <henri.pihk...@streamr.com>
wrote:

> Hi,
>
> I’m hitting a strange problem using 0.8.2-beta and just a single kafka
> broker on CentOS 6.5.
>
> A percentage of my topic create attempts are randomly failing and leaving
> the new topic in a state in which it cannot be used due to “partition
> doesn’t exist” errors as seen in server.log below.
>
> In controller.log, it looks like the controller fails to send either the
> “become-leader LeaderAndIsr request” or the “UpdateMetadata request” to the
> broker (which in fact is the same Kafka instance), due to a socket read
> failing (for an unknown reason).
>
> My questions:
>
> (1) Is the bad topic state a result of the message not making it from the
> controller to the broker?
>
> (2) Any idea why the socket read might randomly fail? It can’t be a
> network issue since we’re running a single instance.
>
> (3) Shouldn’t the controller try to resend the message?
>
>
>
> controller.log
>
> [2015-01-06 21:31:10,304] INFO [Controller 0]: New topic creation callback
> for [09b1ebac-7036-49fc-aa61-7852808ca241,0]
> (kafka.controller.KafkaController)
>
> [2015-01-06 21:31:10,304] INFO [Controller 0]: New partition creation
> callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0]
> (kafka.controller.KafkaController)
>
> [2015-01-06 21:31:10,304] INFO [Partition state machine on Controller 0]:
> Invoking state change to NewPartition for partitions
> [09b1ebac-7036-49fc-aa61-7852808ca241,0]
> (kafka.controller.PartitionStateMachine)
>
> [2015-01-06 21:31:10,308] INFO [Replica state machine on controller 0]:
> Invoking state change to NewReplica for replicas
> [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0]
> (kafka.controller.ReplicaStateMachine)
>
> [2015-01-06 21:31:10,308] INFO [Partition state machine on Controller 0]:
> Invoking state change to OnlinePartition for partitions
> [09b1ebac-7036-49fc-aa61-7852808ca241,0]
> (kafka.controller.PartitionStateMachine)
>
> [2015-01-06 21:31:10,308] DEBUG [Partition state machine on Controller 0]:
> Live assigned replicas for partition
> [09b1ebac-7036-49fc-aa61-7852808ca241,0] are: [List(0)]
> (kafka.controller.PartitionStateMachine)
>
> [2015-01-06 21:31:10,309] DEBUG [Partition state machine on Controller 0]:
> Initializing leader and isr for partition
> [09b1ebac-7036-49fc-aa61-7852808ca241,0] to
> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2)
> (kafka.controller.PartitionStateMachine)
>
> [2015-01-06 21:31:10,501] INFO [Replica state machine on controller 0]:
> Invoking state change to OnlineReplica for replicas
> [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0]
> (kafka.controller.ReplicaStateMachine)
>
> [2015-01-06 21:31:10,502] WARN [Controller-0-to-broker-0-send-thread],
> Controller 0 fails to send a request to broker
> id:0,host:dev.unifina,port:9092 (kafka.controller.RequestSendThread)
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
>         at kafka.utils.Utils$.read(Utils.scala:381)
>         at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
>         at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> [2015-01-06 21:31:10,505] ERROR [Controller-0-to-broker-0-send-thread],
> Controller 0 epoch 2 failed to send request
> Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:16;ClientId:id_0-host_dev.unifina-port_9092;AliveBrokers:id:0,host:dev.unifina,port:9092;PartitionState:[09b1ebac-7036-49fc-aa61-7852808ca241,0]
> ->
> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0)
> to broker id:0,host:dev.unifina,port:9092. Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
>         at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>         at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
>
>
>
> state-change.log
>
> [2015-01-06 21:31:10,306] TRACE Controller 0 epoch 2 changed partition
> [09b1ebac-7036-49fc-aa61-7852808ca241,0] state from NonExistentPartition to
> NewPartition with assigned replicas 0 (state.change.logger)
>
> [2015-01-06 21:31:10,308] TRACE Controller 0 epoch 2 changed state of
> replica 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from
> NonExistentReplica to NewReplica (state.change.logger)
>
> [2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed partition
> [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NewPartition to
> OnlinePartition with leader 0 (state.change.logger)
>
> [2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 sending become-leader
> LeaderAndIsr request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with
> correlationId 16 to broker 0 for partition
> [09b1ebac-7036-49fc-aa61-7852808ca241,0] (state.change.logger)
>
> [2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 sending
> UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2)
> with correlationId 16 to broker 0 for partition
> [09b1ebac-7036-49fc-aa61-7852808ca241,0] (state.change.logger)
>
> [2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed state of
> replica 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from
> NewReplica to OnlineReplica (state.change.logger)
>
>
>
>
> server.log
>
> [2015-01-06 22:01:48,137] WARN [KafkaApi-0] Offset request with
> correlation id 0 from client
> console-consumer-34042-ConsumerFetcherThread-console-consumer-34042_dev.unifina-1420581705389-b3ba0eb6-0-0
> on partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] failed due to
> Partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] doesn't exist on 0
> (kafka.server.KafkaApis)
>
> [2015-01-06 22:01:48,140] INFO Closing socket connection to /192.168.10.21.
> (kafka.network.Processor)
>
> ... etc etc
>
>
> describe topic:
>
> $ bin/kafka-topics.sh --zookeeper dev.unifina:2181 --describe --topic
> 09b1ebac-7036-49fc-aa61-7852808ca241
> Topic:09b1ebac-7036-49fc-aa61-7852808ca241      PartitionCount:1
> ReplicationFactor:1     Configs:
>         Topic: 09b1ebac-7036-49fc-aa61-7852808ca241     Partition: 0
> Leader: 0       Replicas: 0     Isr: 0
>
>
> attempt to consume from the topic using the console consumer:
>
> [2015-01-06 22:01:47,928] WARN
> [console-consumer-34042_dev.unifina-1420581705389-b3ba0eb6-leader-finder-thread],
> Failed to add leader for partitions
> [09b1ebac-7036-49fc-aa61-7852808ca241,0]; will retry
> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> kafka.common.UnknownTopicOrPartitionException
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>         at java.lang.Class.newInstance0(Class.java:372)
>         at java.lang.Class.newInstance(Class.java:325)
>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>         at
> kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:168)
>         at
> kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
>         at
> kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:180)
>         at
> kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:175)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:175)
>         at
> kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
>         at
> kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
>         at
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
>
> Thanks for your help!
>
> Best regards
> Henri
>
>
