Make sure you have host.name in you server properties setup right. I usually give it the ec2 DNS name. Another case where that helped: https://discuss.elastic.co/t/logstash-kafka-output- <https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson> plugins- <https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson> not-working-on-windows <https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson> /25253/3?u= <https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson> joe <https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson> _lawson <https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson> and here: https://discuss.elastic.co/t/logstash-1-5-3-is-not-able-to-connect-to-kafka/27611/5?u=joe_lawson On Sep 26, 2015 1:38 AM, "Shrikant Patel" <spa...@pdxinc.com> wrote:
> I have Kafka 2.10-0.8.2.1 and zookeeper installed on the AWS EC2. The > instance is working fine. > > [kafka@ip-xx-xx-xx-xx bin]$ ./kafka-topics.sh --topic topic1 --zookeeper > localhost:2181 --describe > Topic:topic1 PartitionCount:1 ReplicationFactor:1 Configs: > Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 > Isr: 0 > > When producer running on my local machine (remote) tries to publish to > topic1 it fails with exception below. It does create the topic1 (If topic1 > didn't exist), Also on kakfa log I do see that producer was able to connect > to kafka server. > [2015-09-26 01:24:26,874] INFO Closing socket connection to > /X0X.23X.X4X.X3X. (kafka.network.Processor). These indicate the local > producer is able to connect to kafka instance on AWS. > > If I run producer on Kafka server itself, its able to publish to topic1 > without any issue. I am not sure I understand what the issue with local > producer. Any pointer will be helpful??? > > C:\JAVA_INSTALLATION\kafka\kafka_2.10-0.8.2.1>bin\windows\kafka-console-producer.bat > --broker-list public-ip-address-of-kafka-ec2:9092 --topic topic1 > --queue-enqueuetimeout-ms 10000 --request-timeout-ms 10000 > [2015-09-25 23:08:00,146] WARN Property topic is not valid > (kafka.utils.VerifiableProperties) > 12 > [2015-09-25 23:08:02,977] WARN Failed to send producer request with > correlation id 2 to broker 0 with data for partitions [topic1,0] > (kafka.producer.async.DefaultEventHandler) > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.producer.SyncProducer.send(SyncProducer.scala:101) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) > at > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) > at > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > [2015-09-25 23:08:03,653] WARN Failed to send producer request with > correlation id 5 to broker 0 with data for partitions [topic1,0] > (kafka.producer.async.DefaultEventHandler) > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.producer.SyncProducer.send(SyncProducer.scala:101) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) > at > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) > at > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > [2015-09-25 23:08:04,296] WARN Failed to send producer request with > correlation id 8 to broker 0 with data for partitions [topic1,0] > (kafka.producer.async.DefaultEventHandler) > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.producer.SyncProducer.send(SyncProducer.scala:101) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) > at > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) > at > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > [2015-09-25 23:08:04,954] WARN Failed to send producer request with > correlation id 11 to broker 0 with data for partitions [topic1,0] > (kafka.producer.async.DefaultEventHandler) > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.producer.SyncProducer.send(SyncProducer.scala:101) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) > at > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) > at > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > [2015-09-25 23:08:05,597] ERROR Failed to send requests for topics topic1 > with correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler) > [2015-09-25 23:08:05,597] ERROR Error in handling batch of 1 events > (kafka.producer.async.ProducerSendThread) > kafka.common.FailedToSendMessageException: Failed to send messages after 3 > tries. > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > > [2015-09-25 23:08:04,954] WARN Failed to send producer request with > correlation id 11 to broker 0 with data for partitions [topic1,0] > (kafka.producer.async.DefaultEventHandler) > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) > at > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) > at > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.producer.SyncProducer.send(SyncProducer.scala:101) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) > at > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) > at > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > [2015-09-25 23:08:05,597] ERROR Failed to send requests for topics topic1 > with correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler) > [2015-09-25 23:08:05,597] ERROR Error in handling batch of 1 events > (kafka.producer.async.ProducerSendThread) > kafka.common.FailedToSendMessageException: Failed to send messages after 3 > tries. > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > > Thanks, > Shri > > > ________________________________ > This message and its contents (to include attachments) are the property of > National Health Systems, Inc. and may contain confidential and proprietary > information. This email and any files transmitted with it are intended > solely for the use of the individual or entity to whom they are addressed. > You are hereby notified that any unauthorized disclosure, copying, or > distribution of this message, or the taking of any unauthorized action > based on information contained herein is strictly prohibited. Unauthorized > use of information contained herein may subject you to civil and criminal > prosecution and penalties. If you are not the intended recipient, you > should delete this message immediately and notify the sender immediately by > telephone or by replying to this transmission. >