I have a 4-broker Kafka cluster running in Amazon EC2 on Kafka 0.8 beta1. Most of the default configuration is unchanged. Running the Kafka tool ConsumerOffsetChecker produces socket errors, and some of these socket reset errors also appear in the Kafka server log.
Usually the first few topics are printed without errors, but every subsequent topic causes a socket error. The entire command completes in only 3 or 4 seconds, so I don't know where the timeouts are coming from. Do you have any suggestions? Here is the log:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group ArchivingConsumer --zkconnect ec2-23-22-34-191.compute-1.amazonaws.com:2181
[2013-07-08 10:14:33,172] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2013-07-08 10:14:33,186] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,186] INFO Client environment:host.name=ip-10-41-3-33.ec2.internal (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,186] INFO Client environment:java.version=1.7.0_21 (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,186] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,186] INFO Client environment:java.home=/usr/lib/jvm/java-7-openjdk-amd64/jre (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,187] INFO Client environment:java.class.path=:bin/../core/target/scala-2.8.0/kafka_2.8.0-0.8.0-beta1.jar:bin/../core/target/scala-2.8.0/kafka-assembly-0.8.0-beta1-deps.jar:bin/../perf/target/scala-2.8.0/kafka-perf_2.8.0-0.8.0-beta1.jar:bin/../libs/*.jar:bin/../kafka*.jar (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,187] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/jni:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,187] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,187] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,187] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,187] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,188] INFO Client environment:os.version=3.2.0-32-virtual (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,188] INFO Client environment:user.name=ubuntu (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,188] INFO Client environment:user.home=/home/ubuntu (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,188] INFO Client environment:user.dir=/opt/kafka/kafka-0.8 (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,189] INFO Initiating client connection, connectString= ec2-23-22-34-191.compute-1.amazonaws.com:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@737c45ee (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,216] INFO Opening socket connection to server ec2-23-22-34-191.compute-1.amazonaws.com/10.122.169.44:2181 (org.apache.zookeeper.ClientCnxn)
[2013-07-08 10:14:33,226] INFO Socket connection established to ec2-23-22-34-191.compute-1.amazonaws.com/10.122.169.44:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2013-07-08 10:14:33,300] INFO Session establishment complete on server ec2-23-22-34-191.compute-1.amazonaws.com/10.122.169.44:2181, sessionid = 0x13fb0932c3a002a, negotiated timeout = 30000 (org.apache.zookeeper.ClientCnxn)
[2013-07-08 10:14:33,302] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
Group              Topic                            Pid  Offset  logSize  Lag  Owner
ArchivingConsumer  qa-M-Candidate-CrmStatus-Events  0    30      30       0    ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
ArchivingConsumer  qa-M-Match                       0    36      36       0    ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
ArchivingConsumer  qa-M-friday-01                   0    100     100      0    ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
ArchivingConsumer  qa-M-test-01                     0    200     200      0    ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
[2013-07-08 10:14:35,223] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
    at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
. . . repeated for each topic . . .
ArchivingConsumer  qa-f1-Match                      0    14      14       0    ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
[2013-07-08 10:14:35,906] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
    at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
ArchivingConsumer  qa-f3-T-Campaign-Email           0    1       1        0    ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
[2013-07-08 10:14:35,918] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2013-07-08 10:14:35,923] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
[2013-07-08 10:14:35,923] INFO Session: 0x13fb0932c3a002a closed (org.apache.zookeeper.ZooKeeper)