I have Kafka 2.10-0.8.2.1 and ZooKeeper installed on an AWS EC2 instance. The
instance is working fine.

[kafka@ip-xx-xx-xx-xx bin]$ ./kafka-topics.sh --topic topic1 --zookeeper 
localhost:2181 --describe
Topic:topic1    PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0     Isr: 0

When a producer running on my local machine (remote from the broker) tries to
publish to topic1, it fails with the exception below. It does create topic1 if
the topic doesn't already exist. In the Kafka log I also see that the producer
was able to connect to the Kafka server:
[2015-09-26 01:24:26,874] INFO Closing socket connection to /X0X.23X.X4X.X3X. 
(kafka.network.Processor)
This indicates that the local producer is able to connect to the Kafka instance
on AWS.

If I run the producer on the Kafka server itself, it is able to publish to
topic1 without any issue. I am not sure what the issue is with the local
producer. Any pointers would be helpful.
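
One way to narrow this down is to check, from the local machine, which
host:port pairs actually accept a TCP connection. In Kafka 0.8.x the producer
first asks the --broker-list address for metadata and then sends data to
whatever host the broker registered in ZooKeeper, so the two addresses can
differ. The sketch below is only a reachability probe under that assumption.
The function names are hypothetical, and the host the broker registered is not
shown in the output above, so it would have to be looked up separately.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host name and opens the socket;
        # DNS failures, refusals and timeouts all surface as OSError.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_endpoints(endpoints, timeout=3.0):
    """Probe each (host, port) pair and return a dict of pair -> reachable."""
    return {(host, port): can_connect(host, port, timeout)
            for host, port in endpoints}
```

Calling check_endpoints with the public address used in --broker-list and with
the host name the broker registered (both on port 9092) shows whether only the
first is reachable from outside EC2. If so, the broker's advertised.host.name
setting in server.properties is a plausible place to look, although nothing in
the logs above confirms that this is the cause.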

C:\JAVA_INSTALLATION\kafka\kafka_2.10-0.8.2.1>bin\windows\kafka-console-producer.bat
 --broker-list public-ip-address-of-kafka-ec2:9092 --topic topic1 
--queue-enqueuetimeout-ms 10000 --request-timeout-ms 10000
[2015-09-25 23:08:00,146] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
12
[2015-09-25 23:08:02,977] WARN Failed to send producer request with correlation 
id 2 to broker 0 with data for partitions [topic1,0] 
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
       at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:03,653] WARN Failed to send producer request with correlation 
id 5 to broker 0 with data for partitions [topic1,0] 
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:04,296] WARN Failed to send producer request with correlation 
id 8 to broker 0 with data for partitions [topic1,0] 
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:04,954] WARN Failed to send producer request with correlation 
id 11 to broker 0 with data for partitions [topic1,0] 
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:05,597] ERROR Failed to send requests for topics topic1 with 
correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler)
[2015-09-25 23:08:05,597] ERROR Error in handling batch of 1 events 
(kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 
tries.
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)


Thanks,
Shri

