OK, I finally found the problem. In /etc/hosts, I had mapped the machine's hostname to the loopback address:

127.0.0.1 ip-10-0-0-108

It seems the broker cannot use its internal hostname when it is set up that way, and this caused the connections to close. Thanks, guys. If I encounter any further issues, I will let you know.
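For anyone who wants to check their own nodes for the same kind of entry, here is a minimal sketch in Python. It is not part of Kafka; the function name `loopback_hostnames` and the sample hosts content are made up for illustration. It scans /etc/hosts-style text and reports any name other than the usual localhost aliases that is mapped to a loopback address.

```python
import ipaddress

# Names that are normally expected to point at loopback, so they are not reported.
EXPECTED_LOOPBACK_NAMES = {
    "localhost",
    "localhost.localdomain",
    "ip6-localhost",
    "ip6-loopback",
}


def loopback_hostnames(hosts_text):
    """Return hostnames in /etc/hosts-style text that map to a loopback address."""
    flagged = []
    for raw_line in hosts_text.splitlines():
        line = raw_line.split("#", 1)[0].strip()  # drop comments and whitespace
        if not line:
            continue
        fields = line.split()
        address, names = fields[0], fields[1:]
        try:
            is_loopback = ipaddress.ip_address(address).is_loopback
        except ValueError:
            continue  # first field is not a plain IP address; skip the line
        if is_loopback:
            flagged.extend(n for n in names if n not in EXPECTED_LOOPBACK_NAMES)
    return flagged


if __name__ == "__main__":
    # Hypothetical sample that mirrors the entry described above.
    sample = "127.0.0.1 localhost\n127.0.0.1 ip-10-0-0-108\n"
    print(loopback_hostnames(sample))
```

To run it against a real machine, pass the contents of /etc/hosts instead of the sample, for example `loopback_hostnames(open("/etc/hosts").read())`. An empty list only means that no such entry was found in that file; it does not rule out other name-resolution problems.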
2013/10/11 Jiang Jacky <jiang0...@gmail.com>

> Hi, Since I changed the port to 9093, it seems works, I can produce
> message, and another node can consume that.
> But there is still exception
>
> [2013-10-11 05:32:42,706] ERROR [KafkaApi-2] Error while fetching metadata for partition [my-replicated-topic,0] (kafka.server.KafkaApis)
> kafka.common.ReplicaNotAvailableException
>     at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:471)
>     at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:456)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>     at scala.collection.immutable.List.foreach(List.scala:45)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>     at scala.collection.immutable.List.map(List.scala:45)
>     at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:456)
>     at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:452)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>     at scala.collection.immutable.Set$Set1.map(Set.scala:68)
>     at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:452)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>     at java.lang.Thread.run(Thread.java:724)
> [2013-10-11 05:32:42,710] INFO Closing socket connection to /10.0.0.42. (kafka.network.Processor)
>
> 2013/10/11 Jiang Jacky <jiang0...@gmail.com>
>
>> no, it is not reproducible. Thank you.
>>
>> 2013/10/11 Jun Rao <jun...@gmail.com>
>>
>>> I meant the broker crash. Is that reproducible?
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Thu, Oct 10, 2013 at 10:10 PM, Jiang Jacky <jiang0...@gmail.com> wrote:
>>>
>>> > Yes, everytime, I send message, it throws that error.
>>> >
>>> > 2013/10/11 Jun Rao <jun...@gmail.com>
>>> >
>>> > > Is that reproducible?
>>> > >
>>> > > Thanks,
>>> > >
>>> > > Jun
>>> > >
>>> > > On Thu, Oct 10, 2013 at 9:54 PM, Jiang Jacky <jiang0...@gmail.com> wrote:
>>> > >
>>> > > > Yes, it just says "INFO Reconnect due to socket error"
>>> > > > But why and how come it comes? my zookeeper and storm have no any problem to collaborate each other.
>>> > > >
>>> > > > 2013/10/11 Jun Rao <jun...@gmail.com>
>>> > > >
>>> > > > > The log you posted for the second broker didn't say why it crashed. Is that all you got?
>>> > > > >
>>> > > > > Thanks,
>>> > > > >
>>> > > > > Jun
>>> > > > >
>>> > > > > On Thu, Oct 10, 2013 at 9:22 PM, Jiang Jacky <jiang0...@gmail.com> wrote:
>>> > > > >
>>> > > > > > *Hi, Guys,*
>>> > > > > > *I am currently running into the kafka server issue. *
>>> > > > > > *I have a 5 nodes cluster and zookeeper running without any problem. when I manually boot each node by using* "*JMX_PORT=9997 bin/kafka-server-start.sh config/server-x.properties &*" command.
>>> > > > > > >>> > > > > > *The scenario is:* >>> > > > > > *Then, first node, it can be booted.* >>> > > > > > *Once I boot the second node, it is crashed, the error is >>> below:* >>> > > > > > >>> > > > > > [2013-10-11 04:02:17,200] INFO [Replica Manager on Broker 0]: >>> > > Handling >>> > > > > > LeaderAndIsr request >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:5;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0) >>> > > > > > -> >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:30411),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092 >>> > > > > > (kafka.server.ReplicaManager) >>> > > > > > [2013-10-11 04:02:17,204] WARN No previously checkpointed >>> > > highwatermark >>> > > > > > value found for topic test-kafka partition 0. Returning 0 as >>> the >>> > > > > > highwatermark (kafka.server.HighwaterMarkCheckpoint) >>> > > > > > [2013-10-11 04:02:17,205] INFO [ReplicaFetcherManager on >>> broker 0] >>> > > > > Removing >>> > > > > > fetcher for partition [test-kafka,0] >>> > > > (kafka.server.ReplicaFetcherManager) >>> > > > > > [2013-10-11 04:02:17,214] INFO [Kafka Log on Broker 0], >>> Truncated >>> > log >>> > > > > > segment /tmp/kafka-logs/test-kafka-0/00000000000000000000.log >>> to >>> > > target >>> > > > > > offset 0 (kafka.log.Log) >>> > > > > > [2013-10-11 04:02:17,235] INFO [ReplicaFetcherManager on >>> broker 0] >>> > > > Adding >>> > > > > > fetcher for partition [test-kafka,0], initOffset 0 to broker 1 >>> with >>> > > > > > fetcherId 0 (kafka.server.ReplicaFetcherManager) >>> > > > > > [2013-10-11 04:02:17,236] INFO [Replica Manager on Broker 0]: >>> > Handled >>> > > > > > leader and isr request >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> 
Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:5;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0) >>> > > > > > -> >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:30411),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092 >>> > > > > > (kafka.server.ReplicaManager) >>> > > > > > [2013-10-11 04:02:17,240] INFO [ReplicaFetcherThread-0-1], >>> Starting >>> > > > > > (kafka.server.ReplicaFetcherThread) >>> > > > > > [2013-10-11 04:02:17,266] INFO [Replica Manager on Broker 0]: >>> > > Handling >>> > > > > > LeaderAndIsr request >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:6;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0) >>> > > > > > -> >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:91,ControllerEpoch:30416),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092 >>> > > > > > (kafka.server.ReplicaManager) >>> > > > > > [2013-10-11 04:02:17,267] INFO [ReplicaFetcherManager on >>> broker 0] >>> > > > > Removing >>> > > > > > fetcher for partition [test-kafka,0] >>> > > > (kafka.server.ReplicaFetcherManager) >>> > > > > > [2013-10-11 04:02:17,268] INFO [Kafka Log on Broker 0], >>> Truncated >>> > log >>> > > > > > segment /tmp/kafka-logs/test-kafka-0/00000000000000000000.log >>> to >>> > > target >>> > > > > > offset 0 (kafka.log.Log) >>> > > > > > [2013-10-11 04:02:17,268] INFO [ReplicaFetcherManager on >>> broker 0] >>> > > > Adding >>> > > > > > fetcher for partition [test-kafka,0], initOffset 0 to broker 1 >>> with >>> > > > > > fetcherId 0 (kafka.server.ReplicaFetcherManager) >>> > > > > > [2013-10-11 04:02:17,269] INFO [Replica Manager on Broker 0]: >>> > Handled >>> > > > > > leader and isr request >>> 
> > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:6;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0) >>> > > > > > -> >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:91,ControllerEpoch:30416),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092 >>> > > > > > (kafka.server.ReplicaManager) >>> > > > > > [2013-10-11 04:02:17,269] ERROR [Kafka Request Handler 0 on >>> Broker >>> > > 0], >>> > > > > > Exception when handling request >>> (kafka.server.KafkaRequestHandler) >>> > > > > > [2013-10-11 04:02:47,284] INFO Reconnect due to socket error: >>> > > > > > (kafka.consumer.SimpleConsumer) >>> > > > > > java.net.SocketTimeoutException >>> > > > > > at >>> > > > > > >>> > > >>> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229) >>> > > > > > at >>> > > > > sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) >>> > > > > > at >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) >>> > > > > > at kafka.utils.Utils$.read(Utils.scala:394) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) >>> > > > > > at >>> > > > > > >>> kafka.network.Receive$class.readCompletely(Transmission.scala:56) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) >>> > > > > > at >>> > > > > kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) >>> > > > > > at >>> > > > > > >>> > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110) >>> > > > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109) >>> > > > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) >>> > > > > > at >>> > > > kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) >>> > > > > > at >>> > > > > > >>> > > > >>> > >>> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) >>> > > > > > at >>> > > > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) >>> > > > > > [2013-10-11 04:02:47,292] ERROR [Kafka Request Handler 1 on >>> Broker >>> > > 0], >>> > > > > > Exception when handling request >>> (kafka.server.KafkaRequestHandler) >>> > > > > > >>> > > > > > *Then 
I boot the third node until the last one, everything is >>> good, >>> > > > > except >>> > > > > > of second node.* >>> > > > > > * >>> > > > > > * >>> > > > > > *After that, I tried to stop server one by one, I first >>> stopped the >>> > > > > broken >>> > > > > > node, then there is one of health node will show the same >>> error as >>> > > the >>> > > > > > broken node, it is random. I stopped that broken node again, >>> then >>> > > there >>> > > > > > will be another random node will be broken with the same >>> error.* >>> > > > > > * >>> > > > > > * >>> > > > > > * >>> > > > > > * >>> > > > > > *When I tried to produce message, it gives me the below >>> errors:* >>> > > > > > >>> > > > > > >>> > > > > > [2013-10-11 04:13:12,876] INFO Fetching metadata from broker >>> > > > > > id:0,host:localhost,port:9092 with correlation id 15 for 1 >>> topic(s) >>> > > > > > Set(my-replicated-topic) (kafka.client.ClientUtils$) >>> > > > > > [2013-10-11 04:13:12,876] INFO Connected to localhost:9092 for >>> > > > producing >>> > > > > > (kafka.producer.SyncProducer) >>> > > > > > [2013-10-11 04:13:12,886] INFO Disconnecting from >>> localhost:9092 >>> > > > > > (kafka.producer.SyncProducer) >>> > > > > > [2013-10-11 04:13:12,886] INFO Closing socket connection to / >>> > > 127.0.0.1 >>> > > > . 
>>> > > > > > (kafka.network.Processor) >>> > > > > > [2013-10-11 04:13:12,887] WARN Error while fetching metadata >>> > > > > > [{TopicMetadata for topic my-replicated-topic -> >>> > > > > > No partition metadata for topic my-replicated-topic due to >>> > > > > > kafka.common.LeaderNotAvailableException}] for topic >>> > > > > [my-replicated-topic]: >>> > > > > > class kafka.common.LeaderNotAvailableException >>> > > > > > (kafka.producer.BrokerPartitionInfo) >>> > > > > > [2013-10-11 04:13:12,887] ERROR Failed to collate messages by >>> > topic, >>> > > > > > partition due to: Failed to fetch topic metadata for topic: >>> > > > > > my-replicated-topic (kafka.producer.async.DefaultEventHandler) >>> > > > > > [2013-10-11 04:13:12,887] INFO Back off for 100 ms before >>> retrying >>> > > > send. >>> > > > > > Remaining retries = 0 >>> (kafka.producer.async.DefaultEventHandler) >>> > > > > > [2013-10-11 04:13:12,988] INFO Fetching metadata from broker >>> > > > > > id:0,host:localhost,port:9092 with correlation id 16 for 1 >>> topic(s) >>> > > > > > Set(my-replicated-topic) (kafka.client.ClientUtils$) >>> > > > > > [2013-10-11 04:13:12,989] INFO Connected to localhost:9092 for >>> > > > producing >>> > > > > > (kafka.producer.SyncProducer) >>> > > > > > [2013-10-11 04:13:12,999] INFO Disconnecting from >>> localhost:9092 >>> > > > > > (kafka.producer.SyncProducer) >>> > > > > > [2013-10-11 04:13:12,999] INFO Closing socket connection to / >>> > > 127.0.0.1 >>> > > > . 
>>> > > > > > (kafka.network.Processor) >>> > > > > > [2013-10-11 04:13:13,000] WARN Error while fetching metadata >>> > > > > > [{TopicMetadata for topic my-replicated-topic -> >>> > > > > > No partition metadata for topic my-replicated-topic due to >>> > > > > > kafka.common.LeaderNotAvailableException}] for topic >>> > > > > [my-replicated-topic]: >>> > > > > > class kafka.common.LeaderNotAvailableException >>> > > > > > (kafka.producer.BrokerPartitionInfo) >>> > > > > > [2013-10-11 04:13:13,000] ERROR Failed to send requests for >>> topics >>> > > > > > my-replicated-topic with correlation ids in [9,16] >>> > > > > > (kafka.producer.async.DefaultEventHandler) >>> > > > > > [2013-10-11 04:13:13,001] ERROR Error in handling batch of 1 >>> events >>> > > > > > (kafka.producer.async.ProducerSendThread) >>> > > > > > kafka.common.FailedToSendMessageException: Failed to send >>> messages >>> > > > after >>> > > > > 3 >>> > > > > > tries. >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) >>> > > > > > at >>> > > scala.collection.immutable.Stream.foreach(Stream.scala:254) >>> > > > > > at >>> > > > > > >>> > > > > > >>> > > > > >>> > > > >>> > > >>> > >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) >>> > > > > > at >>> > > > > > >>> > > > >>> > >>> 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>>> > > > > >
>>> > > > > > *I configured everything according to the documents.*
>>> > > > > > *I copied the setting from one of my nodes*
>>> > > > > >
>>> > > > > > broker.id=3
>>> > > > > >
>>> > > > > > ############################# Socket Server Settings #############################
>>> > > > > >
>>> > > > > > port=9092
>>> > > > > > num.network.threads=2
>>> > > > > > num.io.threads=2
>>> > > > > > socket.send.buffer.bytes=1048576
>>> > > > > > socket.receive.buffer.bytes=1048576
>>> > > > > > socket.request.max.bytes=104857600
>>> > > > > > log.dir=/tmp/kafka-logs
>>> > > > > > num.partitions=1
>>> > > > > > log.flush.interval.messages=10000
>>> > > > > > log.flush.interval.ms=1000
>>> > > > > > log.retention.hours=168
>>> > > > > > log.segment.bytes=536870912
>>> > > > > > log.cleanup.interval.mins=1
>>> > > > > > zookeeper.connect=localhost:2181
>>> > > > > > zookeeper.connection.timeout.ms=1000000
>>> > > > > > kafka.metrics.polling.interval.secs=5
>>> > > > > > kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
>>> > > > > > kafka.csv.metrics.dir=/tmp/kafka_metrics
>>> > > > > > kafka.csv.metrics.reporter.enabled=false
>>> > > > > >
>>> > > > > > *Can some one tell me what happened? Appreciate!*