Could you try setting a larger zookeeper.connection.timeout.ms?
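
For example, something along these lines. This is only a minimal sketch, not tested against your setup: the 10000 ms value and the class and method names are illustrative assumptions, and the other properties are copied from your code. The exception reports a timeout of 400, which matches your zookeeper.session.timeout.ms, so the connection timeout presumably falls back to that value when it is not set explicitly.

```java
import java.util.Properties;

// Hypothetical helper class, named for illustration only.
class ConsumerTimeoutSketch {

    // Builds the consumer properties from the original code, plus an
    // explicit ZooKeeper connection timeout.
    static Properties buildProps(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        // Assumed value: anything comfortably larger than 400 ms.
        props.put("zookeeper.connection.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps("192.168.0.1:2181/kafka", "my-consumer");
        // With Kafka on the classpath, props would then be passed to
        // new ConsumerConfig(props) exactly as in the original code.
        System.out.println(props.getProperty("zookeeper.connection.timeout.ms"));
    }
}
```

If the connection then succeeds, the 400 ms limit was most likely too short for the initial handshake rather than a library incompatibility.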

Thanks,

Jun


On Tue, May 6, 2014 at 4:16 AM, Sebastian Mattheis <
sebastian.matth...@bmw-carit.de> wrote:

> Hello,
>
> I get an error when trying to 'consume' messages from Kafka (2.9.2-0.8.1)
> with a standalone ZooKeeper (3.4.5). The source code is below, along with
> the error message and the ZooKeeper log.
>
> I'm not sure whether the Java libraries are incompatible: I added the
> dependency kafka_2.9.2 (0.8.1) via Maven, which automatically resolved
> the dependencies zkclient (0.3) and zookeeper (3.3.4).
>
> The consumer source code:
>
>     import java.util.Properties;
>
>     import kafka.consumer.Consumer;
>     import kafka.consumer.ConsumerConfig;
>     import kafka.javaapi.consumer.ConsumerConnector;
>
>     public class ConsumerTest
>     {
>         public static void main(String[] args)
>         {
>             try
>             {
>                 Properties props = new Properties();
>
>                 props.put("zookeeper.connect", "192.168.0.1:2181/kafka");
>                 props.put("group.id", "my-consumer");
>                 props.put("zookeeper.session.timeout.ms", "400");
>                 props.put("zookeeper.sync.time.ms", "200");
>                 props.put("auto.commit.interval.ms", "1000");
>
>                 ConsumerConfig config = new ConsumerConfig(props);
>
>                 @SuppressWarnings("unused")
>                 ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
>             }
>             catch(Exception e)
>             {
>                 System.out.println(e.getMessage());
>                 e.printStackTrace();
>             }
>         }
>     }
>
> The pom.xml:
>
>     <project xmlns="http://maven.apache.org/POM/4.0.0"
>              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>              xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>       <modelVersion>4.0.0</modelVersion>
>       <groupId>test.my</groupId>
>       <artifactId>kafka-consumer</artifactId>
>       <version>0.0.1-SNAPSHOT</version>
>       <dependencies>
>         <dependency>
>           <groupId>org.apache.kafka</groupId>
>           <artifactId>kafka_2.9.2</artifactId>
>           <exclusions>
>             <exclusion>
>               <artifactId>jms</artifactId>
>               <groupId>javax.jms</groupId>
>             </exclusion>
>             <exclusion>
>               <artifactId>jmxtools</artifactId>
>               <groupId>com.sun.jdmk</groupId>
>             </exclusion>
>             <exclusion>
>               <artifactId>jmxri</artifactId>
>               <groupId>com.sun.jmx</groupId>
>             </exclusion>
>           </exclusions>
>         </dependency>
>       </dependencies>
>       <dependencyManagement>
>         <dependencies>
>           <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka_2.9.2</artifactId>
>             <version>0.8.1</version>
>           </dependency>
>         </dependencies>
>       </dependencyManagement>
>     </project>
>
> The exception message and stack trace:
>
>     Unable to connect to zookeeper server within timeout: 400
>     org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 400
>         at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:880)
>         at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
>         at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
>         at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:156)
>         at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:114)
>         at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
>         at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
>         at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
>         at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
>         at ConsumerTest.main(ConsumerTest.java:23)
>
> The ZooKeeper log:
>
>     2014-05-06 11:48:11,907 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /192.168.0.4:52568
>     2014-05-06 11:48:11,909 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@349] - caught end of stream exception
>     EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket
>         at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:220)
>         at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
>         at java.lang.Thread.run(Thread.java:701)
>     2014-05-06 11:48:11,909 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1001] - Closed socket connection for client /192.168.0.4:52568 (no session established for client)
>
> Note that I can successfully 'produce' and 'consume' messages from the
> Kafka nodes with the command line tools:
>
>     $ sudo -u kafka bin/kafka-console-producer.sh --broker-list 192.168.0.2:9092,192.168.0.3:9092 --topic my-topic
>     SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>     SLF4J: Defaulting to no-operation (NOP) logger implementation
>     SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
>     This is a first message.
>     This is a second message.
>     $ sudo -u kafka bin/kafka-console-consumer.sh --zookeeper 192.168.0.1:2181/kafka --topic my-topic --from-beginning
>     SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>     SLF4J: Defaulting to no-operation (NOP) logger implementation
>     SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
>     This is a first message.
>     This is a second message.
>
> I can even successfully produce messages from a Java client producer.
>
> Regards
> Sebastian
>
