consumer offset not saved in zk
Hello All,

I recently started experimenting with Kafka for my use case. I am running 0.8 in a two-node Kafka setup.

I produced 20 messages using a Java program (1 partition with 2 replicas) and I am running the consumer code as given in the example https://cwiki.apache.org/KAFKA/consumer-group-example.html. The consumer consumes all the messages and the program shuts down after 10 seconds. Based on the configuration in the example, I am expecting that consumer offsets will be saved in ZK, so if I start the program again, I should not consume the same messages again. But I am seeing different behavior: the messages are getting replayed. I am also not seeing any update in ZK.

[zk: localhost:2181(CONNECTED) 13] get /consumers/1
null
cZxid = 0x8009ff0ee
ctime = Wed May 22 16:59:21 PDT 2013
mZxid = 0x8009ff0ee
mtime = Wed May 22 16:59:21 PDT 2013
pZxid = 0x8009ff0f4
cversion = 2
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2

Could any one of you explain to me what the issue could be?

Ravi
Re: consumer offset not saved in zk
Neha, below are my properties. I also tried adding consumer.timeout.ms=3000 or 1; it did not help.

Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", "1");
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "10");
props.put("autocommit.enable", true);

On Thu, May 23, 2013 at 6:48 AM, Neha Narkhede wrote:
> I suspect you had auto.commit.enable=false and consumer.timeout.ms=1.
> Can you confirm the values for the above configs in your example?
>
> Thanks,
> Neha
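Two details in the snippet above are worth flagging, independent of the root cause found later in the thread: the 0.8 high-level consumer reads the key "auto.commit.enable" (with dots), so a value stored under "autocommit.enable" is never looked up and the consumer's default applies; and the config expects String values, so a Boolean stored via put is invisible to getProperty. A minimal sketch of the lookup behavior using plain java.util.Properties (the key names are from the 0.8 consumer configuration; the rest is illustrative):

```java
import java.util.Properties;

public class ConsumerConfigCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Misspelled key, as in the original snippet -- the consumer never reads it.
        props.put("autocommit.enable", "true");

        // The key the 0.8 high-level consumer actually looks up:
        System.out.println(props.getProperty("auto.commit.enable")); // null -> default applies

        // Correct spelling, stored as a String:
        props.setProperty("auto.commit.enable", "true");
        System.out.println(props.getProperty("auto.commit.enable")); // true
    }
}
```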
Re: consumer offset not saved in zk
My ZK directory listing is as below. It looks like the offsets path is not even created.

[zk: localhost:2181(CONNECTED) 0] ls /
[hadoop-ha, hbase, zookeeper, consumers, controller, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /consumers
[1, das-service]
[zk: localhost:2181(CONNECTED) 2] ls /consumers/1
[owners, ids]
[zk: localhost:2181(CONNECTED) 3] ls /consumers/1/offsets
Node does not exist: /consumers/1/offsets
[zk: localhost:2181(CONNECTED) 4]

On Thu, May 23, 2013 at 7:08 AM, Jun Rao wrote:
> You are looking at the wrong path in ZK. The correct path for consumer
> offset is /consumers/[groupId]/offsets/[topic]/[partitionId] -> long
> (offset). For more details on our ZK layout, see
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
>
> Thanks,
>
> Jun
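The ZK layout Jun describes can be made concrete with a small path builder. The topic name "mytopic" below is hypothetical; the sketch just constructs the node path the 0.8 high-level consumer writes its committed offset to:

```java
public class OffsetPath {
    // Builds the ZK node path where the 0.8 high-level consumer stores a
    // committed offset: /consumers/[groupId]/offsets/[topic]/[partitionId]
    static String offsetPath(String groupId, String topic, int partition) {
        return String.format("/consumers/%s/offsets/%s/%d", groupId, topic, partition);
    }

    public static void main(String[] args) {
        // For group "1" (as in this thread) and a hypothetical topic, partition 0:
        System.out.println(offsetPath("1", "mytopic", 0));
        // -> /consumers/1/offsets/mytopic/0
    }
}
```

Checking `get` on that full path in the ZK shell (rather than `get /consumers/1`) is what shows whether a commit actually happened.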
Re: consumer offset not saved in zk
Neha,

Thanks for pointing out the log4j. I turned on logs at INFO level. Now I see some warnings as below.

WARN [Kafka-consumer-autocommit-1] (Logging.scala:88) - [1_BELC02K41GGDKQ4.sea.corp.expecn.com-1369346576173-22148419], exception during commitOffsets
java.lang.NoSuchMethodError: org.I0Itec.zkclient.ZkClient.writeData(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/zookeeper/data/Stat;
    at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:304)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$5.apply(ZookeeperConsumerConnector.scala:253)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$5.apply(ZookeeperConsumerConnector.scala:250)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:570)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:250)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:248)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at kafka.utils.Pool$$anon$1.foreach(Pool.scala:83)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at kafka.utils.Pool.foreach(Pool.scala:27)
    at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:248)
    at kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:234)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:117)
    at kafka.utils.Utils$$anon$2.run(Utils.scala:67)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:680)

Below is what I am using in my Maven dependencies for ZK. It looks like zkclient 0.1 declares org.I0Itec.zkclient.ZkClient.writeData with a void return type. Do I need to use a different dependency?

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.8.1</version>
</dependency>
<dependency>
  <groupId>com.github.sgroschupf</groupId>
  <artifactId>zkclient</artifactId>
  <version>0.1</version>
</dependency>
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5</version>
  <exclusions>
    <exclusion>
      <groupId>com.sun.jmx</groupId>
      <artifactId>jmxri</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.sun.jdmk</groupId>
      <artifactId>jmxtools</artifactId>
    </exclusion>
    <exclusion>
      <groupId>javax.jms</groupId>
      <artifactId>jms</artifactId>
    </exclusion>
  </exclusions>
</dependency>

On Thu, May 23, 2013 at 2:39 PM, Neha Narkhede wrote:
> You don't want to override the default configs. Also, it seems like something
> else is wrong with your setup. Could you share the log4j logs of your
> consumer? Meanwhile, can you check whether you can use the console consumer
> successfully?
>
> Thanks,
> Neha
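A NoSuchMethodError like the one above typically means the code was compiled against one version of a class (here, a zkclient whose writeData returns org.apache.zookeeper.data.Stat) while the runtime classpath carries another (0.1, where it returns void). One way to check which variant is actually on the classpath, without rebuilding, is to reflect on the method's return type. The sketch below demonstrates the technique on a JDK class so it runs without the zkclient jar; the commented-out lines show how it would be pointed at ZkClient:

```java
import java.lang.reflect.Method;

public class ReturnTypeCheck {
    public static void main(String[] args) throws Exception {
        // Against a real classpath with zkclient you would inspect:
        //   Class<?> c = Class.forName("org.I0Itec.zkclient.ZkClient");
        //   Method m = c.getMethod("writeData", String.class, Object.class);
        // If m.getReturnType() is void, the old 0.1 jar is being loaded.

        // Demonstrated here on java.util.Hashtable.put, which is always available:
        Class<?> c = Class.forName("java.util.Hashtable");
        Method m = c.getMethod("put", Object.class, Object.class);
        System.out.println(m.getReturnType().getName()); // java.lang.Object
    }
}
```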
Re: consumer offset not saved in zk
Neha,

I see the point. I verified the zkclient version from the Kafka build and found that it is 0.2. I updated my client app's POM to include the following (corrected from 0.1):

<dependency>
  <groupId>com.101tec</groupId>
  <artifactId>zkclient</artifactId>
  <version>0.2</version>
</dependency>

Thank you very much for the inputs and help.

On Thu, May 23, 2013 at 3:23 PM, Neha Narkhede wrote:
> Can you please try the following -
>
> ./sbt clean assembly-package-dependency package
>
> Thanks,
> Neha
Recommendation for number of brokers on kafka(0.7.2) hosts
Hello All,

I am currently using a 5-node Kafka cluster with version 0.7.2. I would like to get some advice on the optimal number of brokers per Kafka host. Below is the specification of each machine:

- 4 data directories /data1, /data2, /data3, /data4 with 200+GB usable space each, RAID10
- 24-core CPU
- 32GB RAM

From the server properties, I believe we can only configure one data log.dir. So below are my questions:
1) Will setting 4 brokers per host with different ports and different log data directories be beneficial to use all the available space?
2) Will there be any disadvantage to using multiple brokers on the same host?
3) If the partition count is set to 2 per topic, does having multiple brokers on each machine multiply the partition count per machine? I.e., with 4 brokers per machine on 5 machines, will that generate 2*4*5 = 40 partitions per topic?
4) Is there a way to use all the directories from a single broker?

Thanks and Regards
Ravi
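The arithmetic in question 3 can be sketched directly, using the numbers from the message (2 partitions per topic per broker, 4 brokers per machine, 5 machines); the multi-broker layout itself is the hypothetical being asked about:

```java
public class PartitionCount {
    public static void main(String[] args) {
        int partitionsPerTopicPerBroker = 2; // num.partitions in server.properties
        int brokersPerMachine = 4;           // hypothetical multi-broker layout
        int machines = 5;

        // Each broker hosts its own partitions for the topic, so the counts multiply:
        int totalPartitionsPerTopic =
                partitionsPerTopicPerBroker * brokersPerMachine * machines;
        System.out.println(totalPartitionsPerTopic); // 40
    }
}
```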
Re: Recommendation for number of brokers on kafka(0.7.2) hosts
Thank you Neha for the suggestion.

On Tue, Oct 1, 2013 at 1:50 PM, Neha Narkhede wrote:
> 1) Will setting 4 brokers per host with different ports and different log
> data directories be beneficial to use all the available space?
> 2) Will there be any disadvantage to using multiple brokers on the same host?
>
> It is recommended that you do not deploy multiple brokers on the same box,
> since that will not lead to optimal page cache usage, which will affect IO
> performance.
>
> 3) If the partition count is set to 2 per topic, does having multiple brokers on
> each machine multiply the partition count per machine? I.e., with 4 brokers
> per machine on 5 machines, will that generate 2*4*5 = 40 partitions per topic?
>
> Correct.
>
> 4) Is there a way to use all the directories from a single broker?
>
> Kafka 0.8 supports specifying multiple log directories through the log.dirs
> config.
>
> Thanks,
> Neha
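For question 4, the log.dirs setting Neha mentions would look something like this in a Kafka 0.8 server.properties. This is a sketch, not the original poster's actual config; the /dataN/kafka-logs subdirectory names are an assumption (any existing, broker-writable directories work):

```
# server.properties (Kafka 0.8) -- spread log segments across all four disks
# from a single broker, instead of running four brokers on one host
log.dirs=/data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs,/data4/kafka-logs
```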