Hi all,

I want to use the KafkaSpout(where is 
https://github.com/apache/incubator-storm/tree/master/external/storm-kafka) to 
read data from kafka. But my kafka and zookeeper cluster are builted on 
GCE(google cloud engine) and my storm testing is on local machine. The problem 
is I cannot set public IP by kafka config - server.properties to reach from my 
local(out of kafka's domain) storm machines. 
***My server.properties is as follows:
host.name=[Brokers's Private IP]
advertise.port=9092
advertise.host.name=[Brokers's Public IP]

***My KafkaSpout's config in Java is as follows:
BrokerHosts a = new ZkHosts("[IP1],[IP2],[IP3]","/brokers");
        SpoutConfig kafkaConf = new SpoutConfig(a, topic, "/stormkafka", 
topic);//Using [topic] as id
        kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        kafkaConf.zkServers = new ArrayList<String>() {
            {
                add("[IP1]");
                add("[IP1]");
                add("[IP1]");
            }
        };
        kafkaConf.zkPort = 2181;
        kafkaConf.forceFromStart = true;
ps. I have 3-node kafka cluster and 3-node zookeeper cluster on 3 machines. 
That's to say, Each machine with a broker and zookeeper node

======================================
So the problem is it seens not to connect from the domain different from 
kafka's domain, e.g.my local storm machine. This is a big problem if I need to 
communcate with kafka out of its domain. The error exception is as follows, and 
can anyone help me with this issue. How can I config it in a right way so that 
I can subscribe topic of kafka from my local machine(storm).
Thanks a lot :)



java.lang.RuntimeException: java.net.NoRouteToHostException: 找不到通往該主機的路由路徑
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:83) 
~[storm-kafka-0.8-plus-0.4.0.jar:na]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:45) 
~[storm-kafka-0.8-plus-0.4.0.jar:na]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:118) 
~[storm-kafka-0.8-plus-0.4.0.jar:na]
at 
backtype.storm.daemon.executor$eval5100$fn__5101$fn__5116$fn__5145.invoke(executor.clj:562)
 ~[na:na]
at backtype.storm.util$async_loop$fn__390.invoke(util.clj:433) ~[na:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]
Caused by: java.net.NoRouteToHostException: 找不到通往該主機的路由路徑
at sun.nio.ch.Net.connect0(Native Method) ~[na:1.7.0_60]
at sun.nio.ch.Net.connect(Net.java:465) ~[na:1.7.0_60]
at sun.nio.ch.Net.connect(Net.java:457) ~[na:1.7.0_60]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670) 
~[na:1.7.0_60]
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) 
~[kafka_2.10-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) 
~[kafka_2.10-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142) 
~[kafka_2.10-0.8.1.1.jar:na]
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
 ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124) 
~[kafka_2.10-0.8.1.1.jar:na]
at 
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) 
~[kafka_2.10-0.8.1.1.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:55) 
~[storm-kafka-0.8-plus-0.4.0.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:45) 
~[storm-kafka-0.8-plus-0.4.0.jar:na]
at storm.kafka.PartitionManager.<init>(PartitionManager.java:77) 
~[storm-kafka-0.8-plus-0.4.0.jar:na]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:78) 
~[storm-kafka-0.8-plus-0.4.0.jar:na]
... 6 common frames omitted

Best regards,
James Fu

Reply via email to