I usually get this exception when I define > 2 partitions .. Current configuration :
Single Topic - 4 partitions 1 Consumers Group - 10 Threads On Wed, Sep 11, 2013 at 10:24 PM, prashant amar <amasin...@gmail.com> wrote: > From the broker log: > > > INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer) > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:89) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) > at > kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125) > at > kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72) > at > kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90) > at > kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90) > at > kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90) > at > scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) > at scala.collection.immutable.List.foreach(List.scala:45) > at > kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89) > at > kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154) > at > kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154) > at > scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) > at scala.collection.immutable.List.foreach(List.scala:45) > at > kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153) > at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala) > > > On Wed, Sep 11, 2013 at 10:11 PM, Jun Rao <jun...@gmail.com> wrote: > >> This means the broker somehow closed the socket connection. Anything in >> the >> broker log around the same time? >> >> Thanks, >> >> Jun >> >> >> On Wed, Sep 11, 2013 at 6:07 PM, prashant amar <amasin...@gmail.com> >> wrote: >> >> > Also noticed another issue >> > >> > Specified below is the current configuration >> > >> > Topic1 -> n Partitions -> 2 Consumer Groups (gr1 and gr2) >> > Topic2 -> n Partitions -> 2 Consumer Groups (gr1 and gr2) >> > >> > Notice that I have used the same naming convention on the consumer group >> > set i.e. 'gr1' and 'gr2' are consumer groups associated with 2 sets of >> > topics. >> > >> > On calling the *ConsumerOffsetChecker* API, I am receiving a >> > ClosedChannelException >> > >> > (Check Trace Below) >> > >> > Is there any namespace collision occurring here ? This issue is >> > reproducible with the following setup above >> > >> > >> > *bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2 >> > --zkconnect localhost:2181* >> > >> > >> > 2013-09-12 01:01:59,701] INFO Initiating client connection, >> > connectString=localhost:2181 sessionTimeout=30000 >> > watcher=org.I0Itec.zkclient.ZkClient@3af0ce45 >> > (org.apache.zookeeper.ZooKeeper) >> > [2013-09-12 01:01:59,724] INFO Opening socket connection to server >> > localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn) >> > [2013-09-12 01:01:59,732] INFO Socket connection established to >> localhost/ >> > 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn) >> > [2013-09-12 01:01:59,741] INFO Session establishment complete on server >> > localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated >> > timeout >> > = 30000 (org.apache.zookeeper.ClientCnxn) >> > [2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected) >> > (org.I0Itec.zkclient.ZkClient) >> > Group Topic Pid Offset >> logSize >> > Lag Owner >> > gr2 pe1 0 129985 >> 130625 >> > 640 none >> > gr2 pe1 1 0 0 >> > 0 none >> > gr2 pe2 0 130493 >> 130493 >> > 0 gr2_ip-XXXXXXXXXX-6c6f5d94-0 >> > [2013-09-12 01:02:00,514] INFO Reconnect due to socket error: >> > (kafka.consumer.SimpleConsumer) >> > java.nio.channels.ClosedChannelException >> > at kafka.network.BlockingChannel.send(BlockingChannel.scala:89) >> > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72) >> > at >> > >> > >> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) >> > at >> kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125) >> > at >> > >> > >> kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72) >> > at >> > >> > >> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90) >> > at >> > >> > >> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90) >> > at >> > >> > >> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90) >> > at >> > >> > >> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) >> > at scala.collection.immutable.List.foreach(List.scala:45) >> > at >> > >> > >> kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89) >> > at >> > >> > >> kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154) >> > at >> > >> > >> kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154) >> > at >> > >> > >> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) >> > at scala.collection.immutable.List.foreach(List.scala:45) >> > at >> kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153) >> > at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala) >> > gr2 pe2 1 0 0 >> > 0 gr2_ip-XXXXXXX-6c6f5d94-1 >> > [2013-09-12 01:02:00,523] INFO Terminate ZkClient event thread. >> > (org.I0Itec.zkclient.ZkEventThread) >> > [2013-09-12 01:02:00,526] INFO Session: 0x140924380790211 closed >> > (org.apache.zookeeper.ZooKeeper) >> > [2013-09-12 01:02:00,526] INFO EventThread shut down >> > (org.apache.zookeeper.ClientCnxn) >> > >> > >> > >> > >> > >> > >> > >> > On Wed, Sep 11, 2013 at 5:46 PM, Neha Narkhede <neha.narkh...@gmail.com >> > >> > wrote: >> > >> > > I think you are hitting this - >> > > >> > > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whysomeoftheconsumersinaconsumergroupneverreceiveanymessage%3F >> > > >> > > Let us know if we can improve the documentation to make it clearer. >> > > >> > > Thanks, >> > > Neha >> > > >> > > >> > > On Wed, Sep 11, 2013 at 5:28 PM, prashant amar <amasin...@gmail.com> >> > > wrote: >> > > >> > > > Also attempted another pattern where >> > > > >> > > > 1. Created a topic T with 'n' partitions. >> > > > 2. Created a consumer group process with 'n + 1' threads subscribing >> > from >> > > > topic 'T' with a groupID 'y' >> > > > 3. Added another consumer group process with 'n + 1' threads >> > subscribing >> > > > from same topic 'T' with same groupID 'z' >> > > > (Note that 2 and 3 subscribe from same topic but different groups) >> > > > >> > > > Can a single topic with multiple partitions abetted with multiple >> > > consumer >> > > > groups increase parallelism is consumption? >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > On Wed, Sep 11, 2013 at 4:48 PM, prashant amar <amasin...@gmail.com >> > >> > > > wrote: >> > > > >> > > > > A Design Question that needs verification: >> > > > > >> > > > > 1. Created a topic T with 'n' partitions. >> > > > > 2. Created a consumer group process with 'n + 1' threads >> subscribing >> > > from >> > > > > topic 'T' with a groupID 'y' >> > > > > 3. Added another consumer group process with 'n + 1' threads >> > > subscribing >> > > > > from same topic 'T' with same groupID 'y' >> > > > > >> > > > > On doing so, I noticed that the previous consumer group stops >> > consuming >> > > > > and the new consumer beings to consume >> > > > > >> > > > > I was attempting to model on demand parallelization in an event >> where >> > > an >> > > > > consumer group cannot keep up with the events produced. Rather >> than >> > > > > increase the threadpool capacity in the same process, it would >> make >> > > sense >> > > > > to distribute the load across multiple processes. >> > > > > >> > > > > Advice please? >> > > > > >> > > > > Regards >> > > > > Amardeep >> > > > > >> > > > >> > > >> > > >> > >> > >