Re: Cannot run producer from local windows

2013-10-04 Thread Aniket Bhatnagar
Did you set host.name property as described @
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-OnEC2%2Cwhycan%27tmyhighlevelconsumersconnecttothebrokers%3F
 ?

When accessing brokers from outside AWS, host.name should be set to the public
domain name/IP. This also means that all brokers would need a public domain name/IP.
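
For example, a minimal server.properties sketch (the hostname below is only a
placeholder for the broker's actual public DNS name):

  # Advertise a name/IP that is resolvable and reachable from outside AWS
  host.name=ec2-XX-XX-XX-XX.compute-1.amazonaws.com
  port=9092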


On 4 October 2013 11:46, Yi Jiang  wrote:

> Hi, everyone
> I have installed a Kafka cluster in EC2 and I am sure all ports have been
> opened, but when I try to push some data into Kafka in the cloud, I cannot
> even run the producer from my local machine. It always throws an exception
> with "failed after 3 retries". There is no problem when I run it in the
> cluster in the cloud. I have nothing on my local machine, no ZooKeeper etc.
> I am wondering if the problem is that I am missing ZooKeeper on my local
> Windows machine? Can someone tell me how to resolve this issue?
>
> Sent from my iPhone


Kafka stops accepting messages for existing topics after a hostname change and restart

2013-10-04 Thread Aniket Bhatnagar
Because Kafka was detecting localhost.domain as the hostname, I commented out
the line "127.0.0.1 localhost.localdomain localhost" and added
"127.0.0.1 ip-10-0-1-20.localdomain" in /etc/hosts. When I restarted Kafka
(issued kill -15 <pid>), writes to existing topics started failing and I see
several of the following errors:

[2013-10-04 03:43:23,565] WARN [KafkaApi-1] Produce request with
correlation id 12868451 from client  on partition
[consumertracking.gnomes.4002,5] failed due to Partition
[consumertracking.gnomes.4002,5] doesn't exist on 1 (kafka.server.KafkaApis)


I have checked the kafka data directory and all files are intact. What
could have caused this issue and how can it be fixed?


RE: Too many open files

2013-10-04 Thread Nicolas Berthet
Hi Mark,

Sorry for the delay. We're not using a load balancer, if that's what you mean by
LB.

After applying the change I mentioned last time (the netfilter thing), I
couldn't see any improvement. We even restarted Kafka, but since the restart I
have seen the connection count slowly climbing.
  
Best regards,

Nicolas Berthet 


-Original Message-
From: Mark [mailto:static.void@gmail.com] 
Sent: Saturday, September 28, 2013 12:35 AM
To: users@kafka.apache.org
Subject: Re: Too many open files

No, this is all within the same DC. I think the problem has to do with the LB.
We've upgraded our producers to point directly to a node for testing, and after
running it all night I don't see any more connections than there are supposed
to be.

Can I ask which LB you are using? We are using A10s.

On Sep 26, 2013, at 6:41 PM, Nicolas Berthet  wrote:

> Hi Mark,
> 
> I'm using centos 6.2. My file limit is something like 500k, the value is 
> arbitrary.
> 
> One of the thing I changed so far are the TCP keepalive parameters, it had 
> moderate success so far.
> 
> net.ipv4.tcp_keepalive_time
> net.ipv4.tcp_keepalive_intvl
> net.ipv4.tcp_keepalive_probes
> 
> I still notice an abnormal number of ESTABLISHED connections, I've 
> been doing some search and came over this page 
> (http://www.lognormal.com/blog/2012/09/27/linux-tcpip-tuning/)
> 
> I'll change the "net.netfilter.nf_conntrack_tcp_timeout_established" as 
> indicated there, it looks closer to the solution to my issue.
> 
> Are you also experiencing the issue in a cross data center context ? 
> 
> Best regards,
> 
> Nicolas Berthet
> 
> 
> -Original Message-
> From: Mark [mailto:static.void@gmail.com]
> Sent: Friday, September 27, 2013 6:08 AM
> To: users@kafka.apache.org
> Subject: Re: Too many open files
> 
> What OS settings did you change? How high is your huge file limit?
> 
> 
> On Sep 25, 2013, at 10:06 PM, Nicolas Berthet  
> wrote:
> 
>> Jun,
>> 
>> I observed similar kind of things recently. (didn't notice before 
>> because our file limit is huge)
>> 
>> I have a set of brokers in a datacenter, and producers in different data 
>> centers. 
>> 
>> At some point I got disconnections, from the producer perspective I had 
>> something like 15 connections to the broker. On the other hand on the broker 
>> side, I observed hundreds of connections from the producer in an ESTABLISHED 
>> state.
>> 
>> We had some default settings for the socket timeout on the OS level, which 
>> we reduced hoping it would prevent the issue in the future. I'm not sure if 
>> the issue is from the broker or OS configuration though. I'm still keeping 
>> the broker under observation for the time being.
>> 
>> Note that, for clients in the same datacenter, we didn't see this issue, the 
>> socket count matches on both ends.
>> 
>> Nicolas Berthet
>> 
>> -Original Message-
>> From: Jun Rao [mailto:jun...@gmail.com]
>> Sent: Thursday, September 26, 2013 12:39 PM
>> To: users@kafka.apache.org
>> Subject: Re: Too many open files
>> 
>> If a client is gone, the broker should automatically close those broken 
>> sockets. Are you using a hardware load balancer?
>> 
>> Thanks,
>> 
>> Jun
>> 
>> 
>> On Wed, Sep 25, 2013 at 4:48 PM, Mark  wrote:
>> 
>>> FYI if I kill all producers I don't see the number of open files drop. 
>>> I still see all the ESTABLISHED connections.
>>> 
>>> Is there a broker setting to automatically kill any inactive TCP 
>>> connections?
>>> 
>>> 
>>> On Sep 25, 2013, at 4:30 PM, Mark  wrote:
>>> 
 Any other ideas?
 
 On Sep 25, 2013, at 9:06 AM, Jun Rao  wrote:
 
> We haven't seen any socket leaks with the java producer. If you 
> have
>>> lots
> of unexplained socket connections in established mode, one 
> possible
>>> cause
> is that the client created new producer instances, but didn't 
> close the
>>> old
> ones.
> 
> Thanks,
> 
> Jun
> 
> 
> On Wed, Sep 25, 2013 at 6:08 AM, Mark 
>>> wrote:
> 
>> No. We are using the kafka-rb ruby gem producer.
>> https://github.com/acrosa/kafka-rb
>> 
>> Now that you asked that question I need to ask. Is there a 
>> problem with the java producer?
>> 
>> Sent from my iPhone
>> 
>>> On Sep 24, 2013, at 9:01 PM, Jun Rao  wrote:
>>> 
>>> Are you using the java producer client?
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> 
 On Tue, Sep 24, 2013 at 5:33 PM, Mark 
 
>> wrote:
 
 Our 0.7.2 Kafka cluster keeps crashing with:
 
 2013-09-24 17:21:47,513 -  [kafka-acceptor:Acceptor@153] - 
 Error in acceptor
java.io.IOException: Too many open
 
 The obvious fix is to bump up the number of open files but I'm
>>> wondering
 if there is a leak on the Kafka side and/or our application 
 side. We currently have the ul

Is 30 a too high partition number?

2013-10-04 Thread Aniket Bhatnagar
I am using Kafka as a buffer for data streaming in from various sources.
Since it is time series data, I generate the key for each message by
combining the source ID and the minute in the timestamp. This means I can have
at most 60 partitions per topic (as each source has its own topic). I have
set num.partitions to 30 (60/2) for each topic in the broker config. I don't
have a very good reason for picking 30 as the default number of partitions per
topic, but I wanted it to be a high number so that I can achieve high
parallelism during in-stream processing. I am worried that having a high
number like 30 (the default configuration had it as 2) can negatively
impact Kafka performance in terms of message throughput or memory
consumption. I understand that this can lead to many files per partition,
but I am thinking of dealing with that by having multiple directories on the
same disk if I run into issues.

My question to the community is: am I prematurely optimizing the partition
count, given that right now even 5 partitions seems sufficient, and will I run
into unwanted issues? Or is 30 an OK number of partitions to use?
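
As a rough illustration of the key scheme described above (the helper name and
types here are made up, not part of any Kafka API), combining the source ID with
the minute of the hour bounds the number of distinct keys, and hence usable
partitions, at 60 per topic:

  // Hypothetical helper: at most 60 distinct keys per source/topic (minute 0-59)
  public static String partitionKey(String sourceId, long timestampMillis) {
      long minuteOfHour = (timestampMillis / 60000L) % 60;
      return sourceId + "-" + minuteOfHour;
  }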


Re: Too many open files

2013-10-04 Thread Florian Weingarten
Hi Nicolas,

we did run into a similar issue here (lots of ESTABLISHED connections on
the brokers, but none on the consumers/producers). In our case, it was a
firewall issue where connections that were inactive for more than a
certain time were silently dropped by the firewall (but no TCP RST was
sent), so only one side of the connection noticed the drop.

Maybe that helps.

Flo
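
For reference, the OS-level knobs discussed in this thread (the keepalive timers
Nicolas listed and the conntrack timeout from the blog post he linked) are sysctl
settings; the values below are illustrative only, not recommendations:

  # Probe idle connections well before intermediate firewalls/NAT give up on them
  net.ipv4.tcp_keepalive_time = 600
  net.ipv4.tcp_keepalive_intvl = 60
  net.ipv4.tcp_keepalive_probes = 5
  # How long netfilter keeps state for idle ESTABLISHED connections
  # (lowered from the multi-day default, as suggested in the linked post)
  net.netfilter.nf_conntrack_tcp_timeout_established = 86400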


On 2013-09-26 9:41 PM, Nicolas Berthet wrote:
> Hi Mark,
> 
> I'm using centos 6.2. My file limit is something like 500k, the value is 
> arbitrary.
> 
> One of the thing I changed so far are the TCP keepalive parameters, it had 
> moderate success so far.
> 
> net.ipv4.tcp_keepalive_time
> net.ipv4.tcp_keepalive_intvl
> net.ipv4.tcp_keepalive_probes
> 
> I still notice an abnormal number of ESTABLISHED connections, I've been doing 
> some search and came over this page 
> (http://www.lognormal.com/blog/2012/09/27/linux-tcpip-tuning/)
> 
> I'll change the "net.netfilter.nf_conntrack_tcp_timeout_established" as 
> indicated there, it looks closer to the solution to my issue.
> 
> Are you also experiencing the issue in a cross data center context ? 
> 
> Best regards,
> 
> Nicolas Berthet 
> 
> 
> -Original Message-
> From: Mark [mailto:static.void@gmail.com] 
> Sent: Friday, September 27, 2013 6:08 AM
> To: users@kafka.apache.org
> Subject: Re: Too many open files
> 
> What OS settings did you change? How high is your huge file limit?
> 
> 
> On Sep 25, 2013, at 10:06 PM, Nicolas Berthet  
> wrote:
> 
>> Jun,
>>
>> I observed similar kind of things recently. (didn't notice before 
>> because our file limit is huge)
>>
>> I have a set of brokers in a datacenter, and producers in different data 
>> centers. 
>>
>> At some point I got disconnections, from the producer perspective I had 
>> something like 15 connections to the broker. On the other hand on the broker 
>> side, I observed hundreds of connections from the producer in an ESTABLISHED 
>> state.
>>
>> We had some default settings for the socket timeout on the OS level, which 
>> we reduced hoping it would prevent the issue in the future. I'm not sure if 
>> the issue is from the broker or OS configuration though. I'm still keeping 
>> the broker under observation for the time being.
>>
>> Note that, for clients in the same datacenter, we didn't see this issue, the 
>> socket count matches on both ends.
>>
>> Nicolas Berthet
>>
>> -Original Message-
>> From: Jun Rao [mailto:jun...@gmail.com]
>> Sent: Thursday, September 26, 2013 12:39 PM
>> To: users@kafka.apache.org
>> Subject: Re: Too many open files
>>
>> If a client is gone, the broker should automatically close those broken 
>> sockets. Are you using a hardware load balancer?
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Wed, Sep 25, 2013 at 4:48 PM, Mark  wrote:
>>
>>> FYI if I kill all producers I don't see the number of open files drop. 
>>> I still see all the ESTABLISHED connections.
>>>
>>> Is there a broker setting to automatically kill any inactive TCP 
>>> connections?
>>>
>>>
>>> On Sep 25, 2013, at 4:30 PM, Mark  wrote:
>>>
 Any other ideas?

 On Sep 25, 2013, at 9:06 AM, Jun Rao  wrote:

> We haven't seen any socket leaks with the java producer. If you 
> have
>>> lots
> of unexplained socket connections in established mode, one possible
>>> cause
> is that the client created new producer instances, but didn't close 
> the
>>> old
> ones.
>
> Thanks,
>
> Jun
>
>
> On Wed, Sep 25, 2013 at 6:08 AM, Mark 
>>> wrote:
>
>> No. We are using the kafka-rb ruby gem producer.
>> https://github.com/acrosa/kafka-rb
>>
>> Now that you asked that question I need to ask. Is there a problem 
>> with the java producer?
>>
>> Sent from my iPhone
>>
>>> On Sep 24, 2013, at 9:01 PM, Jun Rao  wrote:
>>>
>>> Are you using the java producer client?
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>
 On Tue, Sep 24, 2013 at 5:33 PM, Mark 
 
>> wrote:

 Our 0.7.2 Kafka cluster keeps crashing with:

 2013-09-24 17:21:47,513 -  [kafka-acceptor:Acceptor@153] - Error 
 in acceptor
 java.io.IOException: Too many open

 The obvious fix is to bump up the number of open files but I'm
>>> wondering
 if there is a leak on the Kafka side and/or our application 
 side. We currently have the ulimit set to a generous 4096 but 
 obviously we are hitting this ceiling. What's a recommended value?

 We are running rails and our Unicorn workers are connecting to 
 our
>>> Kafka
 cluster via round-robin load balancing. We have about 1500 
 workers to
>> that
 would be 1500 connections right there but they should be split 
 across
>> our 3
 nodes. Instead Netstat shows thousands of connections that look 
 like
>> this:

>>

Re: Kafka stops accepting messages for existing topics after a hostname change and restart

2013-10-04 Thread Neha Narkhede
When a broker starts up, it receives a LeaderAndIsrRequest from the
controller broker telling it which partitions it should host and whether it
should lead or follow each of those partitions. If clients send requests to
the broker before it has received this request, it throws the error you see.
Did you restart the broker very quickly?

You can mitigate this issue by using controlled shutdown to stop a broker.

Thanks,
Neha
On Oct 4, 2013 2:45 AM, "Aniket Bhatnagar" 
wrote:

> Because Kafka was detecting localhost.domain as hostname, I commented out
> the line "127.0.0.1 localhost.localdomain localhost" and added
> "127.0.0.1 ip-10-0-1-20.localdomain" in etc/hosts. When I restart Kafka
> (issues kill -15 pid), writes to existing topics are failing and I see
> several of the following errors:
>
> [2013-10-04 03:43:23,565] WARN [KafkaApi-1] Produce request with
> correlation id 12868451 from client  on partition
> [consumertracking.gnomes.4002,5] failed due to Partition
> [consumertracking.gnomes.4002,5] doesn't exist on 1
> (kafka.server.KafkaApis)
>
>
> I have checked the kafka data directory and all files are intact. What
> could have caused this issue and how can it be fixed?
>


Re: Is 30 a too high partition number?

2013-10-04 Thread Neha Narkhede
You probably want to think of this in terms of the number of partitions on a
single broker rather than per topic, since I/O is the limiting factor in
this case. Another factor to consider is the total number of partitions in the
cluster, as ZooKeeper becomes the limiting factor there. 30 partitions is not
too large, provided the total number of partitions doesn't exceed roughly a
couple thousand. To give you an example, some of our clusters are 16 nodes
big, and some of the topics on those clusters have 30 partitions.

Thanks,
Neha
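
As a rough sanity check (illustrative numbers only, not from the thread): if
there were, say, 60 source topics at 30 partitions each, the cluster would hold
60 x 30 = 1,800 partitions, which is still within the "roughly a couple
thousand" guideline above.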
On Oct 4, 2013 4:15 AM, "Aniket Bhatnagar" 
wrote:

> I am using kafka as a buffer for data streaming in from various sources.
> Since its a time series data, I generate the key to the message by
> combining source ID and minute in the timestamp. This means I can utmost
> have 60 partitions per topic (as each source has its own topic). I have
> set num.partitions to be 30 (60/2) for each topic in broker config. I don't
> have a very good reason to pick 30 as default number of partitions per
> topic but I wanted it to be a high number so that I can achieve high
> parallelism during in-stream processing. I am worried that having a high
> number  like 30 (default configuration had it as 2), it can negatively
> impact kafka performance in terms of message throughput or memory
> consumption. I understand that this can lead to many files per partition
> but I am thinking of dealing with it by having multiple directories on the
> same disk if at all I run into issues.
>
> My question to the community is that am I prematurely attempting to
> optimizing the partition number as right now even a partition number of 5
> seems sufficient and hence will run into unwanted issues? Or is 30 an Ok
> number to use for number of partitions?
>


RE: is it possible to commit offsets on a per stream basis?

2013-10-04 Thread Yu, Libo
This will greatly improve efficiency on the client side, and multiple threads
won't have to synchronize before committing offsets. Thanks, Jason.

Regards,

Libo


-Original Message-
From: Jason Rosenberg [mailto:j...@squareup.com] 
Sent: Thursday, October 03, 2013 4:13 PM
To: users@kafka.apache.org
Subject: Re: is it possible to commit offsets on a per stream basis?

I added a comment/suggestion to:
https://issues.apache.org/jira/browse/KAFKA-966

Basically, the idea is to expose an API for marking an offset for commit, such that the
auto-commit would only commit offsets up to the last message 'markedForCommit',
and not the last 'consumed' offset, whose processing may or may not have succeeded.  This
way, consumer code can just call 'markForCommit()' after successfully processing
each message.

Does that make sense?
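
A minimal sketch of the call pattern being proposed (the interface and
markForCommit() below are hypothetical, mirroring the KAFKA-966 suggestion; they
are not part of the existing consumer API):

  // Hypothetical shape of the proposal: commit may only advance past messages
  // that the application has explicitly marked as processed.
  interface MarkableStream<T> {
      T next();              // blocking read of the next message
      void markForCommit();  // auto-commit may only commit offsets up to here
  }

  class MarkForCommitLoop {
      interface Handler<T> { void handle(T message); }

      static <T> void consume(MarkableStream<T> stream, Handler<T> handler) {
          while (true) {
              T message = stream.next();
              handler.handle(message);   // process first...
              stream.markForCommit();    // ...then mark, so an unprocessed offset is never committed
          }
      }
  }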


On Mon, Sep 9, 2013 at 5:21 PM, Yu, Libo  wrote:

> Thanks, Neha. That number of connections formula is very helpful.
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> Sent: Monday, September 09, 2013 12:17 PM
> To: users@kafka.apache.org
> Subject: Re: is it possible to commit offsets on a per stream basis?
>
> Memory might become an issue if all the connectors are part of the 
> same process. But this is easily solvable by distributing the 
> connectors over several machines.
> Number of connections would be (# of connectors) * (# of brokers) and 
> will proportionately increase with the # of connectors.
>
> Thanks,
> Neha
>
>
> On Mon, Sep 9, 2013 at 9:08 AM, Yu, Libo  wrote:
>
> > If one connector is used for a single stream, when there are many 
> > topics/streams, will that cause any performance issue, e.g. too many 
> > connections or too much memory or big latency?
> >
> > Regards,
> >
> > Libo
> >
> >
> > -Original Message-
> > From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> > Sent: Sunday, September 08, 2013 12:46 PM
> > To: users@kafka.apache.org
> > Subject: Re: is it possible to commit offsets on a per stream basis?
> >
> > That should be fine too.
> >
> >
> >
> >
> > On Sat, Sep 7, 2013 at 8:33 PM, Jason Rosenberg 
> wrote:
> >
> > > To be clear, it looks like I forgot to add to my question, that I 
> > > am asking about creating multiple connectors, within the same 
> > > consumer process (as I realize I can obviously have multiple 
> > > connectors running on multiple hosts, etc.).  But I'm guessing 
> > > that should be
> fine too?
> > >
> > > Jason
> > >
> > >
> > >
> > >
> > > On Sat, Sep 7, 2013 at 3:09 PM, Neha Narkhede 
> > >  > > >wrote:
> > >
> > > > >> Can I create multiple connectors, and have each use the same 
> > > > >> Regex
> > > > for the TopicFilter?  Will each connector share the set of 
> > > > available topics?  Is this safe to do?
> > > >
> > > > >> Or is it necessary to create mutually non-intersecting 
> > > > >> regex's for
> > > each
> > > > connector?
> > > >
> > > > As long as each of those consumer connectors share the same 
> > > > group id,
> > > Kafka
> > > > consumer rebalancing should automatically re-distribute the 
> > > > topic/partitions amongst the consumer connectors/streams evenly.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > >
> > > > On Mon, Sep 2, 2013 at 1:35 PM, Jason Rosenberg 
> > > > 
> > > wrote:
> > > >
> > > > > Will this work if we are using a TopicFilter, that can map to 
> > > > > multiple topics.  Can I create multiple connectors, and have 
> > > > > each use the same
> > > > Regex
> > > > > for the TopicFilter?  Will each connector share the set of 
> > > > > available topics?  Is this safe to do?
> > > > >
> > > > > Or is it necessary to create mutually non-intersecting regex's 
> > > > > for each connector?
> > > > >
> > > > > It seems I have a similar issue.  I have been using auto 
> > > > > commit mode,
> > > but
> > > > > it doesn't guarantee that all messages committed have been 
> > > > > successfully processed (seems a change to the connector itself 
> > > > > might expose a way to
> > > > use
> > > > > auto offset commit, and have it never commit a message until 
> > > > > it is processed).  But that would be a change to the 
> > > > > ZookeeperConsumerConnector. Essentially, it would be great 
> > > > > if after processing each message, we could mark the message as 
> > > > > 'processed', and
> > > > thus
> > > > > use that status as the max offset to commit when the auto 
> > > > > offset commit background thread wakes up each time.
> > > > >
> > > > > Jason
> > > > >
> > > > >
> > > > > On Thu, Aug 29, 2013 at 11:58 AM, Yu, Libo 
> wrote:
> > > > >
> > > > > > Thanks, Neha. That is a great answer.
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Libo
> > > > > >
> > > > > >
> > > > > > -Original Message-
> > > > > > From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
> > > > > > Sent: Thursday, August 29, 2013 1:55 PM
> > > > > > To: users@kafka.apache.org
> > > > > > Subject: Re: is it possible to commit o

producer API thread safety

2013-10-04 Thread Yu, Libo
Hi team,

Is it possible to use a single producer with more than one thread? I am not
sure if its send() is thread safe.

Regards,

Libo



Re: producer API thread safety

2013-10-04 Thread Guozhang Wang
The send() is thread safe, so the short answer would be yes.
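
For what it's worth, here is a minimal sketch of sharing one producer instance
across threads with the 0.8 Java API (broker list and topic name are
placeholders):

  import java.util.Properties;
  import kafka.javaapi.producer.Producer;
  import kafka.producer.KeyedMessage;
  import kafka.producer.ProducerConfig;

  public class SharedProducerExample {
      public static void main(String[] args) throws InterruptedException {
          Properties props = new Properties();
          props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder
          props.put("serializer.class", "kafka.serializer.StringEncoder");

          // One producer instance handed to every thread; send() is thread safe.
          final Producer<String, String> producer =
              new Producer<String, String>(new ProducerConfig(props));

          Runnable task = new Runnable() {
              public void run() {
                  producer.send(new KeyedMessage<String, String>(
                      "my-topic", Thread.currentThread().getName()));
              }
          };
          Thread t1 = new Thread(task);
          Thread t2 = new Thread(task);
          t1.start(); t2.start();
          t1.join(); t2.join();
          producer.close(); // close once, after all sending threads are done
      }
  }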


On Fri, Oct 4, 2013 at 9:14 AM, Yu, Libo  wrote:

> Hi team,
>
> Is it possible to use a single producer with more than one threads? I am
> not sure
> If its send() is thread safe.
>
> Regards,
>
> Libo
>
>


-- 
-- Guozhang


Re: Kafka stops accepting messages for existing topics after a hostname change and restart

2013-10-04 Thread Aniket Bhatnagar
I did restart the broker very quickly. I saw similar errors for about 5 minutes,
and that's when I decided to shut down all Kafka brokers and start them one
by one. That seems to have re-enabled writes in Kafka instantly once the brokers
were back up.

How do I do a controlled shutdown? The Kafka shutdown script doesn't seem
to work for me, and I assumed the kill -15 command should trigger a shutdown
in Kafka (I saw "Kafka shutdown" messages in the logs too).
On 4 Oct 2013 18:55, "Neha Narkhede"  wrote:

> When a broker starts up, it receives a LeaderAndIsrRequest from the
> controller broker telling the broker which partitions it should host and
> either lead or follow those partitions. If clients send requests to the
> broker before it has received this request, it throws this error you see.
> Did you restart the broker very quickly?
>
> You can mitigate this issue by using controlled shutdown to stop a broker.
>
> Thanks,
> Neha
> On Oct 4, 2013 2:45 AM, "Aniket Bhatnagar" 
> wrote:
>
> > Because Kafka was detecting localhost.domain as hostname, I commented out
> > the line "127.0.0.1 localhost.localdomain localhost" and
> added
> > "127.0.0.1 ip-10-0-1-20.localdomain" in etc/hosts. When I restart Kafka
> > (issues kill -15 pid), writes to existing topics are failing and I see
> > several of the following errors:
> >
> > [2013-10-04 03:43:23,565] WARN [KafkaApi-1] Produce request with
> > correlation id 12868451 from client  on partition
> > [consumertracking.gnomes.4002,5] failed due to Partition
> > [consumertracking.gnomes.4002,5] doesn't exist on 1
> > (kafka.server.KafkaApis)
> >
> >
> > I have checked the kafka data directory and all files are intact. What
> > could have caused this issue and how can it be fixed?
> >
>


Re: Kafka stops accepting messages for existing topics after a hostname change and restart

2013-10-04 Thread Neha Narkhede
Controlled shutdown is described here -
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-1.ControlledShutdown
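
For convenience, the invocation of the controlled-shutdown admin tool that page
describes looks roughly like the following (ZooKeeper address and broker id are
placeholders; check the wiki for the exact options in your version):

  bin/kafka-run-class.sh kafka.admin.ShutdownBroker \
      --zookeeper localhost:2181 \
      --broker 1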


On Fri, Oct 4, 2013 at 10:18 AM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> I did restart broker very quickly. I saw similar errors for about 5 mins
> and that's when I decided to shutdown all kafka brokers and start them one
> by one. That seems to have enabled writes in kafka instantly after brokers
> were back up.
>
> How do I do a controlled shutdown?  The kafka shutdown script doesnt seem
> to work for me and I assumed the kill -15 command should trigger a shutdown
> in kafka (I saw messages like kafka shutdown in the logs too)
> On 4 Oct 2013 18:55, "Neha Narkhede"  wrote:
>
> > When a broker starts up, it receives a LeaderAndIsrRequest from the
> > controller broker telling the broker which partitions it should host and
> > either lead or follow those partitions. If clients send requests to the
> > broker before it has received this request, it throws this error you see.
> > Did you restart the broker very quickly?
> >
> > You can mitigate this issue by using controlled shutdown to stop a
> broker.
> >
> > Thanks,
> > Neha
> > On Oct 4, 2013 2:45 AM, "Aniket Bhatnagar" 
> > wrote:
> >
> > > Because Kafka was detecting localhost.domain as hostname, I commented
> out
> > > the line "127.0.0.1 localhost.localdomain localhost" and
> > added
> > > "127.0.0.1 ip-10-0-1-20.localdomain" in etc/hosts. When I restart Kafka
> > > (issues kill -15 pid), writes to existing topics are failing and I see
> > > several of the following errors:
> > >
> > > [2013-10-04 03:43:23,565] WARN [KafkaApi-1] Produce request with
> > > correlation id 12868451 from client  on partition
> > > [consumertracking.gnomes.4002,5] failed due to Partition
> > > [consumertracking.gnomes.4002,5] doesn't exist on 1
> > > (kafka.server.KafkaApis)
> > >
> > >
> > > I have checked the kafka data directory and all files are intact. What
> > > could have caused this issue and how can it be fixed?
> > >
> >
>


testing issue with reliable sending

2013-10-04 Thread Jason Rosenberg
All,

I'm having an issue with an integration test I've set up.  This is using
0.8-beta1.

The test is to verify that no messages are dropped (or the sender gets an
exception thrown back if failure), while doing a rolling restart of a
cluster of 2 brokers.

The producer is configured to use 'request.required.acks' = '1'.

The servers are set up to run locally on localhost, on different ports, and
different data dirs.  The producer connects with a metadata brokerlist
like:  "localhost:2024,localhost:1025" (so no vip).   The servers are set
up with a default replication factor of 2.  The servers have controlled
shutdown enabled, as well.

The producer code looks like this:
...
Producer producer = getProducer();
try {
  KeyedMessage msg = new KeyedMessage(topic,
message);
  producer.send(msg);
  return true;
} catch (RuntimeException e) {
  logger.warn("Error sending data to kafka", e);
  return false;
}
...

The test sends groups of messages at each stage of the test (e.g. servers
up, first server going down, first server coming up, second server going
down, etc.).  Then a consumer connects and consumes all the messages, to
make sure they all arrived ok.

It seems that, intermittently, a single message gets dropped right after one of
the servers starts going down.  It doesn't always happen; it seems to happen about 1
out of every 20 test runs.  Here's some sample output.  I see the
exception inside the producer code, but I don't see the producer.send
method ever having an exception thrown back out to the caller (the log line
"Error sending data to kafka" is never triggered).

What's interesting, is that it looks like the exceptions are happening on
message 3, but when the consumer subsequently consumes back all the
messages in the broker cluster, it seems message 2 (and not message 3) is
missing:

...
...
7136 [Thread-1] INFO
com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
Sending message: test-stage: 3, message: 98
7150 [Thread-1] INFO
com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
Sending message: test-stage: 3, message: 99
7163 [Thread-2] INFO
com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
Shutting down server2
7163 [Thread-1] INFO
com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
Sending message: test-stage: 4, message: 0
7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer  - Shutting
down KafkaServer
7176 [Thread-1] INFO
com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
Sending message: test-stage: 4, message: 1
7189 [Thread-1] INFO
com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
Sending message: test-stage: 4, message: 2
7203 [Thread-1] INFO
com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
Sending message: test-stage: 4, message: 3
7394 [kafka-request-handler-5] WARN state.change.logger  - Broker
1946108683 received update metadata request with correlation id 7 from an
old controller 178709090 with epoch 2. Latest known controller epoch is 3
7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis  -
[KafkaApi-1946108683] error when handling request
Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0]
->
(LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026
kafka.common.ControllerMovedException: Broker 1946108683 received update
metadata request with correlation id 7 from an old controller 178709090
with epoch 2. Latest known controller epoch is 3
at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:724)
8039 [Controller-178709090-to-broker-178709090-send-thread] WARN
kafka.controller.RequestSendThread  -
[Controller-178709090-to-broker-178709090-send-thread], Controller
178709090 fails to send a request to broker 178709090
java.nio.channels.AsynchronousCloseException
at
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
at
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
at kafka.utils.Utils$.read(Utils.scala:394)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
80

RE: producer API thread safety

2013-10-04 Thread Yu, Libo
Great. Thanks.

Regards,

Libo


-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Friday, October 04, 2013 12:27 PM
To: users@kafka.apache.org
Subject: Re: producer API thread safety

The send() is thread safe, so the short answer would be yes.


On Fri, Oct 4, 2013 at 9:14 AM, Yu, Libo  wrote:

> Hi team,
>
> Is it possible to use a single producer with more than one threads? I 
> am not sure If its send() is thread safe.
>
> Regards,
>
> Libo
>
>


--
-- Guozhang


Re: testing issue with reliable sending

2013-10-04 Thread Neha Narkhede
The occasional single message loss could happen since
request.required.acks=1 and the leader is shut down before the follower
gets a chance to copy the message. Can you try your test with the number of
acks set to -1?

Thanks,
Neha
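
For reference, the producer-side setting being discussed is
request.required.acks; a minimal config sketch (broker list is a placeholder):

  # producer config sketch
  metadata.broker.list=broker1:9092,broker2:9092
  serializer.class=kafka.serializer.StringEncoder
  # 1  = only the leader must ack (a message can be lost if the leader dies right after acking)
  # -1 = the leader waits for all in-sync replicas before acking
  request.required.acks=-1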
On Oct 4, 2013 1:21 PM, "Jason Rosenberg"  wrote:

> All,
>
> I'm having an issue with an integration test I've setup.  This is using
> 0.8-beta1.
>
> The test is to verify that no messages are dropped (or the sender gets an
> exception thrown back if failure), while doing a rolling restart of a
> cluster of 2 brokers.
>
> The producer is configured to use 'request.required.acks' = '1'.
>
> The servers are set up to run locally on localhost, on different ports, and
> different data dirs.  The producer connects with a metadata brokerlist
> like:  "localhost:2024,localhost:1025" (so no vip).   The servers are set
> up with a default replication factor of 2.  The servers have controlled
> shutdown enabled, as well.
>
> The producer code looks like this:
> ...
> Producer producer = getProducer();
> try {
>   KeyedMessage msg = new KeyedMessage(topic,
> message);
>   producer.send(msg);
>   return true;
> } catch (RuntimeException e) {
>   logger.warn("Error sending data to kafka", e);
>   return false;
> }
> ...
>
> The test sends groups of messages at each stage of the test (e.g. servers
> up, first server going down, first server coming up, second server going
> down, etc.).  Then a consumer connects and consumes all the messages, to
> make sure they all arrived ok.
>
> It seems intermittently, a single message gets dropped, right after one of
> the servers starts going down.  It doesn't happen always, seems to happen 1
> out of every 20 test runs or so.  Here's some sample output.  I see the
> exception inside the producer code, but I don't see the producer.send
> method ever having an exception thrown back out to the caller (the log line
> "Error sending data to kafka" is never triggered).
>
> What's interesting, is that it looks like the exceptions are happening on
> message 3, but when the consumer subsequently consumes back all the
> messages in the broker cluster, it seems message 2 (and not message 3) is
> missing:
>
> ...
> ...
> 7136 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 3, message: 98
> 7150 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 3, message: 99
> 7163 [Thread-2] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Shutting down server2
> 7163 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 4, message: 0
> 7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer  - Shutting
> down KafkaServer
> 7176 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 4, message: 1
> 7189 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 4, message: 2
> 7203 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 4, message: 3
> 7394 [kafka-request-handler-5] WARN state.change.logger  - Broker
> 1946108683 received update metadata request with correlation id 7 from an
> old controller 178709090 with epoch 2. Latest known controller epoch is 3
> 7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis  -
> [KafkaApi-1946108683] error when handling request
>
> Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0]
> ->
>
> (LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026
> kafka.common.ControllerMovedException: Broker 1946108683 received update
> metadata request with correlation id 7 from an old controller 178709090
> with epoch 2. Latest known controller epoch is 3
> at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> at java.lang.Thread.run(Thread.java:724)
> 8039 [Controller-178709090-to-broker-178709090-send-thread] WARN
> kafka.controller.RequestSendThread  -
> [Controller-178709090-to-broker-178709090-send-thread], Controller
> 178709090 fails to send a request to broker 178709090
> java.nio.channels.AsynchronousCloseException
> at
>
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
> at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
> at kafka.utils.Utils$.read(Utils.scala:394

Hi, Can anyone tell me why I cannot produce any message to remote kafka server from my local machine

2013-10-04 Thread Jiang Jacky
It is very weird. I have a Kafka cluster in EC2, and there is no problem
producing messages from one of the nodes with the same producer. But when I move the
producer to my local machine at home, it gives me the error below:
Failed to send messages after 3 tries.

Can anyone tell me how to fix this issue? I have opened all ports on the
machine at my home, and the security group is also open for the Kafka servers
and ZooKeeper in EC2. Everything else is fine, but I just cannot send any
message to the Kafka servers.
Please help me.
Thanks everyone!

Jacky


Re: testing issue with reliable sending

2013-10-04 Thread Jason Rosenberg
Neha,

I'm not sure I understand.  I would have thought that if the leader
acknowledges receipt of a message, and is then shut down cleanly (with
controlled shutdown enabled), it would be able to reliably persist any
in-memory buffered messages (and replicate them) before shutting down.
 Shouldn't this be part of the contract?  It should be able to make sure
this happens before shutting down, no?

I would understand a dropped message if it were a hard shutdown.

I'm not sure, then, how to implement reliable delivery semantics while
allowing a rolling restart of the broker cluster (or even tolerating a
single node failure, where one node might be down for a while and need to be
replaced or have a disk repaired).  In this case, if we need to use
request.required.acks=-1, that will pretty much prevent any successful
message producing while any of the brokers for a partition is unavailable.
 So, I don't think that's an option.  (Not to mention the performance
degradation.)

Is there not a way to make this work more reliably with leader-only
acknowledgment and clean/controlled shutdown?

My test does succeed, as expected, with acks = -1, at least for the 100 or
so iterations I've let it run so far.  It does on occasion send duplicates
(but that's ok with me).

Jason


On Fri, Oct 4, 2013 at 6:38 PM, Neha Narkhede wrote:

> The occasional single message loss could happen since
> required.request.acks=1 and the leader is shut down before the follower
> gets a chance to copy the message. Can you try your test with num acks set
> to -1 ?
>
> Thanks,
> Neha
> On Oct 4, 2013 1:21 PM, "Jason Rosenberg"  wrote:
>
> > All,
> >
> > I'm having an issue with an integration test I've setup.  This is using
> > 0.8-beta1.
> >
> > The test is to verify that no messages are dropped (or the sender gets an
> > exception thrown back if failure), while doing a rolling restart of a
> > cluster of 2 brokers.
> >
> > The producer is configured to use 'request.required.acks' = '1'.
> >
> > The servers are set up to run locally on localhost, on different ports,
> and
> > different data dirs.  The producer connects with a metadata brokerlist
> > like:  "localhost:2024,localhost:1025" (so no vip).   The servers are set
> > up with a default replication factor of 2.  The servers have controlled
> > shutdown enabled, as well.
> >
> > The producer code looks like this:
> > ...
> > Producer producer = getProducer();
> > try {
> >   KeyedMessage msg = new KeyedMessage(topic,
> > message);
> >   producer.send(msg);
> >   return true;
> > } catch (RuntimeException e) {
> >   logger.warn("Error sending data to kafka", e);
> >   return false;
> > }
> > ...
> >
> > The test sends groups of messages at each stage of the test (e.g. servers
> > up, first server going down, first server coming up, second server going
> > down, etc.).  Then a consumer connects and consumes all the messages, to
> > make sure they all arrived ok.
> >
> > It seems intermittently, a single message gets dropped, right after one
> of
> > the servers starts going down.  It doesn't happen always, seems to
> happen 1
> > out of every 20 test runs or so.  Here's some sample output.  I see the
> > exception inside the producer code, but I don't see the producer.send
> > method ever having an exception thrown back out to the caller (the log
> line
> > "Error sending data to kafka" is never triggered).
> >
> > What's interesting, is that it looks like the exceptions are happening on
> > message 3, but when the consumer subsequently consumes back all the
> > messages in the broker cluster, it seems message 2 (and not message 3) is
> > missing:
> >
> > ...
> > ...
> > 7136 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 3, message: 98
> > 7150 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 3, message: 99
> > 7163 [Thread-2] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Shutting down server2
> > 7163 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 4, message: 0
> > 7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer  -
> Shutting
> > down KafkaServer
> > 7176 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 4, message: 1
> > 7189 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 4, message: 2
> > 7203 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 4, message: 3
> > 7394 [kafka-request-handler-5] WARN state.change.logger  - Broker
> > 1946108683 received update metadata request with correlation id 7 from an
> > old controller 178709090 with epoch 2.