Hi,

I’m hitting a strange problem with Kafka 0.8.2-beta, running a single broker
on CentOS 6.5.

A fraction of my topic-creation attempts fail at random and leave the new
topic in an unusable state: any attempt to use it fails with “partition
doesn’t exist” errors, as seen in server.log below.

In controller.log, it looks like the controller fails to send either the
“become-leader LeaderAndIsr request” or the “UpdateMetadata request” to the
broker (which is in fact the same Kafka instance) because a socket read fails
for an unknown reason.
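
For reference, the EOFException in controller.log (“Received -1 when reading from channel”) is the standard end-of-stream signal: a Java NIO channel read returns -1 once the other end has closed the connection. The minimal Python sketch that follows reproduces the same condition over a local socket pair, with no network involved (Python’s `socket.recv` returns `b""` where Java returns -1). `read_exactly` and `demo` are illustrative helpers, not Kafka code.

```python
import socket


def read_exactly(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes, raising EOFError if the peer closes first.

    Illustrative only: mirrors the "Received -1 when reading from channel"
    check seen in the log, not Kafka's actual implementation.
    """
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:  # b"" means end-of-stream: the other side closed the socket
            raise EOFError("peer closed the connection before %d bytes arrived" % n)
        buf.extend(chunk)
    return bytes(buf)


def demo() -> str:
    # A connected pair of local sockets: no network path is involved.
    controller_side, broker_side = socket.socketpair()
    try:
        broker_side.close()  # the "broker" end goes away without replying
        read_exactly(controller_side, 4)  # wait for a 4-byte reply header
        return "got response"
    except EOFError as exc:
        return "EOF: " + str(exc)
    finally:
        controller_side.close()


if __name__ == "__main__":
    print(demo())
```

On Linux this prints `EOF: peer closed the connection before 4 bytes arrived`. It only shows that the socket was closed by its peer; it says nothing about why the broker side closed it.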

My questions:

(1) Is the bad topic state a result of the message not making it from the 
controller to the broker?

(2) Any idea why the socket read might randomly fail? It can’t be a network 
issue since we’re running a single instance.

(3) Shouldn’t the controller try to resend the message?
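
Question (3) amounts to asking for something like the resend loop sketched below: on a failed round trip, reconnect, back off briefly, and resend the same request until it is acknowledged or the thread is told to stop. This is a hypothetical illustration only; `FlakyChannel`, `send_with_retry`, and the backoff value are made-up names and numbers, not Kafka’s code or configuration.

```python
import threading
import time
from typing import Optional


class FlakyChannel:
    """Hypothetical stand-in for a broker connection that fails the first `failures` round trips."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.reconnects = 0

    def round_trip(self, request: str) -> str:
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("socket closed")
        return "ack:" + request

    def reconnect(self) -> None:
        self.reconnects += 1


def send_with_retry(channel: FlakyChannel, request: str, running: threading.Event,
                    backoff_s: float = 0.01) -> Optional[str]:
    """Resend `request` until a response arrives or `running` is cleared."""
    while running.is_set():
        try:
            return channel.round_trip(request)
        except ConnectionError:
            # Reconnect and resend the SAME request instead of dropping it.
            channel.reconnect()
            time.sleep(backoff_s)
    return None  # stopped before the request was acknowledged


if __name__ == "__main__":
    running = threading.Event()
    running.set()
    ch = FlakyChannel(failures=2)
    print(send_with_retry(ch, "UpdateMetadata", running), ch.reconnects)
```

Running it prints `ack:UpdateMetadata 2`. The `running` event is the stop condition: without one, a broker that never comes back would keep such a loop spinning forever.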



controller.log

[2015-01-06 21:31:10,304] INFO [Controller 0]: New topic creation callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.KafkaController)

[2015-01-06 21:31:10,304] INFO [Controller 0]: New partition creation callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.KafkaController)

[2015-01-06 21:31:10,304] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,308] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)

[2015-01-06 21:31:10,308] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,308] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] are: [List(0)] (kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,309] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,501] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)

[2015-01-06 21:31:10,502] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:dev.unifina,port:9092 (kafka.controller.RequestSendThread)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.Utils$.read(Utils.scala:381)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

[2015-01-06 21:31:10,505] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:16;ClientId:id_0-host_dev.unifina-port_9092;AliveBrokers:id:0,host:dev.unifina,port:9092;PartitionState:[09b1ebac-7036-49fc-aa61-7852808ca241,0] -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:dev.unifina,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)




state-change.log

[2015-01-06 21:31:10,306] TRACE Controller 0 epoch 2 changed partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] state from NonExistentPartition to NewPartition with assigned replicas 0 (state.change.logger)

[2015-01-06 21:31:10,308] TRACE Controller 0 epoch 2 changed state of replica 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NonExistentReplica to NewReplica (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NewPartition to OnlinePartition with leader 0 (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 sending become-leader LeaderAndIsr request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with correlationId 16 to broker 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with correlationId 16 to broker 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed state of replica 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NewReplica to OnlineReplica (state.change.logger)




server.log

[2015-01-06 22:01:48,137] WARN [KafkaApi-0] Offset request with correlation id 0 from client console-consumer-34042-ConsumerFetcherThread-console-consumer-34042_dev.unifina-1420581705389-b3ba0eb6-0-0 on partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] failed due to Partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] doesn't exist on 0 (kafka.server.KafkaApis)

[2015-01-06 22:01:48,140] INFO Closing socket connection to /192.168.10.21. (kafka.network.Processor)

... etc etc


describe topic:

$ bin/kafka-topics.sh --zookeeper dev.unifina:2181 --describe --topic 09b1ebac-7036-49fc-aa61-7852808ca241
Topic:09b1ebac-7036-49fc-aa61-7852808ca241      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: 09b1ebac-7036-49fc-aa61-7852808ca241     Partition: 0    Leader: 0       Replicas: 0     Isr: 0


attempt to consume from the topic using the console consumer:

[2015-01-06 22:01:47,928] WARN [console-consumer-34042_dev.unifina-1420581705389-b3ba0eb6-leader-finder-thread], Failed to add leader for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.UnknownTopicOrPartitionException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
        at java.lang.Class.newInstance0(Class.java:372)
        at java.lang.Class.newInstance(Class.java:325)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
        at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:168)
        at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
        at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:180)
        at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:175)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:175)
        at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
        at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


Thanks for your help!

Best regards
Henri
