unable to create topic

2014-12-08 Thread kishore kumar
I have a running ZooKeeper 3.4.5-cdh5.2.0 cluster on 3 nodes, managed with
CM 5.2, and I integrated Kafka following
http://www.cloudera.com/content/cloudera/en/developers/home/cloudera-labs/apache-kafka.html

The problem is, when I try to create a topic with

# bin/kafka-topics.sh --create --zookeeper localhost:2181
--replication-factor 1 --partitions 1 --topic test

the error is

NoNode for /brokers/ids

I created the path /brokers/ids/0,1,2 in ZooKeeper manually, but the
problem remains the same.

How can I get rid of this? Any help?

Thanks,

Kishore.


Re: unable to create topic

2014-12-08 Thread Joe Stein
Cloudera Manager utilizes the chroot znode structures so you should connect
with --zookeeper localhost:2181/kafka

Or whatever value CM has set for the chroot path of your installation.
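
For example, assuming the chroot is /kafka (check the actual ZooKeeper
root/chroot value configured for the Kafka service in Cloudera Manager),
the create command from your message would become:

# bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test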

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/

On Mon, Dec 8, 2014 at 4:04 AM, kishore kumar  wrote:

> I have running zookeeper 3.4.5-cdh5.2.0 cluster on 3 nodes and managing
> with cm5.2, Integrated kafka with "
>
> http://www.cloudera.com/content/cloudera/en/developers/home/cloudera-labs/apache-kafka.html
> ",
>
>
> The problem is, when I try to create a topic with
>
>
>
> *# bin/kafka-topics.sh --create --zookeeper localhost:2181
> --replication-factor 1 --partitions 1 --topic test*
>
>
>
> *the error is *
>
>
>
> *Nonode for /brokers/ids*
>
>
> *I created the path /brokers/ids/0,1,2 in zookeeper manually, still
> the problem is same.*
>
>
>
>
> *How to get rid from this, any help ?*
>
>
> *Thanks,*
>
>
> *Kishore.*
>


Increased CPU usage with 0.8.2-beta

2014-12-08 Thread Mathias Söderberg
Good day,

I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed that
the CPU usage on the broker machines went up by roughly 40%, from ~60% to
~100%, and am wondering if anyone else has experienced something similar?
The load average also went up by 2x-3x.

We're running on EC2 and the cluster currently consists of four m1.xlarge
instances, with roughly 1100 topics / 4000 partitions. We're using Java 7
(1.7.0_65 to be exact) and Scala 2.9.2. Configurations can be found here:
https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.

I'm assuming that this is not expected behaviour for 0.8.2-beta?

Best regards,
Mathias


In what condition does getoffsetbefore (latest) not return latest offset?

2014-12-08 Thread Helin Xiang
Hi,

We have recently upgraded our Kafka cluster from 0.7.2 to 0.8.1.1.

In one of our applications, we want to get every partition's latest offset,
so we use the getOffsetsBefore Java API (with the "latest" time).
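
For reference, a minimal sketch of that call against the 0.8.1
SimpleConsumer API (the broker host, port, and client id below are just
placeholders):

    import java.util.HashMap;
    import java.util.Map;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class LatestOffsetSketch {
        // Returns the log end offset, i.e. the offset of the next message to be written.
        public static long getLatestOffset(SimpleConsumer consumer, String topic,
                                           int partition, String clientName) {
            TopicAndPartition tp = new TopicAndPartition(topic, partition);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
            // LatestTime() (-1L) asks for the latest offset of the partition
            requestInfo.put(tp, new PartitionOffsetRequestInfo(
                kafka.api.OffsetRequest.LatestTime(), 1));
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
            OffsetResponse response = consumer.getOffsetsBefore(request);
            if (response.hasError()) {
                throw new RuntimeException("Offset request failed, error code: "
                    + response.errorCode(topic, partition));
            }
            return response.offsets(topic, partition)[0];
        }
    }

The consumer here would be created with something like
new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, clientName).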

We believe that at some point the latest offset we got for one of the
partitions was much smaller than its real latest offset (we saw in the
application's log that this partition's offset was much smaller than the
other partitions'). Since the real data file of that partition was already
deleted, we cannot prove our guess, but we found some clues in Kafka's
application log which lead us to conclude that the partition's latest
offset at that moment was actually much larger.

Some additional useful information: the partition has one additional
replica (follower), and at that time it was not in sync with the leader
partition (it was far behind the leader).

Does anyone have the same issue? What conditions could lead to this
situation?

Thanks.

-- 


*Best Regards向河林*


Re: In what condition does getoffsetbefore (latest) not return latest offset?

2014-12-08 Thread Helin Xiang
One additional piece of information we found in Kafka's application log
around the time in question:

2014-12-04 09:59:36,726 [kafka-scheduler-2] INFO
kafka.cluster.Partition  - Partition [a.s.3,26] on broker 5: Shrinking
ISR for partition [a.s.3,26] from 5,4 to 5
2014-12-04 09:59:36,728 [kafka-scheduler-2] ERROR kafka.utils.ZkUtils$
 - Conditional update of path
/brokers/topics/a.s.3/partitions/26/state with data
{"controller_epoch":2,"leader":5,"version":1,"leader_epoch":4,"isr":[5]}
and expected version 675 failed due to
org.apache.zookeeper.KeeperException$BadVersionException:
KeeperErrorCode = BadVersion for
/brokers/topics/a.s.3/partitions/26/state


On Mon, Dec 8, 2014 at 6:59 PM, Helin Xiang  wrote:

> Hi,
>
> We have currently upgraded our kafka cluster from 0.7.2 to 0.8.1.1.
>
> In one of our application, we want to get all partitions' latest offsets,
> so we use getoffsetbefore java API (latest).
>
> We believe at some time, 1 of the partition's latest offset we got is much
> smaller than its real latest offset,(we saw in the application's log that
> the partition's offset is much smaller than other partitions'). Since the
> real data file of that partition was already deleted, we cannot prove our
> guess, but we found some clue in the kafka's application log which helps us
> to conclude that the partition's latest offset at that moment did have a
> much larger number.
>
> some additional useful information: the partition have 1 additional
> replica(follower), and at that time, it was not synced with the leader
> partition(far away behind the leader).
>
> Does any one have the same issue? In what condition could lead to this
> situation?
>
> Thanks.
>
> --
>
>
> *Best Regards向河林*
>



-- 


*Best Regards向河林*


Please subscribe for the group

2014-12-08 Thread ajay mittal
Please subscribe for the group
Ajay


How to Setup MirrorMaker in Generalized way

2014-12-08 Thread Madhukar Bharti
Hi,

I am going to set up a Kafka cluster with 3 brokers in Datacenter 1. Topics
can be created from time to time. Each topic can have a varying number of
partitions, mostly 1, 10 or 20. Each application might use a different
partitioning algorithm that we don't know (it is hidden from the ops team).

We want to set up the MirrorMaker tool in such a way that data goes to the
same partition in the mirror without knowing each topic's partitioning
logic, and it should be *generalized*. [This should be common for all
topics.]

For example, partition 0 in Datacenter 1 should be an exact mirror of
partition 0 in Datacenter 2.

Please suggest a solution for doing so. If the MirrorMaker tool provides
any configuration that solves this use case, please let me know.



Regards,
Madhukar Bharti


RE: Reading only the latest message

2014-12-08 Thread Orelowitz, David
Neha,

This seems to return the offset of the next message that will be published. If 
I fetch at that offset I will block until a new message is published to that 
partition.

I am actually trying to read the contents of the latest message in the 
partition, and based on info in the message resubscribe to the data source.

-Original Message-
From: Neha Narkhede [mailto:n...@confluent.io] 
Sent: Friday, December 05, 2014 8:33 PM
To: users@kafka.apache.org
Subject: Re: Reading only the latest message

You can use the getOffsetsBefore() API and specify -1L to get the offset of the 
last committed message (at the time of the request) for that partition.

On Fri, Dec 5, 2014 at 12:42 PM, Orelowitz, David 
wrote:

> What is the best mechanism to retrieve the latest message from a kafka 
> partition.
>
> We intend for our producer, on startup or recovery, to read the 
> upstream sequence number in the last message in the partition and 
> request for the upstream system to start sending from that sequence number++.
>
> Currently we are creating a SimpleConsumer and then calling
> getOffsetBefore() using the current wall time. We then decrement the 
> offset returned and retrieve the message at this offset. We do manage 
> the case when the offset is zero.
>
> It seem to work!
>
> Is this the right approach.
>
> Thanks,
> David
>
>
> --
> This message, and any attachments, is for the intended recipient(s) 
> only, may contain information that is privileged, confidential and/or 
> proprietary and subject to important terms and conditions available at
> http://www.bankofamerica.com/emaildisclaimer.   If you are not the
> intended recipient, please delete this message.
>



--
Thanks,
Neha

--
This message, and any attachments, is for the intended recipient(s) only, may 
contain information that is privileged, confidential and/or proprietary and 
subject to important terms and conditions available at 
http://www.bankofamerica.com/emaildisclaimer.   If you are not the intended 
recipient, please delete this message.


Re: Reading only the latest message

2014-12-08 Thread Neha Narkhede
The returned latest offset - 1 will be the offset of the last message.
Sorry, should've made it clear in my last email. Let me know if that helps.

On Mon, Dec 8, 2014 at 8:32 AM, Orelowitz, David 
wrote:

> Neha,
>
> This seems to return the offset of the next message that will be
> published. If I fetch at that offset I will block until a new message is
> published to that partition.
>
> I am actually trying to read the contents of the latest message in the
> partition, and based on info in the message resubscribe to the data source.
>
> -Original Message-
> From: Neha Narkhede [mailto:n...@confluent.io]
> Sent: Friday, December 05, 2014 8:33 PM
> To: users@kafka.apache.org
> Subject: Re: Reading only the latest message
>
> You can use the getOffsetsBefore() API and specify -1L to get the offset
> of the last committed message (at the time of the request) for that
> partition.
>
> On Fri, Dec 5, 2014 at 12:42 PM, Orelowitz, David <
> david.orelow...@baml.com>
> wrote:
>
> > What is the best mechanism to retrieve the latest message from a kafka
> > partition.
> >
> > We intend for our producer, on startup or recovery, to read the
> > upstream sequence number in the last message in the partition and
> > request for the upstream system to start sending from that sequence
> number++.
> >
> > Currently we are creating a SimpleConsumer and then calling
> > getOffsetBefore() using the current wall time. We then decrement the
> > offset returned and retrieve the message at this offset. We do manage
> > the case when the offset is zero.
> >
> > It seem to work!
> >
> > Is this the right approach.
> >
> > Thanks,
> > David
> >
> >
> > --
> > This message, and any attachments, is for the intended recipient(s)
> > only, may contain information that is privileged, confidential and/or
> > proprietary and subject to important terms and conditions available at
> > http://www.bankofamerica.com/emaildisclaimer.   If you are not the
> > intended recipient, please delete this message.
> >
>
>
>
> --
> Thanks,
> Neha
>
> --
> This message, and any attachments, is for the intended recipient(s) only,
> may contain information that is privileged, confidential and/or proprietary
> and subject to important terms and conditions available at
> http://www.bankofamerica.com/emaildisclaimer.   If you are not the
> intended recipient, please delete this message.
>



-- 
Thanks,
Neha


RE: Reading only the latest message

2014-12-08 Thread Orelowitz, David
Neha,

This is not what I am seeing,  -1 seems to return the offset of the next 
message that will be published to the partition.

If I subtract 1 from the offset, then I get the offset of the last message, and 
can fetch that message and read it.
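
For anyone following the thread, here is a rough sketch of that pattern
with the 0.8.1 SimpleConsumer API (the client id and fetch size are
placeholders, and the empty-partition case is ignored):

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    public class LastMessageSketch {
        // logEndOffset is the value returned by getOffsetsBefore with -1L (LatestTime),
        // i.e. the offset of the *next* message to be written to the partition.
        public static void readLastMessage(SimpleConsumer consumer, String topic,
                                           int partition, long logEndOffset) {
            long lastMessageOffset = logEndOffset - 1;
            FetchRequest req = new FetchRequestBuilder()
                .clientId("last-message-reader")
                .addFetch(topic, partition, lastMessageOffset, 100000)
                .build();
            FetchResponse resp = consumer.fetch(req);
            if (resp.hasError()) {
                throw new RuntimeException("Fetch failed, error code: "
                    + resp.errorCode(topic, partition));
            }
            for (MessageAndOffset mo : resp.messageSet(topic, partition)) {
                // the first message returned at this offset is the last one in the partition
                System.out.println("last offset " + mo.offset()
                    + ", payload size " + mo.message().payloadSize());
                break;
            }
        }
    }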


From: Neha Narkhede [mailto:n...@confluent.io]
Sent: Monday, December 08, 2014 12:43 PM
To: Orelowitz, David
Cc: users@kafka.apache.org
Subject: Re: Reading only the latest message

The returned latest offset - 1 will be the offset of the last message. Sorry, 
should've made it clear in my last email. Let me know if that helps.

On Mon, Dec 8, 2014 at 8:32 AM, Orelowitz, David 
mailto:david.orelow...@baml.com>> wrote:
Neha,

This seems to return the offset of the next message that will be published. If 
I fetch at that offset I will block until a new message is published to that 
partition.

I am actually trying to read the contents of the latest message in the 
partition, and based on info in the message resubscribe to the data source.

-Original Message-
From: Neha Narkhede [mailto:n...@confluent.io]
Sent: Friday, December 05, 2014 8:33 PM
To: users@kafka.apache.org
Subject: Re: Reading only the latest message

You can use the getOffsetsBefore() API and specify -1L to get the offset of the 
last committed message (at the time of the request) for that partition.

On Fri, Dec 5, 2014 at 12:42 PM, Orelowitz, David 
mailto:david.orelow...@baml.com>>
wrote:

> What is the best mechanism to retrieve the latest message from a kafka
> partition.
>
> We intend for our producer, on startup or recovery, to read the
> upstream sequence number in the last message in the partition and
> request for the upstream system to start sending from that sequence number++.
>
> Currently we are creating a SimpleConsumer and then calling
> getOffsetBefore() using the current wall time. We then decrement the
> offset returned and retrieve the message at this offset. We do manage
> the case when the offset is zero.
>
> It seem to work!
>
> Is this the right approach.
>
> Thanks,
> David
>
>
> --
> This message, and any attachments, is for the intended recipient(s)
> only, may contain information that is privileged, confidential and/or
> proprietary and subject to important terms and conditions available at
> http://www.bankofamerica.com/emaildisclaimer.   If you are not the
> intended recipient, please delete this message.
>



--
Thanks,
Neha

--
This message, and any attachments, is for the intended recipient(s) only, may 
contain information that is privileged, confidential and/or proprietary and 
subject to important terms and conditions available at 
http://www.bankofamerica.com/emaildisclaimer.   If you are not the intended 
recipient, please delete this message.



--
Thanks,
Neha

--
This message, and any attachments, is for the intended recipient(s) only, may 
contain information that is privileged, confidential and/or proprietary and 
subject to important terms and conditions available at 
http://www.bankofamerica.com/emaildisclaimer.   If you are not the intended 
recipient, please delete this message.


Re: How to Setup MirrorMaker in Generalized way

2014-12-08 Thread Neha Narkhede
Hi Madhukar,

From the same documentation link you referred to -

The source and destination clusters are completely independent entities:
> they can have different numbers of partitions and the offsets will not be
> the same. For this reason the mirror cluster is not really intended as a
> fault-tolerance mechanism (as the consumer position will be different); for
> that we recommend using normal in-cluster replication. The mirror maker
> process will, however, retain and use the message key for partitioning so
> order is preserved on a per-key basis.


There is no way to set up an *exact* Kafka mirror yet.
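
(For reference, a typical MirrorMaker invocation, with placeholder config
file names, looks something like: bin/kafka-run-class.sh
kafka.tools.MirrorMaker --consumer.config source-cluster.consumer.properties
--producer.config target-cluster.producer.properties --whitelist=".*".
Even with identical partition counts on both sides, data only lands on the
same partition numbers if the same partitioning function is applied to the
keys, which is exactly what cannot be guaranteed with application-specific
partitioners.)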

Thanks,
Neha

On Mon, Dec 8, 2014 at 7:47 AM, Madhukar Bharti 
wrote:

> Hi,
>
> I am going to setup Kafka clusters having 3 brokers in Datacenter 1. Topics
> can be created time to time. Each topic can have varying partitions mostly
> 1,10 or 20. Each application might have different partitioning algorithm
> that we don't know(let it be hidden from ops team).
>
> We want to setup mirror maker tool in such a way so that, the exact
> partitioned data should go to the same partition without knowing the Topics
> partition logic and it should be *generalized*. [This should be common for
> all Topics.]
>
> *like partition 0 at DataCenter1 should be exact mirror of  partition-0 in
> Datacenter2*.
>
> Please suggest me a solution for doing so. If MirrorMaker
>  tool
> provide any configurations which solve this use-case please let me know.
>
>
>
> Regards,
> Madhukar Bharti
>



-- 
Thanks,
Neha


Re: Increased CPU usage with 0.8.2-beta

2014-12-08 Thread Neha Narkhede
Thanks for reporting the issue. Would you mind running hprof and sending
the output?
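
(If it helps, one way to capture a CPU profile is to restart one broker
with the JDK's built-in HPROF agent, e.g. by adding something like
-agentlib:hprof=cpu=samples,depth=10,interval=20 to KAFKA_OPTS or the
broker's JVM options, letting it run under load for a while, and then
attaching the resulting java.hprof.txt. The exact flags and output location
may vary with your setup.)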

On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg <
mathias.soederb...@gmail.com> wrote:

> Good day,
>
> I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed that
> the CPU usage on the broker machines went up by roughly 40%, from ~60% to
> ~100% and am wondering if anyone else has experienced something similar?
> The load average also went up by 2x-3x.
>
> We're running on EC2 and the cluster currently consists of four m1.xlarge,
> with roughly 1100 topics / 4000 partitions. Using Java 7 (1.7.0_65 to be
> exact) and Scala 2.9.2. Configurations can be found over here:
> https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.
>
> I'm assuming that this is not expected behaviour for 0.8.2-beta?
>
> Best regards,
> Mathias
>



-- 
Thanks,
Neha


Re: Producer can write to a follower during preferred leader election?

2014-12-08 Thread Gwen Shapira
I think that A will not be able to become a follower until B becomes a leader.

On Sun, Dec 7, 2014 at 11:07 AM, Xiaoyu Wang  wrote:
> On preferred replica election, controller sends LeaderAndIsr requests to
> brokers. Broker will handle the LeaderAndIsr request by either become a
> leader or become a follower.
>
> In the previous case, when A receive the call, it will try to become the
> leader and stop fetching from B; when B receive the call, it will try to
> become a follower and stop receiving new requests. Is it possible that A
> stops fetching before B stops receiving new requests? If this is possible,
> there still may be messages goes to B but not A, right?
>
> On Sun, Dec 7, 2014 at 7:20 AM, Thunder Stumpges 
> wrote:
>
>> In this case B will return "not leader for partition" error as soon as the
>> leader is re-elected and I imagine the producer will correct itself.
>>
>> -Thunder
>>
>>
>> -Original Message-
>> From: Xiaoyu Wang [xw...@rocketfuel.com]
>> Received: Saturday, 06 Dec 2014, 6:49PM
>> To: users@kafka.apache.org [users@kafka.apache.org]
>> Subject: Producer can writes to a follower during preferred lead election?
>>
>> Hello,
>>
>> I am looking at producer code and found that producer updates its
>> broker/partition info under the two conditions
>>
>>1. has reached the topicMetadataRefreshInterval
>>2. failed sending message, before retry
>>
>> So, assume we have broker A and B, B is the current lead and A is the
>> preferred lead and a producer is publishing to B. If someone execute
>> preferred lead election command now, A will become the new lead and the
>> producer won't know the lead is now A and will still writes to B until the
>> metadata refresh interval has been reached. Is this correct? Or did I did
>> miss anything.
>>
>>
>> Thanks.
>>


Re: Producer can write to a follower during preferred leader election?

2014-12-08 Thread Guozhang Wang
Xiaoyu,

I think your question is whether the following sequence can happen?

1. A receives becomes-leader and stops fetching from B.
2. B receives a producer request, accepts it, and appends it to its log.
3. B receives becomes-follower, truncates its log, and starts fetching from
A; hence the message sent in step 2 will be lost even though the producer
got an ack from B.

Since the controller sends the LeaderAndIsr request to all the replicas
(both leader and followers) at the same time, this is possible with ack
<= 1. With ack > 1, since the produce request will not be returned to the
producer until it has been replicated, the above scenario will not happen.
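
For illustration, the relevant knob on the 0.8 producer side is
request.required.acks; a minimal sketch, with placeholder broker addresses:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class AckedProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder brokers
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // -1 waits for the full ISR to acknowledge before the send is considered
            // successful; 0 or 1 can lose an acknowledged message across a leader change.
            props.put("request.required.acks", "-1");

            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("test", "key", "value"));
            producer.close();
        }
    }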

Guozhang

On Mon, Dec 8, 2014 at 11:29 AM, Gwen Shapira  wrote:

> I think that A will not be able to become a follower until B becomes a
> leader.
>
> On Sun, Dec 7, 2014 at 11:07 AM, Xiaoyu Wang  wrote:
> > On preferred replica election, controller sends LeaderAndIsr requests to
> > brokers. Broker will handle the LeaderAndIsr request by either become a
> > leader or become a follower.
> >
> > In the previous case, when A receive the call, it will try to become the
> > leader and stop fetching from B; when B receive the call, it will try to
> > become a follower and stop receiving new requests. Is it possible that A
> > stops fetching before B stops receiving new requests? If this is
> possible,
> > there still may be messages goes to B but not A, right?
> >
> > On Sun, Dec 7, 2014 at 7:20 AM, Thunder Stumpges 
> > wrote:
> >
> >> In this case B will return "not leader for partition" error as soon as
> the
> >> leader is re-elected and I imagine the producer will correct itself.
> >>
> >> -Thunder
> >>
> >>
> >> -Original Message-
> >> From: Xiaoyu Wang [xw...@rocketfuel.com]
> >> Received: Saturday, 06 Dec 2014, 6:49PM
> >> To: users@kafka.apache.org [users@kafka.apache.org]
> >> Subject: Producer can writes to a follower during preferred lead
> election?
> >>
> >> Hello,
> >>
> >> I am looking at producer code and found that producer updates its
> >> broker/partition info under the two conditions
> >>
> >>1. has reached the topicMetadataRefreshInterval
> >>2. failed sending message, before retry
> >>
> >> So, assume we have broker A and B, B is the current lead and A is the
> >> preferred lead and a producer is publishing to B. If someone execute
> >> preferred lead election command now, A will become the new lead and the
> >> producer won't know the lead is now A and will still writes to B until
> the
> >> metadata refresh interval has been reached. Is this correct? Or did I
> did
> >> miss anything.
> >>
> >>
> >> Thanks.
> >>
>



-- 
-- Guozhang


Re: In what condition does getoffsetbefore (latest) not return latest offset?

2014-12-08 Thread Guozhang Wang
Helin,

Is there a leader movement just before the get latest offset call? If your
follower is not synced and it then becomes the leader due to some reason,
it will not have the complete partition data.

Guozhang

On Mon, Dec 8, 2014 at 3:02 AM, Helin Xiang  wrote:

> 1 additional information we found in the kafka’s application log since the
> MAGIC time:
>
> 2014-12-04 09:59:36,726 [kafka-scheduler-2] INFO
> kafka.cluster.Partition  - Partition [a.s.3,26] on broker 5: Shrinking
> ISR for partition [a.s.3,26] from 5,4 to 5
> 2014-12-04 09:59:36,728 [kafka-scheduler-2] ERROR kafka.utils.ZkUtils$
>  - Conditional update of path
> /brokers/topics/a.s.3/partitions/26/state with data
> {"controller_epoch":2,"leader":5,"version":1,"leader_epoch":4,"isr":[5]}
> and expected version 675 failed due to
> org.apache.zookeeper.KeeperException$BadVersionException:
> KeeperErrorCode = BadVersion for
> /brokers/topics/a.s.3/partitions/26/state
>
>
> On Mon, Dec 8, 2014 at 6:59 PM, Helin Xiang  wrote:
>
> > Hi,
> >
> > We have currently upgraded our kafka cluster from 0.7.2 to 0.8.1.1.
> >
> > In one of our application, we want to get all partitions' latest offsets,
> > so we use getoffsetbefore java API (latest).
> >
> > We believe at some time, 1 of the partition's latest offset we got is
> much
> > smaller than its real latest offset,(we saw in the application's log that
> > the partition's offset is much smaller than other partitions'). Since the
> > real data file of that partition was already deleted, we cannot prove our
> > guess, but we found some clue in the kafka's application log which helps
> us
> > to conclude that the partition's latest offset at that moment did have a
> > much larger number.
> >
> > some additional useful information: the partition have 1 additional
> > replica(follower), and at that time, it was not synced with the leader
> > partition(far away behind the leader).
> >
> > Does any one have the same issue? In what condition could lead to this
> > situation?
> >
> > Thanks.
> >
> > --
> >
> >
> > *Best Regards向河林*
> >
>
>
>
> --
>
>
> *Best Regards向河林*
>



-- 
-- Guozhang


Re: Is Kafka documentation regarding null key misleading?

2014-12-08 Thread Guozhang Wang
Hi Yury,

Originally the producer behavior with a null key was truly random per
message, but it was later changed to this "periodic" random behavior to
reduce the number of sockets on the server side: imagine you have n brokers
and m producers where m >>> n; with per-message random distribution each
server would need to maintain a socket with each of the m producers.

We realized that this change IS misleading, and we have changed back to
per-message random partitioning in the new producer released in 0.8.2.
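
For readers on the 0.8.1 producer, the sticky behavior described above is
controlled by a producer property; a config fragment (the interval value is
illustrative only):

    # old (0.8.1) producer property: with a null key the producer sticks to one
    # randomly chosen partition until the next metadata refresh, so lowering this
    # interval rotates partitions sooner
    topic.metadata.refresh.interval.ms=60000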


Guozhang

On Fri, Dec 5, 2014 at 10:43 AM, Andrew Jorgensen <
ajorgen...@twitter.com.invalid> wrote:

> If you look under Producer configs you see the following key ‘
> topic.metadata.refresh.interval.ms’ with a default of 600 * 1000 (10
> minutes). It is not entirely clear, but this controls how often a producer
> with a null-key partitioner will switch the partition that it is writing to.
> In my production app I set this down to 1 minute and haven’t seen any ill
> effects but it is good to note that the shorter you get *could* cause some
> issues and extra overhead. I agree this could probably be a little more
> clear in the documentation.
> -
> Andrew Jorgensen
> @ajorgensen
>
> On December 5, 2014 at 1:34:00 PM, Yury Ruchin (yuri.ruc...@gmail.com)
> wrote:
>
> Hello,
>
> I've come across a (seemingly) strange situation when my Kafka producer
> gave so uneven distribution across partitions. I found that I used null key
> to produce messages, guided by the following clause in the documentation:
> "If the key is null, then a random broker partition is picked." However,
> after looking at the code, I found that the broker partition is not truly
> random for every message - instead, the randomly picked partition number
> sticks and only refreshes after the topic.metadata.refresh.ms expires,
> which is 10 minutes by default. So, with null key the producer keeps
> writing to the same partition for 10 minutes.
>
> Is my understanding of partitioning with null key correct? If yes,
> shouldn't the documentation be fixed then to explicitly describe the sticky
> pseudo-random partition assignment?
>
> Thanks,
> Yury
>



-- 
-- Guozhang


Re: Producer can write to a follower during preferred leader election?

2014-12-08 Thread Xiaoyu Wang
Guozhang, thanks for the super clear answer. Yes, that's the scenario I was
wondering about.

On Mon, Dec 8, 2014 at 1:01 PM, Guozhang Wang  wrote:

> Xiaoyu,
>
> I think your question is whether the following sequence can happen?
>
> 1. A received becomes-leader, stop fetching from B.
> 2. B received a producer request, and accepts it and append to log.
> 3. B received becomes-follower, cap its log and start fetching from A, and
> hence the message sent on step 2) will be lost even the producer gets acked
> from B.
>
> Since controller sends the leaderAndIsr request to all the replicas (both
> leader and followers) at the same time this is possible to happen with ack
> <=1. With ack > 1 since the produce request will not be returned to the
> producer until it has been replicated the above scenario will not happen.
>
> Guozhang
>
> On Mon, Dec 8, 2014 at 11:29 AM, Gwen Shapira 
> wrote:
>
> > I think that A will not be able to become a follower until B becomes a
> > leader.
> >
> > On Sun, Dec 7, 2014 at 11:07 AM, Xiaoyu Wang 
> wrote:
> > > On preferred replica election, controller sends LeaderAndIsr requests
> to
> > > brokers. Broker will handle the LeaderAndIsr request by either become a
> > > leader or become a follower.
> > >
> > > In the previous case, when A receive the call, it will try to become
> the
> > > leader and stop fetching from B; when B receive the call, it will try
> to
> > > become a follower and stop receiving new requests. Is it possible that
> A
> > > stops fetching before B stops receiving new requests? If this is
> > possible,
> > > there still may be messages goes to B but not A, right?
> > >
> > > On Sun, Dec 7, 2014 at 7:20 AM, Thunder Stumpges 
> > > wrote:
> > >
> > >> In this case B will return "not leader for partition" error as soon as
> > the
> > >> leader is re-elected and I imagine the producer will correct itself.
> > >>
> > >> -Thunder
> > >>
> > >>
> > >> -Original Message-
> > >> From: Xiaoyu Wang [xw...@rocketfuel.com]
> > >> Received: Saturday, 06 Dec 2014, 6:49PM
> > >> To: users@kafka.apache.org [users@kafka.apache.org]
> > >> Subject: Producer can writes to a follower during preferred lead
> > election?
> > >>
> > >> Hello,
> > >>
> > >> I am looking at producer code and found that producer updates its
> > >> broker/partition info under the two conditions
> > >>
> > >>1. has reached the topicMetadataRefreshInterval
> > >>2. failed sending message, before retry
> > >>
> > >> So, assume we have broker A and B, B is the current lead and A is the
> > >> preferred lead and a producer is publishing to B. If someone execute
> > >> preferred lead election command now, A will become the new lead and
> the
> > >> producer won't know the lead is now A and will still writes to B until
> > the
> > >> metadata refresh interval has been reached. Is this correct? Or did I
> > did
> > >> miss anything.
> > >>
> > >>
> > >> Thanks.
> > >>
> >
>
>
>
> --
> -- Guozhang
>


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-08 Thread Jun Rao
OK, based on all the feedback we have heard, I plan to do the
following.

1. Keep the generic api in KAFKA-1797.
2. Add a new constructor in Producer/Consumer that takes the key and the
value serializer instance.
3. Have KAFKA-1797 reviewed and checked into 0.8.2 and trunk.

This will make it easy for people to reuse common serializers while at the
same time allow people to use the byte array api if one chooses to do so.
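
As a rough sketch of what item 2 might look like in use (class and package
names are indicative of the new producer in KAFKA-1797 and may change
before release):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SerializerConstructorSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder
            // Pass serializer instances directly instead of configuring class names.
            Producer<String, byte[]> producer = new KafkaProducer<String, byte[]>(
                props, new StringSerializer(), new ByteArraySerializer());
            producer.send(new ProducerRecord<String, byte[]>("test", "key", new byte[] {1, 2, 3}));
            producer.close();
        }
    }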

I plan to make those changes in the next couple of days unless someone
strongly objects.

Thanks,

Jun


On Fri, Dec 5, 2014 at 5:46 PM, Jiangjie Qin 
wrote:

> Hi Jun,
>
> Thanks for pointing this out. Yes, putting serialization/deserialization
> code into the record does lose some flexibility. Thinking about it some
> more, no matter what we do to bind the producer and
> serializer/deserializer, we can always do the same thing on the record,
> i.e. we can also have a constructor like ProducerRecord(Serializer, Deserializer). The
> downside of this is that we could potentially have a
> serializer/deserializer instance for each record (that's actually the very
> reason that I propose to put the code in record). This problem could be
> addressed by either using a singleton class or factory for
> serializer/deserializer library. But it might be a little bit complicated
> and we are not able to enforce that on external libraries either. So it
> seems to only make sense if we really want to:
> 1. Have a single simple producer interface.
> AND
> 2. use a single producer send all type of messages
>
> I'm not sure if these requirements are strong enough to make us take on
> the complexity of a singleton/factory class serializer/deserializer
> library.
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 12/5/14, 3:16 PM, "Jun Rao"  wrote:
>
> >Jiangjie,
> >
> >The issue with adding the serializer in ProducerRecord is that you need to
> >implement all combinations of serializers for key and value. So, instead
> >of
> >just implementing int and string serializers, you will have to implement
> >all 4 combinations.
> >
> >Adding a new producer constructor like Producer(KeySerializer,
> >ValueSerializer, Properties properties) can be useful.
> >
> >Thanks,
> >
> >Jun
> >
> >On Thu, Dec 4, 2014 at 10:33 AM, Jiangjie Qin 
> >wrote:
> >
> >>
> >> I'm just thinking instead of binding serialization with producer,
> >>another
> >> option is to bind serializer/deserializer with
> >> ProducerRecord/ConsumerRecord (please see the detail proposal below.)
> >>The arguments for this option is:
> >> A. A single producer could send different message types. There
> >>are
> >> several use cases in LinkedIn for per record serializer
> >> - In Samza, there are some in-stream order-sensitive control
> >> messages
> >> having different deserializer from other messages.
> >> - There are use cases which need support for sending both Avro
> >> messages
> >> and raw bytes.
> >> - Some use cases needs to deserialize some Avro messages into
> >> generic
> >> record and some other messages into specific record.
> >> B. In current proposal, the serializer/deserilizer is
> >>instantiated
> >> according to config. Compared with that, binding serializer with
> >> ProducerRecord and ConsumerRecord is less error prone.
> >>
> >>
> >> This option includes the following changes:
> >> A. Add serializer and deserializer interfaces to replace the
> >> serializer instance from config.
> >>         public interface Serializer<K, V> {
> >>             public byte[] serializeKey(K key);
> >>             public byte[] serializeValue(V value);
> >>         }
> >>         public interface Deserializer<K, V> {
> >>             public K deserializeKey(byte[] key);
> >>             public V deserializeValue(byte[] value);
> >>         }
> >>
> >> B. Make ProducerRecord and ConsumerRecord abstract classes
> >> implementing Serializer<K, V> and Deserializer<K, V> respectively.
> >>         public abstract class ProducerRecord<K, V> implements Serializer<K, V> {...}
> >>         public abstract class ConsumerRecord<K, V> implements Deserializer<K, V> {...}
> >>
> >> C. Instead of instantiating the serializer/deserializer from config,
> >> let concrete ProducerRecord/ConsumerRecord classes extend the abstract
> >> class and override the serialize/deserialize methods.
> >>
> >>         public class AvroProducerRecord extends ProducerRecord<String, GenericRecord> {
> >>             ...
> >>             @Override
> >>             public byte[] serializeKey(String key) {...}
> >>             @Override
> >>             public byte[] serializeValue(GenericRecord value) {...}
> >>         }
> >>
> >>         public class AvroConsumerRecord extends ConsumerRecord<String, GenericRecord> {
> >>             ...
> >> 

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-08 Thread Sriram Subramanian
Thank you Jay. I agree with the issue that you point out w.r.t. paired
serializers. I also think that mixing serialization types is rare. To get
the current behavior, one can simply use a ByteArraySerializer. This is
best understood by talking with many customers, and you seem to have done
that. I am convinced about the change.

For the rest who gave a -1 or 0 for this proposal, do the answers for the
three points (updated) below seem reasonable? Are these explanations
convincing?


1. Can we keep the serialization semantics outside the Producer interface
and have simple bytes in / bytes out for the interface (This is what we
have today).

The points for this is to keep the interface simple and usage easy to
understand. The points against this is that it gets hard to share common
usage patterns around serialization/message validations for the future.

2. Can we create a wrapper producer that does the serialization and have
different variants of it for different data formats?

The points for this is again to keep the main API clean. The points
against this is that it duplicates the API, increases the surface area and
creates redundancy for a minor addition.

3. Do we need to support different data types per record? The current
interface (bytes in/bytes out) lets you instantiate one producer and use
it to send multiple data formats. There seems to be some valid use cases
for this.


Mixed serialization types are rare based on interactions with customers.
To get the current behavior, one can simply use a ByteArraySerializer.

On 12/5/14 5:00 PM, "Jay Kreps"  wrote:

>Hey Sriram,
>
>Thanks! I think this is a very helpful summary.
>
>Let me try to address your point about passing in the serde at send time.
>
>I think the first objection is really to the paired key/value serializer
>interfaces. This leads to kind of a weird combinatorial thing where you
>would have an avro/avro serializer a string/avro serializer, a pb/pb
>serializer, and a string/pb serializer, and so on. But your proposal would
>work as well with separate serializers for key and value.
>
>I think the downside is just the one you call out--that this is a corner
>case and you end up with two versions of all the apis to support it. This
>also makes the serializer api more annoying to implement. I think the
>alternative solution to this case and any other we can give people is just
>configuring ByteArraySerializer which gives you basically the api that you
>have now with byte arrays. If this is incredibly common then this would be
>a silly solution, but I guess the belief is that these cases are rare and
>a
>really well implemented avro or json serializer should be 100% of what
>most
>people need.
>
>In practice the cases that actually mix serialization types in a single
>stream are pretty rare I think just because the consumer then has the
>problem of guessing how to deserialize, so most of these will end up with
>at least some marker or schema id or whatever that tells you how to read
>the data. Arguably this mixed serialization with a marker is itself a
>serializer type and should have a serializer of its own...
>
>-Jay
>
>On Fri, Dec 5, 2014 at 3:48 PM, Sriram Subramanian <
>srsubraman...@linkedin.com.invalid> wrote:
>
>> This thread has diverged multiple times now and it would be worth
>> summarizing them.
>>
>> There seems to be the following points of discussion -
>>
>> 1. Can we keep the serialization semantics outside the Producer
>>interface
>> and have simple bytes in / bytes out for the interface (This is what we
>> have today).
>>
>> The points for this is to keep the interface simple and usage easy to
>> understand. The points against this is that it gets hard to share common
>> usage patterns around serialization/message validations for the future.
>>
>> 2. Can we create a wrapper producer that does the serialization and have
>> different variants of it for different data formats?
>>
>> The points for this is again to keep the main API clean. The points
>> against this is that it duplicates the API, increases the surface area
>>and
>> creates redundancy for a minor addition.
>>
>> 3. Do we need to support different data types per record? The current
>> interface (bytes in/bytes out) lets you instantiate one producer and use
>> it to send multiple data formats. There seems to be some valid use cases
>> for this.
>>
>> I have still not seen a strong argument against not having this
>> functionality. Can someone provide their views on why we don't need this
>> support that is possible with the current API?
>>
>> One possible approach for the per record serialization would be to
>>define
>>
>> public interface SerDe<K, V> {
>>   public byte[] serializeKey();
>>
>>   public K deserializeKey();
>>
>>   public byte[] serializeValue();
>>
>>   public V deserializeValue();
>> }
>>
>> This would be used by both the Producer and the Consumer.
>>
>> The send APIs can then be
>>
>> public Future<RecordMetadata> send(ProducerRecord<K, V> record);
>> public Future<RecordMetadata> send(ProducerRecord

Re: Partition reassignment reversed

2014-12-08 Thread Jun Rao
Topic deletion doesn't quite work in 0.8.1.1. It's fixed in the upcoming
0.8.2 release.

Thanks,

Jun

On Wed, Dec 3, 2014 at 6:17 PM, Andrew Jorgensen <
ajorgen...@twitter.com.invalid> wrote:

> We are currently running 0.8.1.1, I just double checked. One other thing
> that may be related is I brought up a second kafka cluster today matching
> the first. I noticed that if I deleted a topic and the re-created it with
> the same name when I re-created the topic none of the leader elections
> happened and doing a describe on the topic all of the partitions say
> “Leader: none”. If I bounce the controller process then the leader election
> happens and the partitions get properly assigned. The other interesting
> thing that happened was I found that if I kill -9 the MirrorMaker process
> while its running it seems like the state cannot be properly resumed when
> the MirrorMaker is restarted. Here are what the errors looked like:
>
> [2014-12-04 02:14:01,025] ERROR
> [ConsumerFetcherThread-topic_1__mirror_kafka2009-1417659239211-f603e23f-0-7],
> Current offset 81717 for partition [topic-1,266] out of range; reset
> offset to 823942799 (kafka.consumer.ConsumerFetcherThread)
> [2014-12-04 02:14:01,028] ERROR
> [ConsumerFetcherThread-topic_1_mirror_kafka2009-1417659239211-f603e23f-0-7],
> Current offset 941288745 for partition [topic-1,158] out of range; reset
> offset to 947524545 (kafka.consumer.ConsumerFetcherThread)
>
> These errors are remedied by bouncing the controller process and then
> restarting the MirrorMaker. To me it looks like there is some disconnect
> between the state that exists in zookeeper and what the controller know
> about the world. I can reliably repeat both the MirrorMaker and the topic
> deletion experiments. If there is anything specific I can help to diagnose
> let me know. I can also open a JIRA ticket some these details if it will
> help.
>
> --
> Andrew Jorgensen
> @ajorgensen
>
> On December 3, 2014 at 7:48:20 PM, Jun Rao (jun...@gmail.com) wrote:
>
> Not sure exactly what happened there. We did fix a few bugs in reassigning
> partitions in 0.8.1.1. So, you probably want to upgrade to that one or the
> upcoming 0.8.2 release.
>
> Thanks,
>
> Jun
>
> On Tue, Dec 2, 2014 at 2:33 PM, Andrew Jorgensen 
> wrote:
>
>>  I am using kafka 0.8.
>>  Yes I did run —verify, but got some weird output from it I had never
>> seen before that looked something like:
>>
>>  Status of partition reassignment:
>>  ERROR: Assigned replicas (5,2) don't match the list of replicas for
>> reassignment (5) for partition [topic-1,248]
>>  ERROR: Assigned replicas (7,3) don't match the list of replicas for
>> reassignment (7) for partition [topic-2,228]
>>
>> There were a large number of these but it seems to just be for topic-1,
>> and topic-2. In this case I was migrating around 4 or 5 topics. These two
>> are also the ones that got reversed when I bounced all the processes
>> yesterday.
>>
>> Here are some more logs that I found from that day that may help piece
>> together what might have happened
>>
>>  [2014-11-19 16:56:52,938] ERROR [KafkaApi-1] Error when processing fetch
>> request for partition [topic-2,317] offset 408324093 from follower with
>> correlation id 2458 (kafka.server.KafkaApis)
>>  kafka.common.OffsetOutOfRangeException: Request for offset 408324093 but
>> we only have log segments in the range 409018400 to 425346400.
>>  at kafka.log.Log.read(Log.scala:380)
>>  at
>> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
>>  at
>> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
>>  at
>> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
>>  at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
>>  at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
>>  at
>> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
>>  at
>> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
>>  at
>> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
>>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
>>  at scala.collection.immutable.HashMap.map(HashMap.scala:38)
>>  at
>> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
>>  at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
>>  at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
>>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>>  at java.lang.Thread.run(Thread.java:724)
>>
>>  -
>>
>>  [2014-11-19 16:24:37,959] ERROR Conditional update of path
>> /brokers/topics/topic-2/partitions/248/state with data
>> {"controller_epoch":15,"leader":2,"version":1,"leader_epoch":1,"isr":[2,5]}
>> and expected version 1 failed due to
>> org.apache.zookeeper.KeeperException$BadVersio

Re: How to cleanly shut down ConsumerConnector

2014-12-08 Thread Jun Rao
kafka.network.Processor is on the broker. Are you killing the brokers as
well?

Thanks,

Jun

On Thu, Dec 4, 2014 at 5:33 PM, Shannon Lloyd  wrote:

> Hi,
>
> I am using the high-level consumer on 0.8.2-beta. I'm attempting to close a
> ConsumerConnector (actually a handful of connectors), but am not having
> much luck actually getting it to close cleanly. When I call shutdown on the
> connector, I see an error in my application's log (these are always
> IOExceptions in kafka.network.Processor - either Broken Pipe in
> FileDispatcherImpl.write0 or else Connection reset by peer in
> FileDispatcherImpl.read0), but the shutdown call itself does not return
> until the socket.timeout.ms has expired (I've tested this by setting this
> to all sorts of different values and confirmed that shutdown() always
> returns after this timeout, but never before).
>
> I don't know if it matters, but my code that works with the connector is
> running on a separate thread via an ExecutorService (essentially I'm
> consuming with one thread per group/topic combination (yes, one thread for
> all partitions within the topic)).
>
> FWIW, everything else seems to work fine - I can connect, set up the
> KafkaStream, pull down messages etc. It's just the shutting down that
> doesn't seem to be working. The reason I need this to work cleanly is that
> my use case requires me to shut down specific connectors and re-create them
> later, potentially numerous times during the running of my application. I
> could potentially redesign things to keep each connector around after it is
> no longer needed, cache it and re-use it later, but this still doesn't
> solve the problem of how I eventually shut everything down cleanly.
>
> Thanks,
> Shannon
>


Re: Kafka replication factor mode

2014-12-08 Thread Jun Rao
Kafka tolerates 2f failures with 2f+1 replicas by default. What error are
you seeing?

Thanks,

Jun

On Fri, Dec 5, 2014 at 10:01 AM, Ankit Jain 
wrote:

> Hi All,
>
> I have a two-node Kafka cluster and am using replication factor 2. If both
> nodes are running, then I am able to push data, but if any node goes down,
> I get an exception. I explored and found that the default replication
> mode is quorum, meaning at least (f+1) nodes must be available in a
> cluster if you have 2f+1 replicas.
>
> Please suggest the configuration to change the replication mode from
> quorum to a plain one (so data is written even if only the leader is available).
>
> Regards,
> Ankit
>
> 
>
>
>
>
>
>
> NOTE: This message may contain information that is confidential,
> proprietary, privileged or otherwise protected by law. The message is
> intended solely for the named addressee. If received in error, please
> destroy and notify the sender. Any use of this email is prohibited when
> received in error. Impetus does not represent, warrant and/or guarantee,
> that the integrity of this communication has been maintained nor that the
> communication is free of errors, virus, interception or interference.
>


Re: How to cleanly shut down ConsumerConnector

2014-12-08 Thread Shannon Lloyd
Not explicitly. Some additional background might help. I'm running an
integration test using an embedded Kafka cluster and ZK quorum (ie all in
process). In my @Before method I fire up the cluster. In my @After method I
shut the cluster down. When I call shutdown on the connector during my
test, the test is actually erroring out and shutting down the cluster. BUT
even with a try catch(Throwable) around the shutdown call, nothing gets
caught, so something outside this call thread is throwing an error and
killing the test.
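
For context, a rough sketch of the usual high-level consumer shutdown order
(the timeout value is illustrative only):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ConsumerShutdownSketch {
        // Typical order: shut the connector down first so blocked KafkaStream
        // iterators are released, then stop the worker threads.
        public static void shutdown(ConsumerConnector connector, ExecutorService executor) {
            if (connector != null) {
                connector.shutdown();
            }
            if (executor != null) {
                executor.shutdown();
                try {
                    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                        System.out.println("Timed out waiting for consumer threads to exit");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
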
On 9 Dec 2014 14:45, "Jun Rao"  wrote:

> kafka.network.Processor is on the broker. Are you killing the brokers as
> well?
>
> Thanks,
>
> Jun
>
> On Thu, Dec 4, 2014 at 5:33 PM, Shannon Lloyd  wrote:
>
> > Hi,
> >
> > I am using the high-level consumer on 0.8.2-beta. I'm attempting to
> close a
> > ConsumerConnector (actually a handful of connectors), but am not having
> > much luck actually getting it to close cleanly. When I call shutdown on
> the
> > connector, I see an error in my application's log (these are always
> > IOExceptions in kafka.network.Processor - either Broken Pipe in
> > FileDispatcherImpl.write0 or else Connection reset by peer in
> > FileDispatcherImpl.read0), but the shutdown call itself does not return
> > until the socket.timeout.ms has expired (I've tested this by setting
> this
> > to all sorts of different values and confirmed that shutdown() always
> > returns after this timeout, but never before).
> >
> > I don't know if it matters, but my code that works with the connector is
> > running on a separate thread via an ExecutorService (essentially I'm
> > consuming with one thread per group/topic combination (yes, one thread
> for
> > all partitions within the topic)).
> >
> > FWIW, everything else seems to work fine - I can connect, set up the
> > KafkaStream, pull down messages etc. It's just the shutting down that
> > doesn't seem to be working. The reason I need this to work cleanly is
> that
> > my use case requires me to shut down specific connectors and re-create
> them
> > later, potentially numerous times during the running of my application. I
> > could potentially redesign things to keep each connector around after it
> is
> > no longer needed, cache it and re-use it later, but this still doesn't
> > solve the problem of how I eventually shut everything down cleanly.
> >
> > Thanks,
> > Shannon
> >
>