consumer offset not saved in zk

2013-05-22 Thread rk vishu
Hello All,

I recently started experimenting with Kafka for my use case. I am running
0.8 in a two-node Kafka setup.

I produced 20 messages using a Java program (1 partition with 2 replicas),
and I am running the consumer code given in the example at
https://cwiki.apache.org/KAFKA/consumer-group-example.html. The consumer
consumes all the messages, and after 10 seconds the program shuts down.
Based on the configuration in the example, I expect the consumer offsets to
be saved in ZK, so if I start the program again I should not consume the
same messages again. But I am seeing different behavior: messages are
getting replayed, and I see no update in ZK either.
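For reference, here is a condensed sketch of the consume loop from that
example as I am running it (the topic name "mytopic" and the ZK address are
placeholders; the API names are from the 0.8 high-level consumer):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class GroupConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "1");
        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // ask for one stream, since the topic has a single partition
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("mytopic", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            consumer.createMessageStreams(topicCountMap);

        ConsumerIterator<byte[], byte[]> it =
            streams.get("mytopic").get(0).iterator();
        try {
            // hasNext() blocks waiting for messages, or throws
            // ConsumerTimeoutException once consumer.timeout.ms elapses
            while (it.hasNext())
                System.out.println(new String(it.next().message()));
        } catch (ConsumerTimeoutException e) {
            // no more messages within the timeout
        }
        // shutdown() commits the last consumed offsets to ZK (when
        // auto-commit is enabled) and releases the group's ZK nodes
        consumer.shutdown();
    }
}

(The wiki version runs the stream off an ExecutorService and calls
shutdown() from main after sleeping 10 seconds; the effect should be the
same.)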

[zk: localhost:2181(CONNECTED) 13] get /consumers/1
null
cZxid = 0x8009ff0ee
ctime = Wed May 22 16:59:21 PDT 2013
mZxid = 0x8009ff0ee
mtime = Wed May 22 16:59:21 PDT 2013
pZxid = 0x8009ff0f4
cversion = 2
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2


Could anyone explain to me what the issue might be?

Ravi


Re: consumer offset not saved in zk

2013-05-23 Thread rk vishu
Neha,

Below are my properties. I also tried adding consumer.timeout.ms=3000 or 1.

Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", "1");
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "10");
props.put("auto.commit.enable", "true");

did not help.



On Thu, May 23, 2013 at 6:48 AM, Neha Narkhede wrote:

> I suspect you had auto.commit.enable=false and consumer.timeout.ms=1.
> Can you confirm the values for the above configs in your example?
>
> Thanks,
> Neha
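
(A note for anyone finding this thread later: with auto.commit.enable=false
the high-level consumer writes offsets to ZK only when you call
commitOffsets() on the connector yourself, for example

    consumer.commitOffsets(); // consumer is the ConsumerConnector above

so a consumer that never calls it will replay messages on restart.)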


Re: consumer offset not saved in zk

2013-05-23 Thread rk vishu
My ZK directory listing is below. It looks like the offsets path has not
even been created.

[zk: localhost:2181(CONNECTED) 0] ls /
[hadoop-ha, hbase, zookeeper, consumers, controller, storm, brokers,
controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /consumers
[1, das-service]
[zk: localhost:2181(CONNECTED) 2] ls /consumers/1
[owners, ids]
[zk: localhost:2181(CONNECTED) 3] ls /consumers/1/offsets
Node does not exist: /consumers/1/offsets
[zk: localhost:2181(CONNECTED) 4]


On Thu, May 23, 2013 at 7:08 AM, Jun Rao wrote:

> You are looking at the wrong path in ZK. The correct path for consumer
> offset is /consumers/[groupId]/offsets/[topic]/[partitionId] -> long
> (offset). For more details on our ZK layout, see
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
>
> Thanks,
>
> Jun
>
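
Following Jun's pointer, a minimal sketch for reading the committed offset
directly with the raw ZooKeeper client (the group id "1" is from this
thread; the topic name "mytopic" and partition 0 are placeholders):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class OffsetPeek {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 4000, new Watcher() {
            public void process(WatchedEvent event) { } // no-op watcher
        });
        // /consumers/[groupId]/offsets/[topic]/[partitionId] holds the
        // offset as a plain string
        String path = "/consumers/1/offsets/mytopic/0";
        byte[] data = zk.getData(path, false, null);
        System.out.println(path + " -> " + new String(data, "UTF-8"));
        zk.close();
    }
}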


Re: consumer offset not saved in zk

2013-05-23 Thread rk vishu
Neha,

Thanks for pointing out log4j. I turned the logs on at INFO level. Now I
see some warnings, shown below.

WARN [Kafka-consumer-autocommit-1] (Logging.scala:88) - [1_BELC02K41GGDKQ4.sea.corp.expecn.com-1369346576173-22148419], exception during commitOffsets
java.lang.NoSuchMethodError: org.I0Itec.zkclient.ZkClient.writeData(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/zookeeper/data/Stat;
    at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:304)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$5.apply(ZookeeperConsumerConnector.scala:253)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$5.apply(ZookeeperConsumerConnector.scala:250)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:570)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:250)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:248)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at kafka.utils.Pool$$anon$1.foreach(Pool.scala:83)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at kafka.utils.Pool.foreach(Pool.scala:27)
    at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:248)
    at kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:234)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:117)
    at kafka.utils.Utils$$anon$2.run(Utils.scala:67)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:680)

Below is what I am using in my Maven dependencies for ZK. It looks like
zkclient 0.1's org.I0Itec.zkclient.ZkClient.writeData has a void return
type. Do I need to use a different dependency?


<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.8.1</version>
</dependency>
<dependency>
  <groupId>com.github.sgroschupf</groupId>
  <artifactId>zkclient</artifactId>
  <version>0.1</version>
</dependency>
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5</version>
  <exclusions>
    <exclusion>
      <groupId>com.sun.jmx</groupId>
      <artifactId>jmxri</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.sun.jdmk</groupId>
      <artifactId>jmxtools</artifactId>
    </exclusion>
    <exclusion>
      <groupId>javax.jms</groupId>
      <artifactId>jms</artifactId>
    </exclusion>
  </exclusions>
</dependency>
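
To see which writeData signature is actually on the classpath, here is a
quick reflection check (a diagnostic sketch, not from the thread;
reflection looks methods up by name and parameter types only, so this
resolves regardless of the return type):

import java.lang.reflect.Method;
import org.I0Itec.zkclient.ZkClient;

public class ZkClientSignatureCheck {
    public static void main(String[] args) throws Exception {
        Method m = ZkClient.class.getMethod(
            "writeData", String.class, Object.class);
        // prints "void" with the sgroschupf 0.1 jar; the zkclient that
        // Kafka 0.8 links against returns org.apache.zookeeper.data.Stat
        System.out.println(m.getReturnType().getName());
    }
}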


On Thu, May 23, 2013 at 2:39 PM, Neha Narkhede wrote:

> You don't want to override the default configs. Also, it seems like
> something else may be wrong with your setup. Could you share the log4j
> logs of your consumer? Meanwhile, can you check whether the console
> consumer works successfully?
>
> Thanks,
> Neha
>
>

Re: consumer offset not saved in zk

2013-05-23 Thread rk vishu
Neha,

I see the point. I verified the zkclient version from the Kafka build and
found that it is 0.2.

I updated my client app's POM to include the following (corrected from
0.1):


<dependency>
  <groupId>com.101tec</groupId>
  <artifactId>zkclient</artifactId>
  <version>0.2</version>
</dependency>


Thank you very much for the inputs and help.
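
One caveat worth noting: the old artifact lives under the groupId
com.github.sgroschupf and the new one under com.101tec, so Maven treats
them as unrelated and will keep both on the classpath if the old one is
still referenced somewhere. Something like

    mvn dependency:tree -Dincludes=*:zkclient

should confirm that exactly one zkclient remains.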



On Thu, May 23, 2013 at 3:23 PM, Neha Narkhede wrote:

> Can you please try the following:
>
> ./sbt clean assembly-package-dependency package
>
> Thanks,
> Neha
>
>

Recommendation for number of brokers on kafka(0.7.2) hosts

2013-10-01 Thread rk vishu
Hello All,

I am currently using a 5-node Kafka cluster on version 0.7.2. I would like
some advice on the optimal number of brokers per Kafka host. Below is the
specification of each machine:

- 4 data directories /data1, /data2, /data3, /data4 with 200+GB usable
space, RAID10
- 24-core CPU
- 32GB RAM

From the server properties, I believe we can only configure one data
log.dir, so below are my questions:
1) Will setting up 4 brokers per host, with different ports and different
log data directories, be beneficial in using all the available space?
2) Will there be any disadvantage to running multiple brokers on the same
host?
3) If the partition count is set to 2 per topic, does having multiple
brokers on each machine multiply the partition count per machine? I.e.,
will 4 brokers per machine on 5 machines give 2*4*5 = 40 partitions per
topic?
4) Is there a way to use all the directories from a single broker?

Thanks and Regards
Ravi


Re: Recommendation for number of brokers on kafka(0.7.2) hosts

2013-10-01 Thread rk vishu
Thank you Neha for the suggestion.


On Tue, Oct 1, 2013 at 1:50 PM, Neha Narkhede wrote:

> 1) Will setting up 4 brokers per host, with different ports and different
> log data directories, be beneficial in using all the available space?
> 2) Will there be any disadvantage to running multiple brokers on the same
> host?
>
> It is recommended that you do not deploy multiple brokers on the same box,
> since that will not lead to optimal page cache usage, which will affect IO
> performance.
>
> 3) If the partition count is set to 2 per topic, does having multiple
> brokers on each machine multiply the partition count per machine? I.e.,
> will 4 brokers per machine on 5 machines give 2*4*5 = 40 partitions per
> topic?
>
> Correct.
>
> 4) Is there a way to use all the directories from a single broker?
>
> Kafka 0.8 supports specifying multiple log directories through the
> log.dirs config.
>
> Thanks,
> Neha
>
>
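
For reference, in 0.8 that is a comma-separated list in server.properties.
With the four mounts above it would look something like this (the
kafka-logs subdirectory name is just an assumption):

    log.dirs=/data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs,/data4/kafka-logs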