Hi Gufran,

Does the consumer have any messages to consume during these 10 seconds? There is also another config called "socket.timeout.ms": if that amount of time elapses with no data coming back through the socket, the consumer will close automatically. From your logs it seems the consumer just shut itself down without throwing any errors, so I am wondering if this could be the cause.
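For reference, here is a minimal sketch of how "socket.timeout.ms" could be set next to the properties already in your createConsumerConfig. The class name ConsumerTimeoutSketch, the helper buildProps, and the value 60000 are assumptions for illustration only, not recommended settings; it uses plain java.util.Properties so it runs without the Kafka jars.

```java
import java.util.Properties;

public class ConsumerTimeoutSketch {

    // Hypothetical helper: mirrors the properties from the posted
    // createConsumerConfig and adds an explicit socket timeout.
    static Properties buildProps(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "20000");
        props.put("auto.commit.interval.ms", "1000");
        // Assumed value, chosen only to make the setting visible.
        props.put("socket.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps("172.25.1.94:2181", "group1");
        // Print the effective settings so they can be compared with the
        // "Property ... is overridden to ..." lines in the consumer log.
        for (String key : props.stringPropertyNames()) {
            System.out.println(key + " = " + props.getProperty(key));
        }
    }
}
```

In your program the resulting Properties would be passed to new ConsumerConfig(props) as before. It is also worth ruling out the attached main() itself: it calls Thread.sleep(10000) and then example.shutdown(), and the consumer log shows "ZKConsumerConnector shutting down" on the main thread about 10 seconds after the rebalance finished.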
Guozhang On Mon, Mar 24, 2014 at 12:44 PM, Gufran Pathan <gufran.pat...@mu-sigma.com>wrote: > Hi, > > > > I'm facing an issue exactly similar to the one issued by someone else a > few days ago (see below for the previous thread transcript). > > > > I'm using a High Level Consumer java program to consume messages. The > consumer ends after 10 seconds (exactly the same time as faced by the other > user). I've tried increasing the "zookeeper.session.timeout.ms" to > "40000" and the "zookeeper.sync.time.ms" to "20000" but still no > difference. A fellow user suggested that the issue is due to GC and it > should be tuned. Any other thoughts? > > > > Attaching the logs and the Consumer code herewith. > > > > Here's the *log info I get from the zookeeper side*: > > > > *[2014-03-24 23:45:58,371] INFO Accepted socket connection from > /172.25.2.122:55327 <http://172.25.2.122:55327> > (org.apache.zookeeper.server.NIOServerCnxn)* > > *[2014-03-24 23:45:58,375] INFO Client attempting to establish new session > at /172.25.2.122:55327 <http://172.25.2.122:55327> > (org.apache.zookeeper.server.NIOServerCnxn)* > > *[2014-03-24 23:45:58,379] INFO Established session 0x144f5342bc60007 with > negotiated timeout 40000 for client /172.25.2.122:55327 > <http://172.25.2.122:55327> (org.apache.zookeeper.server.NIOServerCnxn)* > > *[2014-03-24 23:46:09,129] INFO Processed session termination for > sessionid: 0x144f5342bc60007 > (org.apache.zookeeper.server.PrepRequestProcessor)* > > *[2014-03-24 23:46:09,138] WARN EndOfStreamException: Unable to read > additional data from client sessionid 0x144f5342bc60007, likely client has > closed socket (org.apache.zookeeper.server.NIOServerCnxn)* > > *[2014-03-24 23:46:09,139] INFO Closed socket connection for client > /172.25.2.122:55327 <http://172.25.2.122:55327> which had sessionid > 0x144f5342bc60007 (org.apache.zookeeper.server.NIOServerCnxn)* > > > > *The Consumer logs*: > > > > *INFO 2014-03-24 23:45:58,413 [main] 
kafka.utils.VerifiableProperties - > Verifying properties* > > *INFO 2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties - > Property auto.commit.interval.ms <http://auto.commit.interval.ms> is > overridden to 1000* > > *INFO 2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties - > Property group.id <http://group.id> is overridden to group1* > > *INFO 2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties - > Property zookeeper.connect is overridden to 172.25.1.94:2181 > <http://172.25.1.94:2181>* > > *INFO 2014-03-24 23:45:58,436 [main] kafka.utils.VerifiableProperties - > Property zookeeper.session.timeout.ms <http://zookeeper.session.timeout.ms> > is overridden to 40000* > > *INFO 2014-03-24 23:45:58,436 [main] kafka.utils.VerifiableProperties - > Property zookeeper.sync.time.ms <http://zookeeper.sync.time.ms> is > overridden to 20000* > > *INFO 2014-03-24 23:45:58,485 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], Connecting to zookeeper instance > at 172.25.1.94:2181 <http://172.25.1.94:2181>* > > *INFO 2014-03-24 23:45:58,493 [ZkClient-EventThread-9-172.25.1.94:2181] > org.I0Itec.zkclient.ZkEventThread - Starting ZkClient event thread.* > > *INFO 2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper - > Client environment:zookeeper.version=3.3.1-942149, built on 05/07/2010 > 17:14 GMT* > > *INFO 2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper - > Client environment:host.name <http://host.name>=LAPSZ0914.mu-sigma.local* > > *INFO 2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper - > Client environment:java.version=1.7.0_51* > > *INFO 2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper - > Client environment:java.vendor=Oracle Corporation* > > *INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - > Client environment:java.home=C:\Program Files\Java\jre7* > > *INFO 2014-03-24 23:45:58,500 [main] 
org.apache.zookeeper.ZooKeeper - > Client > environment:java.class.path=C:\Users\*****\NewWorkSpace\KafkaNew\target\classes;D:\Modules\sl4j\slf4j-1.7.6\slf4j-log4j12-1.7.6.jar;C:\Users\*****\.m2\repository\org\apache\kafka\kafka_2.9.1\0.8.0-beta1\kafka_2.9.1-0.8.0-beta1.jar;C:\Users\*****\.m2\repository\org\scala-lang\scala-library\2.9.1\scala-library-2.9.1.jar;C:\Users\*****\.m2\repository\org\scala-lang\scala-compiler\2.9.1\scala-compiler-2.9.1.jar;C:\Users\*****\.m2\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;C:\Users\*****\.m2\repository\com\yammer\metrics\metrics-annotation\2.2.0\metrics-annotation-2.2.0.jar;C:\Users\*****\.m2\repository\com\101tec\zkclient\0.3\zkclient-0.3.jar;C:\Users\*****\.m2\repository\org\apache\zookeeper\zookeeper\3.3.1\zookeeper-3.3.1.jar;C:\Users\*****\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\*****\.m2\repository\log4j\log4j\1.2.14\log4j-1.2.14.jar;C:\Users\*****\.m2\repository\net\sf\jopt-simple\jopt-simple\3.2\jopt-simple-3.2.jar;C:\Users\*****\.m2\repository\junit\junit\3.8.1\junit-3.8.1.jar;C:\Users\*****\.m2\repository\org\slf4j\slf4j-api\1.7.6\slf4j-api-1.7.6.jar* > > *INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - > Client environment:java.library.path=C:\Program > Files\Java\jre7\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:/Program > Files/Java/jre7/bin/client;C:/Program Files/Java/jre7/bin;C:/Program > Files/Java/jre7/lib/i386;C:\Python27\;C:\Python27\Scripts;C:\Program > Files\Common Files\Microsoft Shared\Microsoft Online Services;C:\Program > Files\RSA SecurID Token Common;C:\Program Files\Intel\iCLS > Client\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program > Files\Intel\Intel(R) Management Engine Components\DAL;C:\Program > Files\Intel\Intel(R) Management Engine Components\IPT;C:\Program > Files\Intel\OpenCL SDK\2.0\bin\x86;c:\Program Files\Microsoft SQL > 
Server\100\Tools\Binn\VSShell\Common7\IDE\;c:\Program Files\Microsoft SQL > Server\100\Tools\Binn\;c:\Program Files\Microsoft SQL > Server\100\DTS\Binn\;C:\Program Files\SAS\SharedFiles\Formats;C:\Program > Files\Java\jdk1.7.0_51\bin;C:\Program > Files\apache-maven-3.0.5\bin;C:\Program Files\Git\cmd; > D:\Modules\Storm\apache-storm-0.9.1-incubating\bin;C:\Program > Files\GnuWin32\bin;C:\Program > Files\sbt\\bin;C:\Users\*****\AppData\Roaming\Python\Scripts;C:\Program > Files\Apache Software Foundation\apache-maven-3.1.1\bin;D:\Installation > Deck\Software-2\Eclipse\eclipse-jee-juno-win32\eclipse;;.* > > *INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - > Client environment:java.io.tmpdir=C:\Users\GUFRAN~1.PAT\AppData\Local\Temp\* > > *INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - > Client environment:java.compiler=<NA>* > > *INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - > Client environment:os.name <http://os.name>=Windows 7* > > *INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - > Client environment:os.arch=x86* > > *INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - > Client environment:os.version=6.1* > > *INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - > Client environment:user.name <http://user.name>=****** > > *INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - > Client environment:user.home=C:\Users\****** > > *INFO 2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper - > Client environment:user.dir=C:\Users\*****\NewWorkSpace\KafkaNew* > > *INFO 2014-03-24 23:45:58,501 [main] org.apache.zookeeper.ZooKeeper - > Initiating client connection, connectString=172.25.1.94:2181 > <http://172.25.1.94:2181> sessionTimeout=40000 > watcher=org.I0Itec.zkclient.ZkClient@1d08c1b* > > *INFO 2014-03-24 23:45:58,520 [main-SendThread()] > org.apache.zookeeper.ClientCnxn - Opening socket connection to server > /172.25.1.94:2181 
<http://172.25.1.94:2181>* > > *INFO 2014-03-24 23:45:58,526 [main-SendThread(vm.centos.com:2181 > <http://vm.centos.com:2181>)] org.apache.zookeeper.ClientCnxn - Socket > connection established to vm.centos.com/172.25.1.94:2181 > <http://vm.centos.com/172.25.1.94:2181>, initiating session* > > *INFO 2014-03-24 23:45:58,538 [main-SendThread(vm.centos.com:2181 > <http://vm.centos.com:2181>)] org.apache.zookeeper.ClientCnxn - Session > establishment complete on server vm.centos.com/172.25.1.94:2181 > <http://vm.centos.com/172.25.1.94:2181>, sessionid = 0x144f5342bc60007, > negotiated timeout = 40000* > > *INFO 2014-03-24 23:45:58,540 [main-EventThread] > org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)* > > *INFO 2014-03-24 23:45:58,559 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], starting auto committer every > 1000 ms* > > *INFO 2014-03-24 23:45:58,656 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], begin registering consumer > group1_LAPSZ0914-1395684958481-7fc24c0a in ZK* > > *INFO 2014-03-24 23:45:58,680 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], end registering consumer > group1_LAPSZ0914-1395684958481-7fc24c0a in ZK* > > *INFO 2014-03-24 23:45:58,682 > [group1_LAPSZ0914-1395684958481-7fc24c0a_watcher_executor] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], starting watcher executor thread > for consumer group1_LAPSZ0914-1395684958481-7fc24c0a* > > *INFO 2014-03-24 23:45:58,710 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], begin rebalancing consumer > group1_LAPSZ0914-1395684958481-7fc24c0a try #0* > > *INFO 2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties - > Verifying properties* > > *INFO 2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties - > Property 
metadata.broker.list is overridden to vm:9092* > > *INFO 2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties - > Property request.timeout.ms <http://request.timeout.ms> is overridden to > 30000* > > *INFO 2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties - > Property client.id <http://client.id> is overridden to group1* > > *INFO 2014-03-24 23:45:58,888 [main] kafka.client.ClientUtils$ - > Fetching metadata from broker id:0,host:vm,port:9092 with correlation id 0 > for 1 topic(s) Set(partitioned)* > > *INFO 2014-03-24 23:45:58,893 [main] kafka.producer.SyncProducer - > Connected to vm:9092 for producing* > > *INFO 2014-03-24 23:45:58,914 [main] kafka.producer.SyncProducer - > Disconnecting from vm:9092* > > *INFO 2014-03-24 23:45:58,924 [main] > kafka.consumer.ConsumerFetcherManager - > [ConsumerFetcherManager-1395684958545] Stopping leader finder thread* > > *INFO 2014-03-24 23:45:58,925 [main] > kafka.consumer.ConsumerFetcherManager - > [ConsumerFetcherManager-1395684958545] Stopping all fetchers* > > *INFO 2014-03-24 23:45:58,926 [main] > kafka.consumer.ConsumerFetcherManager - > [ConsumerFetcherManager-1395684958545] All connections stopped* > > *INFO 2014-03-24 23:45:58,927 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], Cleared all relevant queues for > this fetcher* > > *INFO 2014-03-24 23:45:58,928 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], Cleared the data chunks in all > the consumer message iterators* > > *INFO 2014-03-24 23:45:58,928 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], Committing all offsets after > clearing the fetcher queues* > > *INFO 2014-03-24 23:45:58,929 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], Releasing partition ownership* > > *INFO 2014-03-24 23:45:58,932 [main] > 
kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], Consumer > group1_LAPSZ0914-1395684958481-7fc24c0a rebalancing the following > partitions: ArrayBuffer(0, 1, 2, 3, 4) for topic partitioned with > consumers: List(group1_LAPSZ0914-1395684958481-7fc24c0a-0, > group1_LAPSZ0914-1395684958481-7fc24c0a-1, > group1_LAPSZ0914-1395684958481-7fc24c0a-2, > group1_LAPSZ0914-1395684958481-7fc24c0a-3, > group1_LAPSZ0914-1395684958481-7fc24c0a-4)* > > *INFO 2014-03-24 23:45:58,934 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], > group1_LAPSZ0914-1395684958481-7fc24c0a-4 attempting to claim partition 4* > > *INFO 2014-03-24 23:45:58,939 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], > group1_LAPSZ0914-1395684958481-7fc24c0a-3 attempting to claim partition 3* > > *INFO 2014-03-24 23:45:58,942 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], > group1_LAPSZ0914-1395684958481-7fc24c0a-1 attempting to claim partition 1* > > *INFO 2014-03-24 23:45:58,949 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], > group1_LAPSZ0914-1395684958481-7fc24c0a-2 attempting to claim partition 2* > > *INFO 2014-03-24 23:45:58,953 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], > group1_LAPSZ0914-1395684958481-7fc24c0a-0 attempting to claim partition 0* > > *INFO 2014-03-24 23:45:58,967 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], > group1_LAPSZ0914-1395684958481-7fc24c0a-1 successfully owned partition 1 > for topic partitioned* > > *INFO 2014-03-24 23:45:58,970 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], > group1_LAPSZ0914-1395684958481-7fc24c0a-2 successfully owned partition 2 > for topic partitioned* > > *INFO 
2014-03-24 23:45:58,973 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], > group1_LAPSZ0914-1395684958481-7fc24c0a-0 successfully owned partition 0 > for topic partitioned* > > *INFO 2014-03-24 23:45:58,977 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], > group1_LAPSZ0914-1395684958481-7fc24c0a-4 successfully owned partition 4 > for topic partitioned* > > *INFO 2014-03-24 23:45:58,980 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], > group1_LAPSZ0914-1395684958481-7fc24c0a-3 successfully owned partition 3 > for topic partitioned* > > *INFO 2014-03-24 23:45:58,981 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], Updating the cache* > > *INFO 2014-03-24 23:45:58,984 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], Consumer > group1_LAPSZ0914-1395684958481-7fc24c0a selected partitions : > partitioned:0: fetched offset = 1939: consumed offset = 1939,partitioned:1: > fetched offset = 1969: consumed offset = 1969,partitioned:2: fetched offset > = 30917: consumed offset = 30917,partitioned:3: fetched offset = 52147: > consumed offset = 52147,partitioned:4: fetched offset = 1876: consumed > offset = 1876* > > *INFO 2014-03-24 23:45:58,988 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], end rebalancing consumer > group1_LAPSZ0914-1395684958481-7fc24c0a try #0* > > *INFO 2014-03-24 23:45:58,989 > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread - > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Starting * > > * INFO 2014-03-24 23:45:59,000 > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.utils.VerifiableProperties - Verifying properties* > > *INFO 2014-03-24 23:45:59,000 
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.utils.VerifiableProperties - Property metadata.broker.list is > overridden to vm:9092* > > *INFO 2014-03-24 23:45:59,000 > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.utils.VerifiableProperties - Property request.timeout.ms > <http://request.timeout.ms> is overridden to 30000* > > *INFO 2014-03-24 23:45:59,000 > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.utils.VerifiableProperties - Property client.id <http://client.id> > is overridden to group1* > > *INFO 2014-03-24 23:45:59,001 > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.client.ClientUtils$ - Fetching metadata from broker > id:0,host:vm,port:9092 with correlation id 0 for 1 topic(s) > Set(partitioned)* > > *INFO 2014-03-24 23:45:59,004 > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.producer.SyncProducer - Connected to vm:9092 for producing* > > *INFO 2014-03-24 23:45:59,043 > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.producer.SyncProducer - Disconnecting from vm:9092* > > *INFO 2014-03-24 23:45:59,056 > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.consumer.ConsumerFetcherManager - > [ConsumerFetcherManager-1395684958545] Adding fetcher for partition > [partitioned,2], initOffset 30917 to broker 0 with fetcherId 0* > > *INFO 2014-03-24 23:45:59,056 > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.consumer.ConsumerFetcherManager - > [ConsumerFetcherManager-1395684958545] Adding fetcher for partition > [partitioned,3], initOffset 52147 to broker 0 with fetcherId 0* > > *INFO 2014-03-24 23:45:59,056 > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.consumer.ConsumerFetcherManager - > [ConsumerFetcherManager-1395684958545] Adding fetcher for partition > [partitioned,1], initOffset 1969 to broker 0 with fetcherId 0* > > *INFO 2014-03-24 
23:45:59,056 > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.consumer.ConsumerFetcherManager - > [ConsumerFetcherManager-1395684958545] Adding fetcher for partition > [partitioned,0], initOffset 1939 to broker 0 with fetcherId 0* > > *INFO 2014-03-24 23:45:59,056 > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.consumer.ConsumerFetcherManager - > [ConsumerFetcherManager-1395684958545] Adding fetcher for partition > [partitioned,4], initOffset 1876 to broker 0 with fetcherId 0* > > *INFO 2014-03-24 23:45:59,058 > [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0] > kafka.consumer.ConsumerFetcherThread - > [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0], > Starting * > > * INFO 2014-03-24 23:46:08,991 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], ZKConsumerConnector shutting > down* > > *INFO 2014-03-24 23:46:08,992 [main] kafka.utils.KafkaScheduler - > Forcing shutdown of Kafka scheduler* > > *INFO 2014-03-24 23:46:08,992 [main] > kafka.consumer.ConsumerFetcherManager - > [ConsumerFetcherManager-1395684958545] Stopping leader finder thread* > > *INFO 2014-03-24 23:46:08,992 [main] > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread - > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Shutting > down* > > *INFO 2014-03-24 23:46:08,993 > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread] > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread - > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Stopped * > > * INFO 2014-03-24 23:46:08,993 [main] > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread - > [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Shutdown > completed* > > *INFO 2014-03-24 23:46:08,993 [main] > kafka.consumer.ConsumerFetcherManager - > [ConsumerFetcherManager-1395684958545] Stopping all fetchers* > > *INFO 2014-03-24 23:46:08,993 [main] > 
kafka.consumer.ConsumerFetcherThread - > [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0], > Shutting down* > > *INFO 2014-03-24 23:46:09,257 > [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0] > kafka.consumer.SimpleConsumer - Reconnect due to socket error: * > > * java.nio.channels.ClosedByInterruptException* > > * at > java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)* > > * at sun.nio.ch.SocketChannelImpl.read(Unknown Source)* > > * at > sun.nio.ch.SocketAdaptor$SocketInputStream.read(Unknown Source)* > > * at sun.nio.ch.ChannelInputStream.read(Unknown Source)* > > * at > java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown Source)* > > * at kafka.utils.Utils$.read(Utils.scala:394)* > > * at > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)* > > * at > kafka.network.Receive$class.readCompletely(Transmission.scala:56)* > > * at > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)* > > * at > kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)* > > * at > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)* > > * at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)* > > * at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)* > > * at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)* > > * at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)* > > * at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)* > > * at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)* > > * at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)* > > * at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)* 
> > * at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)* > > * at > kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)* > > * at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)* > > * at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)* > > * at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)* > > *INFO 2014-03-24 23:46:09,259 > [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0] > kafka.consumer.ConsumerFetcherThread - > [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0], > Stopped * > > * INFO 2014-03-24 23:46:09,259 [main] > kafka.consumer.ConsumerFetcherThread - > [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0], > Shutdown completed* > > *INFO 2014-03-24 23:46:09,259 [main] > kafka.consumer.ConsumerFetcherManager - > [ConsumerFetcherManager-1395684958545] All connections stopped* > > *INFO 2014-03-24 23:46:09,283 [ZkClient-EventThread-9-172.25.1.94:2181] > org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.* > > *INFO 2014-03-24 23:46:09,293 [main] org.apache.zookeeper.ZooKeeper - > Session: 0x144f5342bc60007 closed* > > *INFO 2014-03-24 23:46:09,294 [main] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], ZKConsumerConnector shut down > completed* > > *INFO 2014-03-24 23:46:09,694 > [group1_LAPSZ0914-1395684958481-7fc24c0a_watcher_executor] > kafka.consumer.ZookeeperConsumerConnector - > [group1_LAPSZ0914-1395684958481-7fc24c0a], stopping watcher executor thread > for consumer group1_LAPSZ0914-1395684958481-7fc24c0a* > > > > *My Consumer code*: > > > > *import kafka.consumer.ConsumerConfig;* > > *import kafka.consumer.KafkaStream;* > > *import kafka.javaapi.consumer.ConsumerConnector;* > > *import java.util.HashMap;* > > *import java.util.List;* > > *import java.util.Map;* > > *import java.util.Properties;* > > *import 
java.util.concurrent.ExecutorService;* > > *import java.util.concurrent.Executors;* > > > > *public class TestConsumer {* > > * private final ConsumerConnector consumer;* > > * private final String topic;* > > * private ExecutorService executor;* > > > > * public TestConsumer(String a_zookeeper, String > a_groupId, String a_topic) {* > > * consumer = > kafka.consumer.Consumer.createJavaConsumerConnector(* > > * createConsumerConfig(a_zookeeper, > a_groupId));* > > * this.topic = a_topic;* > > * }* > > > > * public void shutdown() {* > > * if (consumer != null) consumer.shutdown();* > > * if (executor != null) executor.shutdown();* > > * }* > > > > * public void run(int a_numThreads) {* > > * Map<String, Integer> topicCountMap = new > HashMap<String, Integer>();* > > * topicCountMap.put(topic, new > Integer(a_numThreads));* > > * Map<String, List<KafkaStream<byte[], byte[]>>> > consumerMap = consumer.createMessageStreams(topicCountMap);* > > * List<KafkaStream<byte[], byte[]>> streams = > consumerMap.get(topic);* > > > > * // now launch all the threads* > > * //* > > * executor = > Executors.newFixedThreadPool(a_numThreads);* > > > > * // now create an object to consume the messages* > > * //* > > * int threadNumber = 0;* > > * for (final KafkaStream stream : streams) {* > > * executor.submit(new ConsumerTest(stream, > threadNumber));* > > * threadNumber++;* > > * }* > > * }* > > > > * private static ConsumerConfig > createConsumerConfig(String a_zookeeper, String a_groupId) {* > > * Properties props = new Properties();* > > * props.put("zookeeper.connect", a_zookeeper);* > > * props.put("group.id", a_groupId);* > > * props.put("zookeeper.session.timeout.ms", "40000");* > > * props.put("zookeeper.sync.time.ms", "20000");* > > * props.put("auto.commit.interval.ms", "1000");* > > > > * return new ConsumerConfig(props);* > > * }* > > > > *
public static void main(String[] args) {* > > * String zooKeeper = "172.25.1.94:2181";* > > * String groupId = "group1";* > > * String topic = "partitioned";* > > * int threads = 5;* > > > > * TestConsumer example = new > TestConsumer(zooKeeper, groupId, topic);* > > * example.run(threads);* > > > > * try {* > > * Thread.sleep(10000);* > > * } catch (InterruptedException ie) {* > > > > * }* > > * example.shutdown();* > > * }* > > > > > > *}* > > > > > > *From* > > Neha Narkhede <neha.narkh...@gmail.com> > > *Subject* > > Re: Kafka High Level Consumer Connector shuts down after 10 seconds > > *Date* > > Mon, 10 Mar 2014 16:48:31 GMT > > Session termination can happen either when client or zookeeper process > > pauses (due to GC) or when the client process terminates. A sustainable > > solution is to tune GC settings. For now, you can try increasing the > > zookeeper.session.timeout.ms. > > > > > > > > > > On Sun, Mar 9, 2014 at 3:44 PM, Ameya Bhagat <ameya.bha...@gmail.com> > wrote: > > > > > I am using a high level consumer as described at: > > > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example > > > > > > I am noticing that my consumer does not run forever and ends after some > > > time (< 15s). At the zookeeper side, I see the following: > > > > > > INFO Processed session termination for sessionid: 0x144a4854325004d > > > (org.apache.zookeeper.server.PrepRequestProcessor) > > > INFO Closed socket connection for client /127.0.0.1:59899 which had > > > sessionid 0x144a4854325004d (org.apache.zookeeper.server.NIOServerCnxn) > > > > > > I am using default configurations. How do I make my consumer listen > > > forever? > > > > > > Thanks > > > Ameya > > > > > > > > > -- > > Thanks & Regards, > > Gufran Pathan *| *+91-9566811502 *| *www.mu-sigma.com *|* > > > Disclaimer: http://www.mu-sigma.com/disclaimer.html > -- -- Guozhang