Consumer example not working for 0.8 beta

2013-11-18 Thread Menka
I am a newbie for Kafka and trying to execute the samples for 0.8 beta
release from

https://github.com/apache/kafka/tree/0.8.0-beta1candidate1/examples/src/main/java/kafka/examples

I tried KafkaConsumerProducerDemo and wanted to print what the Consumer is
consuming, but it's not doing anything (Consumer.java).


public void run() {
    System.out.println(name + "(consumer)--> in run");
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        System.out.println(ExampleUtils.getMessage(it.next().message()));
        System.out.println(name + "(consumer)--> " + new
            String(it.next().message()));
    }
}
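One detail worth noting in the loop above, separate from whatever the 0.8 beta issue is: it calls it.next() twice per hasNext() check, so each pass consumes two messages and the two println calls see different ones. The effect is easy to reproduce with a plain java.util.Iterator (this sketch is a stand-in for illustration, not the Kafka API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DoubleNextDemo {
    // Consume the iterator the way the quoted consumer loop does:
    // two next() calls per hasNext() check.
    static List<String> doubleNext(List<String> msgs) {
        List<String> seenByFirstPrint = new ArrayList<String>();
        Iterator<String> it = msgs.iterator();
        while (it.hasNext()) {
            seenByFirstPrint.add(it.next()); // the first println's message
            if (it.hasNext()) {
                it.next(); // the second println silently consumes another one
            }
        }
        return seenByFirstPrint;
    }

    public static void main(String[] args) {
        // Only every other message reaches the first println.
        System.out.println(doubleNext(Arrays.asList("m1", "m2", "m3", "m4"))); // prints [m1, m3]
    }
}
```

Capturing the message once per iteration in a local variable avoids the skip.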

Any suggestion on what I might be doing wrong here?

Thanks,
Menka

-- 
Blog: http://menkag.blogspot.com
"Don't let what you cannot do interfere with what you can do" - by John
Wooden


Re: Kafka cluster with lots of topics

2013-11-18 Thread Robert Rodgers
If I understand your comment correctly you are trying to use kafka topics as 
per-endpoint message queues.  

I may be mistaken, but Kafka does not really seem like a good match for
that. For such a purpose you will eventually want something that is not
actually a queue: a means to perform state compression, event-type-based and
priority-based dequeue, and other operations which are not appropriate to a
throughput/FIFO-oriented system.


On Nov 14, 2013, at 6:18 AM, Joe Freeman  wrote:

> Thanks for the replies. I don't think Kafka quite fits our use case,
> unfortunately. To abstractly answer Edward's question: in a system with
> lots of users, we were considering having a topic per user (such that an
> individual user can connect from a number of endpoints and receive events,
> including events that were sent while the user was disconnected -
> persisting the events to disk and using offsets means we don't have to
> track which events each individual endpoint has received).
> 
> 
> 
> On 14 November 2013 04:38, Edward Capriolo  wrote:
> 
>> Zookeeper will not be the only problem. The first is that each topic is a
>> directory on the file system. Each of those is going to have files inside
>> it. This is going to be fairly overwhelming for the file system. Also I can
>> not speak for the internals but there may be cases where this many topics
>> allocates a big array or some other non-optimal behaviour.
>> 
>> Like an RDBMS with this many tables, one might ask: why? Isn't there a way
>> to design the system to be multi-tenant so that so many physical topics
>> are not needed?
>> 
>> 
>> On Wed, Nov 13, 2013 at 9:41 AM, Neha Narkhede >> wrote:
>> 
>>> With that many topics, zookeeper will be the main bottleneck. The leader
>>> election process will take very long, increasing the unavailability
>>> window of the cluster.
>>> 
>>> Thanks,
>>> Neha
>>> On Nov 13, 2013 4:49 AM, "Joe Freeman"  wrote:
>>> 
 Would I be correct in assuming that a Kafka cluster won't scale well to
 support lots (tens of millions) of topics? If I understand correctly, a
 node being added or removed would involve a leader election for each
>>> topic,
 which is a relatively expensive operation?
 
>>> 
>> 
> 
> 
> 
> -- 
> Bitroot - http://bitroot.com



Mirror Maker [Aggregate cluster]

2013-11-18 Thread Vinicius Carvalho
Hi there, I'm sorry about this silly question, just trying to make sure I
got this right:

Having a separate cluster for aggregation means that I would need a second
ZooKeeper for that cluster, right?

I would need one ZK for my local cluster and another ZK for my aggregate; is
this correct?

Regards

-- 
The intuitive mind is a sacred gift and the
rational mind is a faithful servant. We have
created a society that honors the servant and
has forgotten the gift.


Re: Mirror Maker [Aggregate cluster]

2013-11-18 Thread Neha Narkhede
You can also just use a different namespace on the same zookeeper cluster
if both local and aggregate are in the same colo.
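Neha's suggestion maps to ZooKeeper's chroot support: appending a path to the zookeeper.connect string gives each cluster its own namespace on a shared ensemble. A minimal sketch (hostnames and chroot paths here are hypothetical):

```properties
# server.properties for the local cluster's brokers
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka-local

# server.properties for the aggregate cluster's brokers,
# pointing at the same ensemble under a different chroot
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka-aggregate
```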

Thanks,
Neha




./sbt assembly-package-dependency needed for Kafka 0.7.2?

2013-11-18 Thread Menka
Do we need to run

./sbt assembly-package-dependency for Kafka 0.7.2?

I am running into this.


[idhuser@vmidh kafka-0.7.2-incubating-src]$ ./sbt
assembly-package-dependency
[info] Building project Kafka 0.7.2 against Scala 2.8.0
[info]using KafkaProject with sbt 0.7.5 and Scala 2.7.7
*[error] No action named 'assembly-package-dependency' exists.*
[info] Execute 'help' for a list of commands or 'actions' for a list of
available project actions and methods.
[info]
[info] Total time: 0 s, completed Nov 18, 2013 1:38:54 PM
[info]
[info] Total session time: 1 s, completed Nov 18, 2013 1:38:54 PM
[error] Error during build.


-- 
Blog: http://menkag.blogspot.com
Facebook: http://www.facebook.com/MenkasJewelry

"Don't let what you cannot do interfere with what you can do" - by John
Wooden


Re: ./sbt assembly-package-dependency needed for Kafka 0.7.2?

2013-11-18 Thread Joe Stein
Nope, assembly-package-dependency was introduced for 0.8

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/




High-level consumer not rebalancing.

2013-11-18 Thread Drew Goya
So I've run into a problem where occasionally, some partitions within a
topic end up in a "none" owner state for a long time.

I'm using the high-level consumer on several machines, each consumer has 4
threads.

Normally when I run the ConsumerOffsetChecker, all partitions have owners
and similar lag.

Occasionally I end up in this state:

trackingGroup   Events232   552506856   569853398   17346542   none
trackingGroup   Events233   553649131   569775298   16126167   none
trackingGroup   Events234   552380321   569572719   17192398   none
trackingGroup   Events235   553206745   569448821   16242076   none
trackingGroup   Events236   553673576   570084283   16410707   none
trackingGroup   Events237   552669833   569765642   17095809   none
trackingGroup   Events238   553147178   569766985   16619807   none
trackingGroup   Events239   552495219   569837815   17342596   none
trackingGroup   Events240   570108655   570111080   2425   trackingGroup_host6-1384385417822-23157ae8-0
trackingGroup   Events241   570288505   570291068   2563   trackingGroup_host6-1384385417822-23157ae8-0
trackingGroup   Events242   569929870   569932330   2460   trackingGroup_host6-1384385417822-23157ae8-0
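For reference, the lag column in that ConsumerOffsetChecker output is simply the log size minus the consumed offset; checking the first row above (this arithmetic check is illustrative, not part of the tool):

```java
public class LagCheck {
    // ConsumerOffsetChecker reports lag = logSize - consumer offset
    static long lag(long logSize, long consumerOffset) {
        return logSize - consumerOffset;
    }

    public static void main(String[] args) {
        // First row above: offset 552506856, logSize 569853398
        System.out.println(lag(569853398L, 552506856L)); // prints 17346542
    }
}
```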

I'm at the point where I'm considering writing my own client but hopefully
the user group has the answer!

I am using this commit of 8.0 on both the brokers and clients:
d4553da609ea9af6db8a79faf116d1623c8a856f


Re: High-level consumer not rebalancing.

2013-11-18 Thread Guozhang Wang
Hello Drew,

Do you see any rebalance failure exceptions in the consumer log?

Guozhang




-- 
-- Guozhang


Re: High-level consumer not rebalancing.

2013-11-18 Thread Drew Goya
Hey Guozhang, I just forced the error by killing one of my consumer JVMs
and I am getting a consumer rebalance failure:

2013-11-18 22:46:54 k.c.ZookeeperConsumerConnector [ERROR]
[bridgeTopology_host-1384493092466-7099d843], error during syncedRebalance
kafka.common.ConsumerRebalanceFailedException:
bridgeTopology_host-1384493092466-7099d843 can't rebalance after 10 retries
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
~[stormjar.jar:na]
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)
~[stormjar.jar:na]

These are the relevant lines in my consumer properties file:

rebalance.max.retries=10
rebalance.backoff.ms=1

My topic has 128 partitions

Are there some other configuration settings I should be using?
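For tuning these, one commonly cited rule of thumb (stated here as an assumption, not something verified against this cluster) is that rebalance.backoff.ms multiplied by rebalance.max.retries should exceed the ZooKeeper session timeout, so that a dead consumer's ephemeral nodes have expired before the retries are exhausted. A hypothetical sketch:

```properties
# hypothetical values; what matters is the relationship between them
zookeeper.session.timeout.ms=6000
rebalance.max.retries=10
rebalance.backoff.ms=2000   # 10 retries x 2000 ms = 20 s > 6 s session timeout
```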




Re: ./sbt assembly-package-dependency needed for Kafka 0.7.2?

2013-11-18 Thread Menka
Joe,

Thanks so much.

Regards,
Menka






-- 
Blog: http://menkag.blogspot.com
Facebook: http://www.facebook.com/MenkasJewelry

"Don't let what you cannot do interfere with what you can do" - by John
Wooden


Re: High-level consumer not rebalancing.

2013-11-18 Thread Drew Goya
Also of note: this is all running from within a Storm topology, so when I
kill a JVM, another is started very quickly.

Could this be a problem with a consumer leaving and rejoining within a
small window?




Re: High-level consumer not rebalancing.

2013-11-18 Thread Guozhang Wang
Could you find some entries in the log with the key word "conflict"? If so,
could you paste them here?

Guozhang





-- 
-- Guozhang


Re: High-level consumer not rebalancing.

2013-11-18 Thread Drew Goya
So I think I got to the root of the problem. Thanks for pointing me in the
direction of zookeeper data conflicts.

I turned the log level up to INFO and captured a bunch of conflict messages
from the zookeeper client.

I did an "rmr" on the consumers/ zookeeper node to clear out
any lingering data and fired up my consumers again.

Whatever node data was present seems to have been corrupted by an earlier
version of Kafka.

I can now terminate consumer JVMs (I've even rebooted a machine running 4
consumers) and the topic immediately rebalances.

I'll keep testing and follow up here if I can replicate the error with
clean ZK data.




Replica Configuration Settings

2013-11-18 Thread Tom Amon
Hi All,

I don't see the replica configuration settings (outlined in the Kafka 0.8
documentation) in the configuration file that comes with the distribution.
I was wondering if they are necessary or if they have reasonable defaults?
Are there implications for not having them in the configuration file at
all? Can they be used as is (with the usual caveats) from the documentation?

Thanks


Re: Replica Configuration Settings

2013-11-18 Thread Guozhang Wang
Hi Tom,

All the replication parameters have some default values:

http://kafka.apache.org/documentation.html#brokerconfigs

They are not overridden in the release, so the default values will be picked
up. In effect, replication is not used unless it is specified per topic,
since default.replication.factor defaults to 1.
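In other words, a stock server.properties behaves as if the following were set, and enabling replication cluster-wide is a one-line override (values below are hypothetical):

```properties
# implicit default: new topics are created unreplicated
default.replication.factor=1

# to replicate every new topic by default instead, e.g.:
# default.replication.factor=3
```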

Guozhang





-- 
-- Guozhang


kafka.common.OffsetOutOfRangeException

2013-11-18 Thread Oleg Ruchovets
We are working with Kafka (0.7.2) + Storm.

1) We deployed a 1st topology subscribed to a Kafka topic, and it has been
working fine for a couple of weeks.
2) Yesterday we deployed a 2nd topology subscribed to the same Kafka topic,
but the 2nd topology immediately failed with an exception.

What can cause such behavior, and how can we resolve the issue?


java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException

at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)

at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)

at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)

at
backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)

at
backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)

at clojure.lang.AFn.run(AFn.java:24)

at java.lang.Thread.run(Thread.java:662)

Caused by: kafka.common.OffsetOutOfRangeException

at
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)

at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)

at
java.lang.reflect.Constructor.newInstance(Constructor.java:513)

at java.lang.Class.newInstance0(Class.java:355)

at java.lang.Class.newInstance(Class.java:308)

at
kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)

at
kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)

at
kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)

at
kafka.javaapi.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:51)

at
kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50)

at
storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36)

at
storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75)

at
storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64)

at
backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90)

at
backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)

at
backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307)

at
backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)

at
backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)

at
backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)

at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)

... 6 more


Re: kafka.common.OffsetOutOfRangeException

2013-11-18 Thread Philip O'Toole
Don't get scared, this is perfectly normal and easily fixed. :-) The second
topology attempted to fetch messages from an offset in Kafka that does not
exist. This could happen due to Kafka retention policies (messages deleted)
or a bug in your code. Your code needs to catch this exception, ask Kafka
for the earliest or latest offset (take your pick), and then re-issue the
fetch using the returned offset.

Are you using a separate path in ZK for the second topology? Is it of a
completely different nature than the first?

Philip
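The recovery Philip describes can be sketched with plain Java (the class and method below are hypothetical stand-ins for illustration, not the 0.7 consumer API): on an out-of-range fetch, re-ask the broker for its earliest or latest valid offset and retry from there.

```java
public class OffsetReset {
    // Clamp a requested offset into the broker's valid range, mimicking the
    // "catch OffsetOutOfRange, ask for earliest/latest, re-issue the fetch"
    // recovery loop.
    static long resetOffset(long requested, long earliest, long latest,
                            boolean preferEarliest) {
        if (requested >= earliest && requested <= latest) {
            return requested; // in range: nothing to do
        }
        // Out of range (e.g. the segment was deleted by retention):
        // restart from the oldest surviving message, or skip to the newest.
        return preferEarliest ? earliest : latest;
    }

    public static void main(String[] args) {
        // Messages below offset 1000 were deleted by retention; a consumer
        // still holding offset 350 must jump forward before fetching again.
        System.out.println(resetOffset(350L, 1000L, 9000L, true)); // prints 1000
    }
}
```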




On Mon, Nov 18, 2013 at 7:40 PM, Oleg Ruchovets wrote:

> We are working with kafka  (0.7.2) + storm.
>1) We deployed 1st topology which subscribed on Kafka topic and it is
> working fine already couple of weeks.
> 2) Yesterday we deploy 2nd topology which subscribed on the  same Kafka
> topic , but 2nd topology immediately failed with exception:
>
> *What can cause such behavior and how we can resolve the issue: *
>
>
> java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException
>
> at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
> at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
> at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
> at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
> at clojure.lang.AFn.run(AFn.java:24)
> at java.lang.Thread.run(Thread.java:662)
>
> Caused by: kafka.common.OffsetOutOfRangeException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
> at java.lang.Class.newInstance0(Class.java:355)
> at java.lang.Class.newInstance(Class.java:308)
> at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
> at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)
> at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)
> at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:51)
> at kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50)
> at storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36)
> at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75)
> at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64)
> at backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90)
> at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)
> at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307)
> at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
> at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
> at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
> at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
> ... 6 more
>
> Mon, 18 Nov 2013 12:36:25 +
>
> java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException
> at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
> at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
> at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
> at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
> at clojure.lang.AFn.run(AFn.java:24)
> at java.lang.Thread.run(Thread.java:662)
>
> Caused by: kafka.common.OffsetOutOfRangeException
> at sun.reflect.NativeConstructorAccessorImpl.newI

How to link kafka servers

2013-11-18 Thread David Montgomery
Hi,

My endpoint for Kafka is in Europe, where a consumer gets the final
messages. But I want to write to Kafka servers in SG and the US east and
west, which will then forward to the Kafka servers in the UK. How do I do
that? Is there a good tutorial? What are the keywords that describe what I
want to do?

Thanks


Re: Consumer example not working for 0.8 beta

2013-11-18 Thread Jun Rao
Try producing some new data after the consumer is running. By default, the
consumer only picks up newly produced data.

Thanks,

Jun
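Jun's point about the default can be made concrete: in the 0.8 high-level consumer, the relevant setting is auto.offset.reset. A minimal sketch (the helper class and values are illustrative; only the property names come from the 0.8 consumer configuration):

```java
import java.util.Properties;

public class ConsumerProps {
    // Build the properties for a 0.8 high-level consumer connector.
    static Properties buildProps(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // "smallest" = a consumer group with no committed offset starts at the
        // earliest retained message; the default "largest" only sees new data.
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps("localhost:2181", "demo-group")
                .getProperty("auto.offset.reset"));
    }
}
```

With "smallest" set, a freshly started consumer group replays the whole retained topic instead of silently waiting for new messages, which is the symptom described above.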


On Mon, Nov 18, 2013 at 11:22 AM, Menka  wrote:

> I am a newbie for Kafka and trying to execute the samples for 0.8 beta
> release from
>
>
> https://github.com/apache/kafka/tree/0.8.0-beta1-candidate1/examples/src/main/java/kafka/examples
>
> I tried KafkaConsumerProducerDemo and wanted to print what Consumer is
> consuming but its not doing anything (Consumer.java).
>
>
> public void run() {
>     System.out.println(name + "(consumer)--> in run");
>     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>     topicCountMap.put(topic, new Integer(1));
>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>         consumer.createMessageStreams(topicCountMap);
>     KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
>     ConsumerIterator<byte[], byte[]> it = stream.iterator();
>     while (it.hasNext()) {
>         // System.out.println(ExampleUtils.getMessage(it.next().message()));
>         System.out.println(name + "(consumer)--> " + new String(it.next().message()));
>     }
> }
>
> Any suggestion on what I might be doing wrong here?
>
> Thanks,
> Menka
>
> --
> Blog: http://menkag.blogspot.com
> "Don't let what you cannot do interfere with what you can do" - by John
> Wooden
>


Re: kafka.common.OffsetOutOfRangeException

2013-11-18 Thread Oleg Ruchovets
Hi Philip.

   It looks like this is our case:
https://github.com/nathanmarz/storm-contrib/pull/15

It is interesting that the issue is still open (after more than 1 year), so
I am curious how people are able to run in production without the ability
to deploy another topology.
Can the community please share whether this patch resolves the issue, and
who is using it in production?

Another question: should I change the ZooKeeper or Kafka configuration to
resolve the issue? If yes, please share what should be changed.

Thanks
Oleg.



On Tue, Nov 19, 2013 at 11:51 AM, Philip O'Toole  wrote:

> Don't get scared, this is perfectly normal and easily fixed. :-) The second
> topology attempted to fetch messages from an offset in Kafka that does not
> exist. This could happen due to Kafka retention policies (messages
> deleted) or a bug in your code. Your code needs to catch this exception,
> ask Kafka for the earliest -- or latest -- offset (take your pick),
> and then re-issue the fetch using the returned offset.
>
> Are you using a separate path in ZK for the second topology? Is it of a
> completely different nature than the first?
>
> Philip
>
>
>
>
> On Mon, Nov 18, 2013 at 7:40 PM, Oleg Ruchovets wrote:
>
> > We are working with Kafka (0.7.2) + Storm.
> > 1) We deployed the 1st topology, which subscribed to a Kafka topic, and
> > it has been working fine for a couple of weeks already.
> > 2) Yesterday we deployed a 2nd topology, which subscribed to the same
> > Kafka topic, but it immediately failed with an exception:
> >
> > *What can cause such behavior, and how can we resolve the issue?*
> >
> >
> > java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException
> >
> > at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
> > at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
> > at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> > at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
> > at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
> > at clojure.lang.AFn.run(AFn.java:24)
> > at java.lang.Thread.run(Thread.java:662)
> >
> > Caused by: kafka.common.OffsetOutOfRangeException
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
> > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
> > at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
> > at java.lang.Class.newInstance0(Class.java:355)
> > at java.lang.Class.newInstance(Class.java:308)
> > at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
> > at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)
> > at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)
> > at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:51)
> > at kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:50)
> > at storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:36)
> > at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:75)
> > at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:64)
> > at backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:90)
> > at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)
> > at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:307)
> > at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
> > at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
> > at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
> > at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
> > ... 6 more
> >
> > 

ActiveControllerCount is always will be either 0 or 1 in 3 nodes kafka cluster?

2013-11-18 Thread Monika Garg
Hi,

There is an MBean in jconsole named
"kafka.controller":type="KafkaController",name="ActiveControllerCount".

The value of this MBean is always 1 on one broker and 0 on all the other
brokers of the 3-node Kafka cluster. Will it always be like this?

As per my understanding, it will always be 1 on one and only one broker, as
there will be only one active controller in the cluster at any point in
time.

So it should be "kafka.controller":type="KafkaController",name="
*ActiveController*", and it should be a boolean value.

Please correct if I am wrong.

-- 
*Moniii*


Re: ActiveControllerCount is always will be either 0 or 1 in 3 nodes kafka cluster?

2013-11-18 Thread Neha Narkhede
There should only be one controller at any point in time in a Kafka
cluster. If that controller broker is bounced, the controller moves to
another broker. In general, you want to alert if the sum of the
ActiveControllerCount MBean values across all brokers in the cluster != 1.

Thanks,
Neha
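Neha's alert rule can be sketched as follows; collecting the per-broker values (e.g. over JMX) is assumed, and the class and method names are illustrative:

```java
import java.util.Arrays;
import java.util.List;

public class ControllerCheck {
    // The cluster is healthy only if exactly one broker reports itself as
    // the active controller (sum of ActiveControllerCount values == 1).
    static boolean clusterHealthy(List<Integer> activeControllerCounts) {
        int sum = 0;
        for (int c : activeControllerCounts) sum += c;
        return sum == 1;
    }

    public static void main(String[] args) {
        System.out.println(clusterHealthy(Arrays.asList(1, 0, 0))); // exactly one controller
        System.out.println(clusterHealthy(Arrays.asList(0, 0, 0))); // no controller elected
        System.out.println(clusterHealthy(Arrays.asList(1, 1, 0))); // two brokers claim it
    }
}
```

Checking the sum (rather than any single broker's value) covers both failure modes: no controller elected, and more than one broker claiming the role.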


On Mon, Nov 18, 2013 at 11:16 PM, Monika Garg  wrote:

> Hi,
>
> there is one Mbean in jconsole with name
> "kafka.controller":type="KafkaController",name="ActiveControllerCount",
>
> the Value of this Mbean is always 1 on one broker and 0 on all others
> remaining broker of the 3 nodes kafka cluster.Will it always be like this?
>
> As per my understanding it will always be 1 on one and only one broker as
> there will be only one active controller in the cluster at any point of
> time.
>
> So it should be "kafka.controller":type="KafkaController",name="
> *ActiveController*" and it sholud be a boolean value.
>
> Please correct if I am wrong.
>
> --
> *Moniii*
>


Producer reaches a max of 7Mbps

2013-11-18 Thread Abhinav Anand
Hi,
 I am using the Kafka producer and broker in a production setup. The expected
producer output is 20MBps, but I am only getting a max of 8MBps. I have
verified that we are losing packets by connecting directly to the data
source through TCP, though the metrics do not reflect any loss.
I went through the performance page, where Kafka can reach a speed of 50MBps.
Please look at the config and suggest if there is some configuration
improvement I can do.

**** Message Size ****
Message size = 3KB

**** Producer Config ****
producer.type = async
queue.buffering.max.ms = 100
queue.buffering.max.messages = 4000
request.timeout.ms = 3
batch.num.messages = 200

**** Broker Config ****

num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=2097152
socket.request.max.bytes=104857600
log.dir=/data1/kafka/logs
num.partitions=1
log.flush.interval.messages=1000
log.flush.interval.ms=300
log.retention.hours=48
log.retention.bytes=107374182400
log.segment.bytes=536870912
log.cleanup.interval.mins=1
zookeeper.connect=dare-msgq00:2181,dare-msgq01:2181,dare-msgq02:2181
zookeeper.connection.timeout.ms=100

-- 
Abhinav Anand
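A back-of-the-envelope sketch of the rates implied by the numbers above (not a diagnosis; the message size and queue.buffering.max.messages value are copied from the mail, everything else is arithmetic):

```java
public class ProducerMath {
    public static void main(String[] args) {
        double targetBytesPerSec = 20 * 1024 * 1024; // desired 20 MBps
        double messageBytes = 3 * 1024;              // 3KB messages
        double messagesPerSec = targetBytesPerSec / messageBytes;

        // queue.buffering.max.messages = 4000 buffers well under a second of
        // data at the target rate; if the broker round-trip stalls, an async
        // producer can drop messages once this queue overflows (depending on
        // the enqueue timeout setting), which would match the observed loss.
        double queueSeconds = 4000 / messagesPerSec;

        System.out.println((long) messagesPerSec + " msgs/s needed; queue buffers ~"
                + queueSeconds + " s");
    }
}
```

The point of the sketch: at ~6800 messages/s the in-memory queue covers only about half a second of traffic, so raising queue.buffering.max.messages (or lowering queue.buffering.max.ms) is a plausible first knob to try, under the assumption that the queue is the bottleneck.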