Ok, finally, I found the problem: in /etc/hosts, I had set the hostname as
127.0.0.1 ip-10-0-0-108
It seems the broker cannot use its internal hostname with that mapping, and
that caused the connection to close.
Thanks, guys. If I run into any further issues, I will let you know.
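For reference, a sketch of the corrected mapping (the IP for the hostname is an example based on the naming scheme above; the key point is that the machine's own hostname must not resolve to the loopback address, or other brokers end up being told to connect to "localhost"):

```
# /etc/hosts
# 127.0.0.1 ip-10-0-0-108      <- wrong: the broker registers an unreachable address
127.0.0.1    localhost
10.0.0.108   ip-10-0-0-108     # resolve the internal hostname to the internal IP
```

With 0.8-era brokers, setting host.name in config/server-x.properties to an address the other brokers can reach is another way to sidestep hostname resolution.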


2013/10/11 Jiang Jacky <jiang0...@gmail.com>

> Hi, since I changed the port to 9093, it seems to work: I can produce a
> message, and another node can consume it.
> But there is still an exception:
>
> [2013-10-11 05:32:42,706] ERROR [KafkaApi-2] Error while fetching metadata
> for partition [my-replicated-topic,0] (kafka.server.KafkaApis)
> kafka.common.ReplicaNotAvailableException
>         at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:471)
>         at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:456)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>         at scala.collection.immutable.List.foreach(List.scala:45)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.List.map(List.scala:45)
>         at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:456)
>         at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:452)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.Set$Set1.map(Set.scala:68)
>         at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:452)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>         at java.lang.Thread.run(Thread.java:724)
> [2013-10-11 05:32:42,710] INFO Closing socket connection to /10.0.0.42. (kafka.network.Processor)
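A `ReplicaNotAvailableException` on a metadata request is often transient: the replica has not finished re-registering after a restart, and clients typically just retry the metadata fetch after a short backoff. A minimal sketch of that retry loop in Python (the exception class and the `fetch` callable are stand-ins for the Kafka client internals, not a real API):

```python
import time

class ReplicaNotAvailableError(Exception):
    """Stand-in for kafka.common.ReplicaNotAvailableException."""

def fetch_metadata_with_retry(fetch, retries=3, backoff_ms=100):
    """Call fetch() until it returns, retrying transient replica errors.

    fetch      -- zero-argument callable returning topic metadata
    retries    -- total attempts before giving up
    backoff_ms -- sleep between attempts
    """
    for attempt in range(retries):
        try:
            return fetch()
        except ReplicaNotAvailableError:
            if attempt == retries - 1:
                raise  # out of attempts: surface the error to the caller
            time.sleep(backoff_ms / 1000.0)

# Example: metadata becomes available on the third attempt.
calls = {"n": 0}
def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ReplicaNotAvailableError("replica still registering")
    return {"my-replicated-topic": {0: {"leader": 1}}}

metadata = fetch_metadata_with_retry(flaky_fetch, backoff_ms=1)
```

If the error persists across many retries, as in this thread, the cause is usually not timing but addressing: the broker registered a host other machines cannot resolve.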
>
>
> 2013/10/11 Jiang Jacky <jiang0...@gmail.com>
>
>> no, it is not reproducible. Thank you.
>>
>>
>> 2013/10/11 Jun Rao <jun...@gmail.com>
>>
>>> I meant the broker crash. Is that reproducible?
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>
>>> On Thu, Oct 10, 2013 at 10:10 PM, Jiang Jacky <jiang0...@gmail.com>
>>> wrote:
>>>
>>> > Yes, every time I send a message, it throws that error.
>>> >
>>> >
>>> > 2013/10/11 Jun Rao <jun...@gmail.com>
>>> >
>>> > > Is that reproducible?
>>> > >
>>> > > Thanks,
>>> > >
>>> > > Jun
>>> > >
>>> > >
>>> > > On Thu, Oct 10, 2013 at 9:54 PM, Jiang Jacky <jiang0...@gmail.com>
>>> > wrote:
>>> > >
>>> > > > Yes, it just says "INFO Reconnect due to socket error".
>>> > > > But why does that happen? My ZooKeeper and Storm have no problem
>>> > > > collaborating with each other.
>>> > > >
>>> > > >
>>> > > > 2013/10/11 Jun Rao <jun...@gmail.com>
>>> > > >
>>> > > > > The log you posted for the second broker didn't say why it
>>> > > > > crashed. Is that all you got?
>>> > > > >
>>> > > > > Thanks,
>>> > > > >
>>> > > > > Jun
>>> > > > >
>>> > > > >
>>> > > > > On Thu, Oct 10, 2013 at 9:22 PM, Jiang Jacky <jiang0...@gmail.com>
>>> > > > > wrote:
>>> > > > >
>>> > > > > > *Hi, guys,*
>>> > > > > > *I am currently running into a Kafka server issue.*
>>> > > > > > *I have a 5-node cluster, and ZooKeeper is running without any problem. I boot each node manually with the "JMX_PORT=9997 bin/kafka-server-start.sh config/server-x.properties &" command.*
>>> > > > > >
>>> > > > > > *The scenario is:*
>>> > > > > > *The first node boots fine.*
>>> > > > > > *Once I boot the second node, it crashes. The error is below:*
>>> > > > > >
>>> > > > > > [2013-10-11 04:02:17,200] INFO [Replica Manager on Broker 0]: Handling LeaderAndIsr request Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:5;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:30411),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092 (kafka.server.ReplicaManager)
>>> > > > > > [2013-10-11 04:02:17,204] WARN No previously checkpointed highwatermark value found for topic test-kafka partition 0. Returning 0 as the highwatermark (kafka.server.HighwaterMarkCheckpoint)
>>> > > > > > [2013-10-11 04:02:17,205] INFO [ReplicaFetcherManager on broker 0] Removing fetcher for partition [test-kafka,0] (kafka.server.ReplicaFetcherManager)
>>> > > > > > [2013-10-11 04:02:17,214] INFO [Kafka Log on Broker 0], Truncated log segment /tmp/kafka-logs/test-kafka-0/00000000000000000000.log to target offset 0 (kafka.log.Log)
>>> > > > > > [2013-10-11 04:02:17,235] INFO [ReplicaFetcherManager on broker 0] Adding fetcher for partition [test-kafka,0], initOffset 0 to broker 1 with fetcherId 0 (kafka.server.ReplicaFetcherManager)
>>> > > > > > [2013-10-11 04:02:17,236] INFO [Replica Manager on Broker 0]: Handled leader and isr request Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:5;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:90,ControllerEpoch:30411),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092 (kafka.server.ReplicaManager)
>>> > > > > > [2013-10-11 04:02:17,240] INFO [ReplicaFetcherThread-0-1], Starting (kafka.server.ReplicaFetcherThread)
>>> > > > > > [2013-10-11 04:02:17,266] INFO [Replica Manager on Broker 0]: Handling LeaderAndIsr request Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:6;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:91,ControllerEpoch:30416),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092 (kafka.server.ReplicaManager)
>>> > > > > > [2013-10-11 04:02:17,267] INFO [ReplicaFetcherManager on broker 0] Removing fetcher for partition [test-kafka,0] (kafka.server.ReplicaFetcherManager)
>>> > > > > > [2013-10-11 04:02:17,268] INFO [Kafka Log on Broker 0], Truncated log segment /tmp/kafka-logs/test-kafka-0/00000000000000000000.log to target offset 0 (kafka.log.Log)
>>> > > > > > [2013-10-11 04:02:17,268] INFO [ReplicaFetcherManager on broker 0] Adding fetcher for partition [test-kafka,0], initOffset 0 to broker 1 with fetcherId 0 (kafka.server.ReplicaFetcherManager)
>>> > > > > > [2013-10-11 04:02:17,269] INFO [Replica Manager on Broker 0]: Handled leader and isr request Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:30416;CorrelationId:6;ClientId:id_0-host_null-port_9092;PartitionState:(test-kafka,0) -> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:91,ControllerEpoch:30416),ReplicationFactor:1),AllReplicas:1);Leaders:id:1,host:localhost,port:9092 (kafka.server.ReplicaManager)
>>> > > > > > [2013-10-11 04:02:17,269] ERROR [Kafka Request Handler 0 on Broker 0], Exception when handling request (kafka.server.KafkaRequestHandler)
>>> > > > > > [2013-10-11 04:02:47,284] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
>>> > > > > > java.net.SocketTimeoutException
>>> > > > > >         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
>>> > > > > >         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>>> > > > > >         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>>> > > > > >         at kafka.utils.Utils$.read(Utils.scala:394)
>>> > > > > >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>> > > > > >         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>> > > > > >         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>> > > > > >         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>> > > > > >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>>> > > > > >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>> > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
>>> > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
>>> > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
>>> > > > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>> > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>> > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
>>> > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
>>> > > > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>> > > > > >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
>>> > > > > >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>>> > > > > >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>>> > > > > >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>>> > > > > > [2013-10-11 04:02:47,292] ERROR [Kafka Request Handler 1 on Broker 0], Exception when handling request (kafka.server.KafkaRequestHandler)
>>> > > > > >
>>> > > > > > *Then I boot the third node until the last one, everything is
>>> good,
>>> > > > > except
>>> > > > > > of second node.*
>>> > > > > > *
>>> > > > > > *
>>> > > > > > *After that, I tried to stop server one by one, I first
>>> stopped the
>>> > > > > broken
>>> > > > > > node, then there is one of health node will show the same
>>> error as
>>> > > the
>>> > > > > > broken node, it is random. I stopped that broken node again,
>>> then
>>> > > there
>>> > > > > > will be another random node will be broken with the same
>>> error.*
>>> > > > > > *
>>> > > > > > *
>>> > > > > > *
>>> > > > > > *
>>> > > > > > *When I tried to produce message, it gives me the below
>>> errors:*
>>> > > > > >
>>> > > > > >
>>> > > > > > [2013-10-11 04:13:12,876] INFO Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 15 for 1 topic(s) Set(my-replicated-topic) (kafka.client.ClientUtils$)
>>> > > > > > [2013-10-11 04:13:12,876] INFO Connected to localhost:9092 for producing (kafka.producer.SyncProducer)
>>> > > > > > [2013-10-11 04:13:12,886] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
>>> > > > > > [2013-10-11 04:13:12,886] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
>>> > > > > > [2013-10-11 04:13:12,887] WARN Error while fetching metadata [{TopicMetadata for topic my-replicated-topic -> No partition metadata for topic my-replicated-topic due to kafka.common.LeaderNotAvailableException}] for topic [my-replicated-topic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
>>> > > > > > [2013-10-11 04:13:12,887] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: my-replicated-topic (kafka.producer.async.DefaultEventHandler)
>>> > > > > > [2013-10-11 04:13:12,887] INFO Back off for 100 ms before retrying send. Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
>>> > > > > > [2013-10-11 04:13:12,988] INFO Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 16 for 1 topic(s) Set(my-replicated-topic) (kafka.client.ClientUtils$)
>>> > > > > > [2013-10-11 04:13:12,989] INFO Connected to localhost:9092 for producing (kafka.producer.SyncProducer)
>>> > > > > > [2013-10-11 04:13:12,999] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
>>> > > > > > [2013-10-11 04:13:12,999] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
>>> > > > > > [2013-10-11 04:13:13,000] WARN Error while fetching metadata [{TopicMetadata for topic my-replicated-topic -> No partition metadata for topic my-replicated-topic due to kafka.common.LeaderNotAvailableException}] for topic [my-replicated-topic]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
>>> > > > > > [2013-10-11 04:13:13,000] ERROR Failed to send requests for topics my-replicated-topic with correlation ids in [9,16] (kafka.producer.async.DefaultEventHandler)
>>> > > > > > [2013-10-11 04:13:13,001] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
>>> > > > > > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
>>> > > > > >         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>>> > > > > >         at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>>> > > > > >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>>> > > > > >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>>> > > > > >         at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>>> > > > > >         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>>> > > > > >         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
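The FailedToSendMessageException above is the old producer giving up: it makes message.send.max.retries attempts (3 by default), sleeping retry.backoff.ms (100 ms by default) between them, then throws. Here no retry could ever succeed, because metadata kept naming localhost as the leader. The control flow, sketched in Python (`send_once` is a hypothetical stand-in for a single produce attempt, not a Kafka API):

```python
import time

class FailedToSendMessageError(Exception):
    """Stand-in for kafka.common.FailedToSendMessageException."""

def send_with_retries(send_once, max_retries=3, backoff_ms=100):
    """Mimic the retry loop in kafka.producer.async.DefaultEventHandler:
    attempt the send up to max_retries times, backing off between tries,
    then give up with FailedToSendMessageError."""
    for attempt in range(max_retries):
        try:
            send_once()
            return attempt + 1          # number of attempts used
        except OSError:                 # one failed produce attempt
            if attempt < max_retries - 1:
                time.sleep(backoff_ms / 1000.0)
    raise FailedToSendMessageError(
        "Failed to send messages after %d tries." % max_retries)
```

The takeaway: retries only help when the failure is transient; a wrong advertised address fails identically on every attempt.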
>>> > > > > >
>>> > > > > > *I configured everything according to the documentation.*
>>> > > > > > *I copied the settings from one of my nodes:*
>>> > > > > >
>>> > > > > > broker.id=3
>>> > > > > >
>>> > > > > > ############################# Socket Server Settings
>>> > > > > > #############################
>>> > > > > >
>>> > > > > > port=9092
>>> > > > > >
>>> > > > > >
>>> > > > > > num.network.threads=2
>>> > > > > >
>>> > > > > > num.io.threads=2
>>> > > > > >
>>> > > > > > socket.send.buffer.bytes=1048576
>>> > > > > >
>>> > > > > > socket.receive.buffer.bytes=1048576
>>> > > > > >
>>> > > > > > socket.request.max.bytes=104857600
>>> > > > > >
>>> > > > > > log.dir=/tmp/kafka-logs
>>> > > > > >
>>> > > > > > num.partitions=1
>>> > > > > >
>>> > > > > > log.flush.interval.messages=10000
>>> > > > > >
>>> > > > > > log.flush.interval.ms=1000
>>> > > > > >
>>> > > > > > log.retention.hours=168
>>> > > > > >
>>> > > > > > log.segment.bytes=536870912
>>> > > > > >
>>> > > > > >
>>> > > > > > log.cleanup.interval.mins=1
>>> > > > > > zookeeper.connect=localhost:2181
>>> > > > > >
>>> > > > > >
>>> > > > > > zookeeper.connection.timeout.ms=1000000
>>> > > > > >
>>> > > > > >
>>> > > > > > kafka.metrics.polling.interval.secs=5
>>> > > > > > kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
>>> > > > > > kafka.csv.metrics.dir=/tmp/kafka_metrics
>>> > > > > >
>>> > > > > > kafka.csv.metrics.reporter.enabled=false
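One thing visible in both this config and the logs above: the leader is registered as host:localhost (and zookeeper.connect points at each node's own localhost), which other machines cannot use. A hedged per-node sketch of the settings involved (hostnames here are examples, not taken from this thread):

```
# config/server-3.properties (per-node values; example hostnames)
broker.id=3                      # must be unique on every broker
host.name=ip-10-0-0-108          # an address the OTHER brokers and clients can reach
zookeeper.connect=zk-host:2181   # the shared ZooKeeper ensemble, not each node's own localhost
```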
>>> > > > > >
>>> > > > > > *Can someone tell me what happened? I'd appreciate any help!*
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>
