I'm using the Java high level consumer API (my code is written in Scala).
And when the connection of zookeeper is timeout and reconnected, the stream
hangs and cannot get the new data. Can anybody help? Thanks!

The version of Kafka is 0.8.2 that comes with cloudera.

The code is written like the example here:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example .
Except I only use one thread for executors since I only have one partition
for this topic.

The topic is like this:

> kafka-topics --zookeeper szq2.appadhoc.com:2181 --describe --topic
adhoc_data
Topic:adhoc_data        PartitionCount:1        ReplicationFactor:1
Configs:
        Topic: adhoc_data       Partition: 0    Leader: 42      Replicas:
42    Isr: 42

And the config of consumer is like this:

  val props = new Properties()
  props.put("zookeeper.connect", config.getString("kafka.zookeeper"))
  props.put("group.id", config.getString("kafka.group-id"))
  props.put("num.consumer.fetchers", "32")
  props.put("rebalance.max.retries", "10000")
  props.put("auto.commit.enable", "false")
  props.put("offsets.channel.backoff.ms", "10000")
  props.put("offsets.commit.max.retries", "10000")

And here is the log:

22:18:39.994 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
22:18:39.996 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:host.name=szq2.appadhoc.com
22:18:39.996 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:java.version=1.7.0_79
22:18:39.996 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:java.vendor=Oracle Corporation
22:18:39.997 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:java.home=/usr/lib/jvm/java-7-openjdk-amd64/jre
22:18:39.997 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:java.class.path=...
22:18:39.997 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/
usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
22:18:39.997 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:java.io.tmpdir=/tmp
22:18:39.997 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:java.compiler=<NA>
22:18:39.997 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:os.name=Linux
22:18:39.997 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:os.arch=amd64
22:18:39.997 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:os.version=3.16.0-30-generic
22:18:39.997 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:user.name=root
22:18:39.997 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:user.home=/root
22:18:39.997 [main] INFO  org.apache.zookeeper.ZooKeeper - Client
environment:user.dir=/home/wangbin/adhoc-tracker-1.0
22:18:39.998 [main] INFO  org.apache.zookeeper.ZooKeeper - Initiating
client connection, connectString=szq2.appadhoc.com:2181,
szq4.appadhoc.com:2181,szq5.appadhoc.com:2181 sessio
nTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@6dc64fff
22:18:40.024 [main-SendThread(szq2.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Opening socket connection to server
szq2.appadhoc.com/192.168.1.2:2181. Will not at
tempt to authenticate using SASL (unknown error)
22:18:40.035 [main-SendThread(szq2.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Socket connection established to
szq2.appadhoc.com/192.168.1.2:2181, initiating ses
sion
22:18:40.053 [main-SendThread(szq2.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Session establishment complete on server
szq2.appadhoc.com/192.168.1.2:2181, sessio
nid = 0x150efb12a81032d, negotiated timeout = 6000
22:18:40.567 [pool-1-thread-1] INFO  com.appadhoc.tracker.Reader$ - Kafka
consumer thread is running, thread id: 21
22:18:40.851 [on-spray-can-akka.actor.default-dispatcher-4] INFO
 akka.event.slf4j.Slf4jLogger - Slf4jLogger started
22:18:41.811 [main] INFO  com.appadhoc.tracker.Reader$ - Server started at
0.0.0.0:9094
22:18:45.651 [main-SendThread(szq2.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard
from server in 5066ms for sessionid 0x150e
fb12a81032d, closing socket connection and attempting reconnect
22:18:50.030 [main-SendThread(szq4.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Opening socket connection to server
szq4.appadhoc.com/192.168.1.4:2181. Will not at
tempt to authenticate using SASL (unknown error)
22:18:50.032 [main-SendThread(szq4.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Socket connection established to
szq4.appadhoc.com/192.168.1.4:2181, initiating ses
sion
22:18:50.034 [main-SendThread(szq4.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Unable to reconnect to ZooKeeper
service, session 0x150efb12a81032d has expired, cl
osing socket connection
22:18:50.034 [main-EventThread] INFO  org.apache.zookeeper.ZooKeeper -
Initiating client connection, connectString=szq2.appadhoc.com:2181,
szq4.appadhoc.com:2181,szq5.appadhoc.com
:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@6dc64fff
22:18:50.047 [main-EventThread] INFO  org.apache.zookeeper.ClientCnxn -
EventThread shut down
22:18:50.050 [main-SendThread(szq2.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Opening socket connection to server
szq2.appadhoc.com/192.168.1.2:2181. Will not at
tempt to authenticate using SASL (unknown error)
22:18:50.073 [main-SendThread(szq2.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Socket connection established to
szq2.appadhoc.com/192.168.1.2:2181, initiating ses
sion
22:18:50.119 [main-SendThread(szq2.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Session establishment complete on server
szq2.appadhoc.com/192.168.1.2:2181, sessio
nid = 0x150efb12a81032f, negotiated timeout = 6000
22:18:51.033 [on-spray-can-akka.actor.default-dispatcher-2] INFO
 spray.can.server.HttpListener - Bound to /0.0.0.0:9094
22:18:51.033 [on-spray-can-akka.actor.default-dispatcher-2] INFO
 akka.actor.DeadLetterActorRef - Message [akka.io.Tcp$Bound] from
Actor[akka://on-spray-can/user/IO-HTTP/listener
-0#1252523356] to Actor[akka://on-spray-can/deadLetters] was not delivered.
[1] dead letters encountered. This logging can be turned off or adjusted
with configuration settings '
akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
22:31:35.391 [main-SendThread(szq2.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard
from server in 7645ms for sessionid 0x150efb12a81032f, closing socket
connection and attempting reconnect
22:31:36.075 [main-SendThread(szq5.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Opening socket connection to server
szq5.appadhoc.com/192.168.1.5:2181. Will not attempt to authenticate using
SASL (unknown error)
22:31:36.077 [main-SendThread(szq5.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Socket connection established to
szq5.appadhoc.com/192.168.1.5:2181, initiating session
22:31:36.079 [main-SendThread(szq5.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Unable to reconnect to ZooKeeper
service, session 0x150efb12a81032f has expired, closing socket connection
22:31:36.079 [main-EventThread] INFO  org.apache.zookeeper.ZooKeeper -
Initiating client connection, connectString=szq2.appadhoc.com:2181,
szq4.appadhoc.com:2181,szq5.appadhoc.com:2181 sessionTimeout=6000
watcher=org.I0Itec.zkclient.ZkClient@6dc64fff
22:31:36.089 [main-EventThread] INFO  org.apache.zookeeper.ClientCnxn -
EventThread shut down
22:31:36.093 [main-SendThread(szq5.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Opening socket connection to server
szq5.appadhoc.com/192.168.1.5:2181. Will not attempt to authenticate using
SASL (unknown error)
22:31:36.100 [main-SendThread(szq5.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Socket connection established to
szq5.appadhoc.com/192.168.1.5:2181, initiating session
22:31:36.117 [main-SendThread(szq5.appadhoc.com:2181)] INFO
 org.apache.zookeeper.ClientCnxn - Session establishment complete on server
szq5.appadhoc.com/192.168.1.5:2181, sessionid = 0x450efb12a830305,
negotiated timeout = 6000

Reply via email to