Hello,

I get an error when trying to 'consume' messages from Kafka (2.9.2-0.8.1) with
a stand-alone ZooKeeper (3.4.5). You can see the source code below as well as the
error message and log file from ZooKeeper.

I'm not sure whether the Java libraries are incompatible: I added the dependency
kafka_2.9.2 (0.8.1) via Maven, which automatically resolved the dependencies
zkclient (0.3) and zookeeper (3.3.4).
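
To rule out a classpath mix-up, a small check like the following could print which JAR each of these libraries is actually loaded from at runtime (running `mvn dependency:tree` shows the same information at build time). This is only a sketch: the class name JarLocator and the method describe are made up for illustration and are not part of Kafka, zkclient or ZooKeeper.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports where a class was loaded from.
final class JarLocator {

    // Returns "<className>: <location>" or "<className>: not on classpath".
    static String describe(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // Classes shipped with the JDK itself usually have no code source.
            String where = (src == null || src.getLocation() == null)
                    ? "no code source (JDK class?)"
                    : src.getLocation().toString();
            return className + ": " + where;
        } catch (ClassNotFoundException | LinkageError e) {
            return className + ": not on classpath";
        }
    }

    public static void main(String[] args) {
        // The three libraries mentioned above.
        System.out.println(describe("org.apache.zookeeper.ZooKeeper"));
        System.out.println(describe("org.I0Itec.zkclient.ZkClient"));
        System.out.println(describe("kafka.consumer.Consumer"));
    }
}
```

Run with the same classpath as the consumer, the printed JAR file names should reveal the versions that are really in use.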

The consumer source code:

    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ConsumerTest
    {
        public static void main(String[] args)
        {
            try
            {
                Properties props = new Properties();

                props.put("zookeeper.connect", "192.168.0.1:2181/kafka");
                props.put("group.id", "my-consumer");
                props.put("zookeeper.session.timeout.ms", "400");
                props.put("zookeeper.sync.time.ms", "200");
                props.put("auto.commit.interval.ms", "1000");

                ConsumerConfig config = new ConsumerConfig(props);
                @SuppressWarnings("unused")
                ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
            }
            catch(Exception e)
            {
                System.out.println(e.getMessage());
                e.printStackTrace();
            }
        }
    }

The pom.xml:

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>test.my</groupId>
      <artifactId>kafka-consumer</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <dependencies>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.9.2</artifactId>
          <exclusions>
            <exclusion>
              <artifactId>jms</artifactId>
              <groupId>javax.jms</groupId>
            </exclusion>
            <exclusion>
              <artifactId>jmxtools</artifactId>
              <groupId>com.sun.jdmk</groupId>
            </exclusion>
            <exclusion>
              <artifactId>jmxri</artifactId>
              <groupId>com.sun.jmx</groupId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1</version>
          </dependency>
        </dependencies>
      </dependencyManagement>
    </project>

The exception message and stack trace:

    Unable to connect to zookeeper server within timeout: 400
    org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 400
        at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:880)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
        at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:156)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:114)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
        at ConsumerTest.main(ConsumerTest.java:23)
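
The timeout of 400 in the message matches the zookeeper.session.timeout.ms value set in the consumer. As far as I understand, Kafka 0.8.x falls back to the session timeout when zookeeper.connection.timeout.ms is not set, so 400 ms may simply be too short for the initial connection. Below is a minimal sketch of the same properties with longer timeouts, using only java.util.Properties. The class name ConsumerProps and the 6000 ms values are assumptions chosen for illustration, not verified settings.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): builds consumer properties
// with more generous ZooKeeper timeouts than the 400 ms used above.
final class ConsumerProps {

    static Properties build(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // Assumed values: 6000 ms instead of 400 ms.
        props.put("zookeeper.session.timeout.ms", "6000");
        props.put("zookeeper.connection.timeout.ms", "6000");
        // Unchanged from the original consumer.
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("192.168.0.1:2181/kafka", "my-consumer");
        System.out.println(props.getProperty("zookeeper.connection.timeout.ms"));
    }
}
```

The resulting Properties object would be passed to new ConsumerConfig(props) exactly as in the consumer source code.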

The zookeeper log:

    2014-05-06 11:48:11,907 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /192.168.0.4:52568
    2014-05-06 11:48:11,909 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@349] - caught end of stream exception
    EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket
        at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:220)
        at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
        at java.lang.Thread.run(Thread.java:701)
    2014-05-06 11:48:11,909 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1001] - Closed socket connection for client /192.168.0.4:52568 (no session established for client)

Note I can successfully 'produce' and 'consume' messages from Kafka nodes with 
the command line tools:

    $ sudo -u kafka bin/kafka-console-producer.sh --broker-list 192.168.0.2:9092,192.168.0.3:9092 --topic my-topic
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    This is a first message.
    This is a second message.
    $ sudo -u kafka bin/kafka-console-consumer.sh --zookeeper 192.168.0.1:2181/kafka --topic my-topic --from-beginning
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    This is a first message.
    This is a second message.

I can even successfully produce messages from a Java client producer.

Regards
Sebastian
