Kafka broker and producer max message default config

2015-05-12 Thread Rendy Bambang Junior
Hi,

I see configuration for broker "max.message.bytes" 1,000,000
and configuration for producer "max.request.size" 1,048,576

Why is the default config for the broker less than the producer's? If that is the case,
then the producer could send messages that are bigger than what the broker
can receive.

Could anyone please clarify my understanding?

Rendy


New Producer Async - Metadata Fetch Timeout

2015-05-12 Thread Rendy Bambang Junior
Hi, sorry if my understanding is incorrect.

I am integrating the Kafka producer with an application. When I try to shut down
all Kafka brokers (preparing for the prod env), I notice that the 'send' method is
blocking.

Is new producer fetch metadata not async?

Rendy


questions

2015-05-12 Thread ram kumar
hi,

How can I send data from a log file to the Kafka server?
Can I use Kafka with Flume,
or is there any other way to do it?

Thanks


Re: questions

2015-05-12 Thread Gwen Shapira
Kafka with Flume is one way (Just use Flume's SpoolingDirectory source with
Kafka Channel or Kafka Sink).

Also, Kafka itself has a Log4j appender as part of the project; this will
work if the log is written with log4j.
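
For reference, a rough log4j.properties sketch of wiring up that appender for 0.8.x (the appender class is kafka.producer.KafkaLog4jAppender; the topic name, broker address, and logger name below are placeholders, and exact property names may vary by version):

log4j.logger.com.example.app=INFO, KAFKA
log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
log4j.appender.KAFKA.brokerList=localhost:9092
log4j.appender.KAFKA.topic=app-logs
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%d %p %c - %m%n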

Gwen

On Tue, May 12, 2015 at 12:52 PM, ram kumar  wrote:

> hi,
>
> How can i send data from a log file to kafka server?
> Can i use kafka with flume.
> or is there any other way to do it
>
> Thanks
>


Changing log.retention setting

2015-05-12 Thread Mathias Söderberg
Good day,

I'm wondering a bit about the effects of changing the log.retention
setting, be it in the configuration or on-the-fly via ZooKeeper: will it
apply to already existing log segments, or only to new ones?

For example, we have a default of 12 hours. If I change the value to 24
hours in the configuration file and then bounce all of the brokers, will
the brokers still remove "old" log segments (ones created before the
restart) after 12 hours, or will the new setting of 24 hours apply
immediately?
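
For what it's worth, the on-the-fly route I had in mind is a per-topic override along these lines (topic name, ZooKeeper address, and the 24-hour value are placeholders; I have not verified whether it affects existing segments either):

bin/kafka-topics.sh --zookeeper localhost:2181 --alter \
  --topic my-topic --config retention.ms=86400000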

Best regards,
Mathias


Kafka 0.9 release and future 0.8.2.1 client compatibility

2015-05-12 Thread Enrico Olivelli - Diennea
Hi,
I would like to start using Kafka. Can I start from 0.9, or is it better to
develop on 0.8.2.1 and then migrate to 0.9?
My plan is to be in production by September.

Will a 0.8.2.1 client (producer/consumer) be able to talk to 0.9 brokers ?

Is there any public maven artifact for Kafka 0.9 ?

thanks

Enrico Olivelli
Software Development Manager @Diennea
Tel.: (+39) 0546 066100 - Int. 925
Viale G.Marconi 30/14 - 48018 Faenza (RA)



Re: New Producer Async - Metadata Fetch Timeout

2015-05-12 Thread Jiangjie Qin
That's right. Send() will first try to get metadata of a topic, which is a
blocking operation.

On 5/12/15, 2:48 AM, "Rendy Bambang Junior" 
wrote:

>Hi, sorry if my understanding is incorrect.
>
>I am integrating kafka producer with application, when i try to shutdown
>all kafka broker (preparing for prod env) I notice that 'send' method is
>blocking.
>
>Is new producer fetch metadata not async?
>
>Rendy



Reassigning partitions off of a permanently failed broker

2015-05-12 Thread Stephen Donnelly
Hi there,

I'm working on a script that fails Kafka 0.8.2 brokers out of the cluster,
mostly intended for dealing with long-term downtimes such as hardware
failures. The script generates a new partition assignment, moving any
replica on the failed host to other available hosts.

The problem I'm having is that the reassignment script won't complete (the
--verify option reports "In Progress") until the failed broker comes back
online so its data can be deleted.  However, I'm trying to handle the case
where the failed machine never comes back online.

Is there a recommended way for removing permanently failed brokers from the
partition assignment?  Do I need to start up a new server that reuses the
old broker id, so it can pretend to be the old machine and perform a no-op
for the deletion?
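
For concreteness, the script produces a reassignment JSON along these lines and feeds it to kafka-reassign-partitions.sh (the topic name and the surviving broker ids 2 and 3 are placeholders):

{"version": 1,
 "partitions": [
   {"topic": "my-topic", "partition": 0, "replicas": [2, 3]}
 ]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file reassign.json --execute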

Thanks for your help,

Steve Donnelly


Re: New Producer Async - Metadata Fetch Timeout

2015-05-12 Thread Mayuresh Gharat
The way it works, I suppose, is that the producer will do fetchMetadata if
the last fetched metadata is stale (the refresh interval has expired) or if
it is not able to send data to a particular broker in its current metadata
(this might happen in some cases, like if the leader moves).

It cannot produce without having the right metadata.

Thanks,

Mayuresh

On Tue, May 12, 2015 at 10:09 AM, Jiangjie Qin 
wrote:

> That's right. Send() will first try to get metadata of a topic, that is a
> blocking operation.
>
> On 5/12/15, 2:48 AM, "Rendy Bambang Junior" 
> wrote:
>
> >Hi, sorry if my understanding is incorrect.
> >
> >I am integrating kafka producer with application, when i try to shutdown
> >all kafka broker (preparing for prod env) I notice that 'send' method is
> >blocking.
> >
> >Is new producer fetch metadata not async?
> >
> >Rendy
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Could this be happening?

2015-05-12 Thread Scott Chapman
We are basically using kafka as a transport mechanism for multi-line log
files.

So, for this we are using single partition topics (with a replica for good
measure) writing to a multi-broker cluster.

Our producer basically reads a file line-by-line (as it is being written
to) and publishes each line as a message to the topic. We are also writing
as quickly as we can (not waiting for ACK).

What I am seeing is occasionally the messages in the topic appear to be
slightly out of order when compared to the source file they were based on.

I am wondering if this might happen when the producer switches brokers,
because we are not waiting for the ACK before continuing to write.

Does this make any sense??

Thanks in advance!

-Scott


Can't fetch partition metadata from KafkaServer

2015-05-12 Thread Laran Evans
I’m using this version of kafka:


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
</dependency>


I’m using kafka.server.KafkaServer in memory for some integration tests. I 
start KafkaServer and use AdminUtils.createTopic(ZkClient, String, Integer, 
Integer, Properties) to create a Topic.

I then use the following code to find the leader (adapted from the sample code 
on the SimpleConsumerExample on the Kafka wiki 
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example).

consumer = new SimpleConsumer(seed.getHostName(), seed.getPort(), getTimeout(),
        getBufferSize(), "leaderLookup");
List<String> topics = Collections.singletonList(getTopic());
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

List<TopicMetadata> metaData = resp.topicsMetadata();
loop:
for (TopicMetadata item : metaData) {
    for (PartitionMetadata part : item.partitionsMetadata()) {
        if (part.partitionId() == getPartition()) {
            returnMetaData = part;
            break loop;   // exit both loops once the partition is found
        }
    }
}

The problem I have is that item.partitionsMetadata() is always null. So my code
interprets the situation as if the one broker doesn't know anything about the
topic.

Is this a bug in KafkaServer?
I’ve seen a couple bugs that seem potentially related:
https://issues.apache.org/jira/browse/KAFKA-1367

I also ran this search in the Kafka JIRA which returned a lot of results.
https://issues.apache.org/jira/browse/KAFKA-1998?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22partition%20metadata%22

Can anyone shed some light on this for me?

After starting KafkaServer how can I tell whether or not the broker is handling 
a given partition for a given topic?

Thanks.


Re: New Producer Async - Metadata Fetch Timeout

2015-05-12 Thread Mohit Gupta
I could not follow the reasoning behind blocking the send method if the
metadata is not up-to-date. Though I see that, as per the design, it
requires the metadata to batch the message into the appropriate topicPartition
queue. Also, if the metadata could not be updated in the specified
interval, it throws an exception and the message is not queued to be
retried once the brokers are up.

Should messages not be buffered in another queue (up to a
limit) if the brokers are down and retried later?
Is it not a general use case to require the producer to be asynchronous in all
scenarios?


On Tue, May 12, 2015 at 10:54 PM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> The way it works I suppose is that, the producer will do fetchMetadata, if
> the last fetched metadata is stale (the refresh interval has expired) or if
> it is not able to send data to a particular broker in its current metadata
> (This might happen in some cases like if the leader moves).
>
> It cannot produce without having the right metadata.
>
> Thanks,
>
> Mayuresh
>
> On Tue, May 12, 2015 at 10:09 AM, Jiangjie Qin 
> wrote:
>
> > That's right. Send() will first try to get metadata of a topic, that is a
> > blocking operation.
> >
> > On 5/12/15, 2:48 AM, "Rendy Bambang Junior" 
> > wrote:
> >
> > >Hi, sorry if my understanding is incorrect.
> > >
> > >I am integrating kafka producer with application, when i try to shutdown
> > >all kafka broker (preparing for prod env) I notice that 'send' method is
> > >blocking.
> > >
> > >Is new producer fetch metadata not async?
> > >
> > >Rendy
> >
> >
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>



-- 
Best Regards,

Mohit Gupta


Re: Could this be happening?

2015-05-12 Thread Magnus Edenhill
Hi Scott,

what producer client are you using?

Reordering is possible in async producers in the case of temporary broker
failures combined with request.required.acks != 0 and retries > 0.

Consider the case where a producer has 20 messages in-flight to the broker.
Out of those, messages #1-10 fail due to some temporary failure (?) on the
broker side, but messages #11-20 are accepted.
When the producer receives error results from the broker for messages #1-10,
it will try to resend these 10 failed messages, which are then accepted,
causing them to end up after message #20 in the log - thus reordered.

This failure scenario should be rather rare though.


Regards,
Magnus

2015-05-12 20:18 GMT+02:00 Scott Chapman :

> We are basically using kafka as a transport mechanism for multi-line log
> files.
>
> So, for this we are using single partition topics (with a replica for good
> measure) writing to a multi-broker cluster.
>
> Our producer basically reads a file line-by-line (as it is being written
> to) and publishes each line as a message to the topic. We are also writing
> as quickly as we can (not waiting for ACK).
>
> What I am seeing is occasionally the messages in the topic appear to be
> slightly out of order when compared to the source file they were based on.
>
> I am wonder if this might happen when the producer switches brokers because
> we are not waiting for the ACK before continuing to write.
>
> Does this make any sense??
>
> Thanks in advance!
>
> -Scott
>


Re: Could this be happening?

2015-05-12 Thread Scott Chapman
We are using the Java producer API (0.8.2.1 if I am not mistaken). We are
using a producer type of sync, though.

On Tue, May 12, 2015 at 3:50 PM Magnus Edenhill  wrote:

> Hi Scott,
>
> what producer client are you using?
>
> Reordering is possible in async producers in the case of temporary broker
> failures
> and the combination of request.required.acks != 0 and retries > 0.
>
> Consider the case where a producer has 20 messages in-flight to the broker,
> out of those
> messages # 1-10 fails due to some temporary failure (?) on the broker side,
> but message # 11-20 are accepted.
> When the producer receives error results from the broker for message # 1-10
> it will try to resend
> these 10 failed messages, that are now accepted, causing them to end up
> after message #20 in the log - thus reordered.
>
> This failure scenario should be rather rare though.
>
>
> Regards,
> Magnus
>
> 2015-05-12 20:18 GMT+02:00 Scott Chapman :
>
> > We are basically using kafka as a transport mechanism for multi-line log
> > files.
> >
> > So, for this we are using single partition topics (with a replica for
> good
> > measure) writing to a multi-broker cluster.
> >
> > Our producer basically reads a file line-by-line (as it is being written
> > to) and publishes each line as a message to the topic. We are also
> writing
> > as quickly as we can (not waiting for ACK).
> >
> > What I am seeing is occasionally the messages in the topic appear to be
> > slightly out of order when compared to the source file they were based
> on.
> >
> > I am wonder if this might happen when the producer switches brokers
> because
> > we are not waiting for the ACK before continuing to write.
> >
> > Does this make any sense??
> >
> > Thanks in advance!
> >
> > -Scott
> >
>


Re: New Producer Async - Metadata Fetch Timeout

2015-05-12 Thread Magnus Edenhill
I completely agree with Mohit: an application should not have to know or
care about producer implementation internals.
Given a message and its delivery constraints (produce retry count and
timeout), the producer should hide any temporary failures until the message
is successfully delivered, a permanent error is encountered, or the
constraints are hit.
This should also include internal start-up sequencing, such as metadata
retrieval.



2015-05-12 21:22 GMT+02:00 Mohit Gupta :

> I could not follow the reasoning behind blocking the send method if the
> metadata is not up-to-date. Though, I see that it as per design, it
> requires the metadata to batch the message into appropriate topicPartition
> queue. Also, if the metadata could not be updated in the specified
> interval, it throws an exception and the message is not queued to be
> retried once the brokers are up.
>
> Should it not be that messages are buffered in another queue ( up-to a
> limit ) if the brokers are down and retried later?
> Is it not a general use case to require producer to be asynchronous in all
> the scenarios?
>
>
> On Tue, May 12, 2015 at 10:54 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
> > The way it works I suppose is that, the producer will do fetchMetadata,
> if
> > the last fetched metadata is stale (the refresh interval has expired) or
> if
> > it is not able to send data to a particular broker in its current
> metadata
> > (This might happen in some cases like if the leader moves).
> >
> > It cannot produce without having the right metadata.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Tue, May 12, 2015 at 10:09 AM, Jiangjie Qin  >
> > wrote:
> >
> > > That's right. Send() will first try to get metadata of a topic, that
> is a
> > > blocking operation.
> > >
> > > On 5/12/15, 2:48 AM, "Rendy Bambang Junior" 
> > > wrote:
> > >
> > > >Hi, sorry if my understanding is incorrect.
> > > >
> > > >I am integrating kafka producer with application, when i try to
> shutdown
> > > >all kafka broker (preparing for prod env) I notice that 'send' method
> is
> > > >blocking.
> > > >
> > > >Is new producer fetch metadata not async?
> > > >
> > > >Rendy
> > >
> > >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>
>
>
> --
> Best Regards,
>
> Mohit Gupta
>


Re: Kafka log compression change in 0.8.2.1?

2015-05-12 Thread Jun Rao
Andrew,

The recompression logic didn't change in 0.8.2.1. The broker still takes
all messages in a single request, assigns offsets and recompresses them
into a single compressed message.

Are you using mirror maker to copy data from the 0.8.1 cluster to the 0.8.2
cluster? If so, this may have to do with the batching in the producer in
mirror maker. Did you enable the new java producer in mirror maker?

Thanks,

Jun


On Mon, May 11, 2015 at 12:53 PM, Olson,Andrew  wrote:

> After a recent 0.8.2.1 upgrade we noticed a significant increase in used
> filesystem space for our Kafka log data. We have another Kafka cluster
> still on 0.8.1.1 whose Kafka data is being copied over to the upgraded
> cluster, and it is clear that the disk consumption is higher on 0.8.2.1 for
> the same message data. The log retention config for the two clusters is the
> same also.
>
> We ran some tests to figure out what was happening, and it appears that in
> 0.8.2.1 the Kafka brokers re-compress each message individually (we’re
> using Snappy), while in 0.8.1.1 they applied the compression across an
> entire batch of messages written to the log. For producers sending large
> batches of small similar messages, the difference can be quite substantial
> (in our case, it looks like a little over 2x). Is this a bug, or the
> expected new behavior?
>
> thanks,
> Andrew
>
>


Re: Kafka log compression change in 0.8.2.1?

2015-05-12 Thread Olson,Andrew
Hi Jun,

I figured it out this morning and opened
https://issues.apache.org/jira/browse/KAFKA-2189 -- it turned out to be a bug
in versions 1.1.1.2 through 1.1.1.6 of snappy-java that has recently been
fixed (I was very happy to see their new unit test named
"batchingOfWritesShouldNotAffectCompressedDataSize"). We will be patching
1.1.1.7 out to our clusters as soon as we can.

Regarding the mirror maker question, we wrote our own custom replication
code and are not using the mirror maker to copy the data between clusters.
We're still using the old Java producer, and confirmed the issue was present
with both the 0.8.1.1 and 0.8.2.1 old producer client.

thanks,
Andrew

On 5/12/15, 3:08 PM, "Jun Rao"  wrote:

>Andrew,
>
>The recompression logic didn't change in 0.8.2.1. The broker still takes
>all messages in a single request, assigns offsets and recompresses them
>into a single compressed message.
>
>Are you using mirror maker to copy data from the 0.8.1 cluster to the
>0.8.2
>cluster? If so, this may have to do with the batching in the producer in
>mirror maker. Did you enable the new java producer in mirror maker?
>
>Thanks,
>
>Jun
>
>
>On Mon, May 11, 2015 at 12:53 PM, Olson,Andrew  wrote:
>
>> After a recent 0.8.2.1 upgrade we noticed a significant increase in used
>> filesystem space for our Kafka log data. We have another Kafka cluster
>> still on 0.8.1.1 whose Kafka data is being copied over to the upgraded
>> cluster, and it is clear that the disk consumption is higher on 0.8.2.1
>>for
>> the same message data. The log retention config for the two clusters is
>>the
>> same also.
>>
>> We ran some tests to figure out what was happening, and it appears that
>>in
> 0.8.2.1 the Kafka brokers re-compress each message individually (we're
>> using Snappy), while in 0.8.1.1 they applied the compression across an
>> entire batch of messages written to the log. For producers sending large
>> batches of small similar messages, the difference can be quite
>>substantial
>> (in our case, it looks like a little over 2x). Is this a bug, or the
>> expected new behavior?
>>
>> thanks,
>> Andrew
>>
>>



Re: Kafka broker and producer max message default config

2015-05-12 Thread Ewen Cheslack-Postava
The max.request.size effectively caps the largest size message the producer
will send, but the actual purpose is, as the name implies, to limit the
size of a request, which could potentially include many messages. This
keeps the producer from sending very large requests to the broker. The
limitation on message size is just a side effect.
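
For illustration, one way to make the two limits line up explicitly (values here are examples only; note the broker-level property is message.max.bytes, with max.message.bytes being the per-topic override, and replica.fetch.max.bytes should be at least as large so followers can replicate the biggest accepted messages):

# broker: server.properties
message.max.bytes=1000000
replica.fetch.max.bytes=1048576

# producer config
max.request.size=1000000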


On Tue, May 12, 2015 at 12:33 AM, Rendy Bambang Junior <
rendy.b.jun...@gmail.com> wrote:

> Hi,
>
> I see configuration for broker "max.message.bytes" 1,000,000
> and configuration for producer "max.request.size" 1,048,576
>
> Why is default config for broker is less than producer? If that is the case
> then there will be message sent by producer which is bigger than what
> broker could receive.
>
> Could anyone please clarify my understanding?
>
> Rendy
>



-- 
Thanks,
Ewen


Re: Kafka log compression change in 0.8.2.1?

2015-05-12 Thread Jun Rao
Hi, Andrew,

Thanks for finding this out. I marked KAFKA-2189 as a blocker for 0.8.3.
Could you share your experience on snappy 1.1.1.7 in the jira once you have
tried it out? If the result looks good, we can upgrade the snappy version
in trunk.

Jun

On Tue, May 12, 2015 at 1:23 PM, Olson,Andrew  wrote:

> Hi Jun,
>
> I figured it out this morning and opened
> https://issues.apache.org/jira/browse/KAFKA-2189 --
> it turned out to be a bug in versions 1.1.1.2 through 1.1.1.6 of
> snappy-java that has recently
> been fixed (I was very happy to see their new unit test named
> "batchingOfWritesShouldNotAffectCompressedDataSize"). We will be patching
> 1.1.1.7 out to our
> clusters as soon as we can.
>
> Regarding the mirror maker question, we wrote our own custom replication
> code and are not
> using the mirror maker to copy the data between clusters. We're still
> using the old java
> producer, and confirmed the issue was present with both the 0.8.1.1 and
> 0.8.2.1 old producer
> client.
>
> thanks,
> Andrew
>
> On 5/12/15, 3:08 PM, "Jun Rao"  wrote:
>
> >Andrew,
> >
> >The recompression logic didn't change in 0.8.2.1. The broker still takes
> >all messages in a single request, assigns offsets and recompresses them
> >into a single compressed message.
> >
> >Are you using mirror maker to copy data from the 0.8.1 cluster to the
> >0.8.2
> >cluster? If so, this may have to do with the batching in the producer in
> >mirror maker. Did you enable the new java producer in mirror maker?
> >
> >Thanks,
> >
> >Jun
> >
> >
> >On Mon, May 11, 2015 at 12:53 PM, Olson,Andrew 
> wrote:
> >
> >> After a recent 0.8.2.1 upgrade we noticed a significant increase in used
> >> filesystem space for our Kafka log data. We have another Kafka cluster
> >> still on 0.8.1.1 whose Kafka data is being copied over to the upgraded
> >> cluster, and it is clear that the disk consumption is higher on 0.8.2.1
> >>for
> >> the same message data. The log retention config for the two clusters is
> >>the
> >> same also.
> >>
> >> We ran some tests to figure out what was happening, and it appears that
> >>in
> >> 0.8.2.1 the Kafka brokers re-compress each message individually (we're
> >> using Snappy), while in 0.8.1.1 they applied the compression across an
> >> entire batch of messages written to the log. For producers sending large
> >> batches of small similar messages, the difference can be quite
> >>substantial
> >> (in our case, it looks like a little over 2x). Is this a bug, or the
> >> expected new behavior?
> >>
> >> thanks,
> >> Andrew
> >>
> >>
>
>


Using 0.8.1.1 kafka producer/consumer with 0.8.2.1 broker

2015-05-12 Thread Virendra Pratap Singh
I am in the process of testing and migrating our prod Kafka from 0.8.1.1 to
0.8.2.1.
Wanted to do a quick check with the community: has anyone observed any issues
with writing/reading data to 0.8.2.1 Kafka broker(s) using an 0.8.1.1 producer
and consumer?
Any gotchas to watch for or any concerns?

Regards,
Virendra


Re: New Producer Async - Metadata Fetch Timeout

2015-05-12 Thread Jiangjie Qin
Send() will only block if the metadata is *not available* for the topic.
It won't block if the metadata is merely stale. The metadata refresh is async
to send(). However, if you send a message to a topic for the first time,
send() will trigger a metadata refresh and block until it has metadata for
that topic.
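
As a minimal sketch of sidestepping that first-send block, assuming the new Java producer in 0.8.2, you can warm up the metadata at startup; the topic name, broker address, and the choice of partitionsFor() as the warm-up call are just this example's assumptions:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MetadataWarmup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("metadata.fetch.timeout.ms", "5000");    // bound the one-time blocking fetch

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // The blocking metadata fetch happens here, once, instead of inside the first send().
        producer.partitionsFor("my-topic");
        // From here on, send() stays asynchronous as long as cached metadata is available.
        producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        producer.close();
    }
}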

Jiangjie (Becket) Qin

On 5/12/15, 12:58 PM, "Magnus Edenhill"  wrote:

>I completely agree with Mohit, an application should not have to know or
>care about
>producer implementation internals.
>Given a message and its delivery constraints (produce retry count and
>timeout) the producer
>should hide any temporal failures until the message is succesfully
>delivered, a permanent
>error is encountered or the constraints are hit.
>This should also include internal start up sequencing, such as metadata
>retrieval.
>
>
>
>2015-05-12 21:22 GMT+02:00 Mohit Gupta :
>
>> I could not follow the reasoning behind blocking the send method if the
>> metadata is not up-to-date. Though, I see that it as per design, it
>> requires the metadata to batch the message into appropriate
>>topicPartition
>> queue. Also, if the metadata could not be updated in the specified
>> interval, it throws an exception and the message is not queued to be
>> retried once the brokers are up.
>>
>> Should it not be that messages are buffered in another queue ( up-to a
>> limit ) if the brokers are down and retried later?
>> Is it not a general use case to require producer to be asynchronous in
>>all
>> the scenarios?
>>
>>
>> On Tue, May 12, 2015 at 10:54 PM, Mayuresh Gharat <
>> gharatmayures...@gmail.com> wrote:
>>
>> > The way it works I suppose is that, the producer will do
>>fetchMetadata,
>> if
>> > the last fetched metadata is stale (the refresh interval has expired)
>>or
>> if
>> > it is not able to send data to a particular broker in its current
>> metadata
>> > (This might happen in some cases like if the leader moves).
>> >
>> > It cannot produce without having the right metadata.
>> >
>> > Thanks,
>> >
>> > Mayuresh
>> >
>> > On Tue, May 12, 2015 at 10:09 AM, Jiangjie Qin
>>> >
>> > wrote:
>> >
>> > > That's right. Send() will first try to get metadata of a topic, that
>> is a
>> > > blocking operation.
>> > >
>> > > On 5/12/15, 2:48 AM, "Rendy Bambang Junior"
>>
>> > > wrote:
>> > >
>> > > >Hi, sorry if my understanding is incorrect.
>> > > >
>> > > >I am integrating kafka producer with application, when i try to
>> shutdown
>> > > >all kafka broker (preparing for prod env) I notice that 'send'
>>method
>> is
>> > > >blocking.
>> > > >
>> > > >Is new producer fetch metadata not async?
>> > > >
>> > > >Rendy
>> > >
>> > >
>> >
>> >
>> > --
>> > -Regards,
>> > Mayuresh R. Gharat
>> > (862) 250-7125
>> >
>>
>>
>>
>> --
>> Best Regards,
>>
>> Mohit Gupta
>>



Re: New Producer Async - Metadata Fetch Timeout

2015-05-12 Thread Rendy Bambang Junior
Thank you for the clarification.

I think I agree with Mohit. Sometimes blocking on logging is not acceptable,
given the nature of the application that uses Kafka.

Yes, it is not blocking when metadata is still available. But the application
will be blocked once the metadata is expired.

It might be handled by the application, by implementing an async call around
send() and managing the buffer and async timeout internally, but that makes
the async feature in the Kafka producer less meaningful.
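
A minimal sketch of that application-side wrapper, assuming the new Java producer; the single worker thread, the bounded queue of 10,000, and the drop-on-overflow policy are just this example's choices:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NonBlockingSender {
    private final KafkaProducer<String, String> producer;
    // One worker thread with a bounded queue; new records are silently dropped when it is full.
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(10000),
            new ThreadPoolExecutor.DiscardPolicy());

    public NonBlockingSender(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    public void sendAsync(final String topic, final String value) {
        // Any blocking inside send() (e.g. the first metadata fetch) happens on the
        // worker thread, so the calling (logging) thread never blocks.
        executor.execute(new Runnable() {
            public void run() {
                producer.send(new ProducerRecord<String, String>(topic, value));
            }
        });
    }
}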

Sorry if my understanding is incorrect.

Rendy
On May 13, 2015 6:59 AM, "Jiangjie Qin"  wrote:

> Send() will only block if the metadata is *not available* for the topic.
> It won’t block if metadata there is stale. The metadata refresh is async
> to send(). However, if you send the message to a topic for the first time,
> send() will trigger a metadata refresh and block until it has metadata for
> that topic.
>
> Jiangjie (Becket) Qin
>
> On 5/12/15, 12:58 PM, "Magnus Edenhill"  wrote:
>
> >I completely agree with Mohit, an application should not have to know or
> >care about
> >producer implementation internals.
> >Given a message and its delivery constraints (produce retry count and
> >timeout) the producer
> >should hide any temporal failures until the message is succesfully
> >delivered, a permanent
> >error is encountered or the constraints are hit.
> >This should also include internal start up sequencing, such as metadata
> >retrieval.
> >
> >
> >
> >2015-05-12 21:22 GMT+02:00 Mohit Gupta :
> >
> >> I could not follow the reasoning behind blocking the send method if the
> >> metadata is not up-to-date. Though, I see that it as per design, it
> >> requires the metadata to batch the message into appropriate
> >>topicPartition
> >> queue. Also, if the metadata could not be updated in the specified
> >> interval, it throws an exception and the message is not queued to be
> >> retried once the brokers are up.
> >>
> >> Should it not be that messages are buffered in another queue ( up-to a
> >> limit ) if the brokers are down and retried later?
> >> Is it not a general use case to require producer to be asynchronous in
> >>all
> >> the scenarios?
> >>
> >>
> >> On Tue, May 12, 2015 at 10:54 PM, Mayuresh Gharat <
> >> gharatmayures...@gmail.com> wrote:
> >>
> >> > The way it works I suppose is that, the producer will do
> >>fetchMetadata,
> >> if
> >> > the last fetched metadata is stale (the refresh interval has expired)
> >>or
> >> if
> >> > it is not able to send data to a particular broker in its current
> >> metadata
> >> > (This might happen in some cases like if the leader moves).
> >> >
> >> > It cannot produce without having the right metadata.
> >> >
> >> > Thanks,
> >> >
> >> > Mayuresh
> >> >
> >> > On Tue, May 12, 2015 at 10:09 AM, Jiangjie Qin
> >> >> >
> >> > wrote:
> >> >
> >> > > That's right. Send() will first try to get metadata of a topic, that
> >> is a
> >> > > blocking operation.
> >> > >
> >> > > On 5/12/15, 2:48 AM, "Rendy Bambang Junior"
> >>
> >> > > wrote:
> >> > >
> >> > > >Hi, sorry if my understanding is incorrect.
> >> > > >
> >> > > >I am integrating kafka producer with application, when i try to
> >> shutdown
> >> > > >all kafka broker (preparing for prod env) I notice that 'send'
> >>method
> >> is
> >> > > >blocking.
> >> > > >
> >> > > >Is new producer fetch metadata not async?
> >> > > >
> >> > > >Rendy
> >> > >
> >> > >
> >> >
> >> >
> >> > --
> >> > -Regards,
> >> > Mayuresh R. Gharat
> >> > (862) 250-7125
> >> >
> >>
> >>
> >>
> >> --
> >> Best Regards,
> >>
> >> Mohit Gupta
> >>
>
>


Compression and batching

2015-05-12 Thread Jamie X
Hi,

I'm wondering when you call kafka.javaapi.Producer.send() with a list of
messages, and also have compression on (snappy in this case), how does it
decide how many messages to put together as one?

The reason I'm asking is that even though my messages are only 70 KB
uncompressed, the broker complains that I'm hitting the 1 MB message limit,
with errors such as:


kafka.common.MessageSizeTooLargeException: Message size is 1035608 bytes
which exceeds the maximum configured message size of 112.
at
kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:378)
at
kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:361)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
at kafka.log.Log.append(Log.scala:257)
at
kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
at
kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at kafka.utils.Utils$.inReadLock(Utils.scala:541)
at
kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
at
kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
at
kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)

Thanks,
Jamie


Re: Kafka broker and producer max message default config

2015-05-12 Thread Rendy Bambang Junior
Thanks, I get the difference now. This assumes the request to be sent
contains more than one message, doesn't it?

Rendy
On May 13, 2015 3:55 AM, "Ewen Cheslack-Postava"  wrote:

> The max.request.size effectively caps the largest size message the producer
> will send, but the actual purpose is, as the name implies, to limit the
> size of a request, which could potentially include many messages. This
> keeps the producer from sending very large requests to the broker. The
> limitation on message size is just a side effect.
>
>
> On Tue, May 12, 2015 at 12:33 AM, Rendy Bambang Junior <
> rendy.b.jun...@gmail.com> wrote:
>
> > Hi,
> >
> > I see configuration for broker "max.message.bytes" 1,000,000
> > and configuration for producer "max.request.size" 1,048,576
> >
> > Why is default config for broker is less than producer? If that is the
> case
> > then there will be message sent by producer which is bigger than what
> > broker could receive.
> >
> > Could anyone please clarify my understanding?
> >
> > Rendy
> >
>
>
>
> --
> Thanks,
> Ewen
>


Kafka 0.8.2.1 - Listing partitions owned by consumers

2015-05-12 Thread Bharath Srinivasan
Hi,

For monitoring purposes, is there a way to find the partitions for a topic
that are assigned to consumers in a group? We are using the high-level consumer
and the offsets are stored in Kafka.

Tried searching for methods in ZKUtils, but could not find anything that
gives this information. Any pointers are appreciated.

Thanks.


RE: Kafka 0.8.2.1 - Listing partitions owned by consumers

2015-05-12 Thread Aditya Auradkar
Perhaps you could try the ConsumerOffsetChecker. The "Owner" field might be
what you want.
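
For reference, a hedged sketch of running it (the group name and ZooKeeper address are placeholders; exact flags may differ across 0.8.x versions):

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
  --zkconnect localhost:2181 --group my-consumer-group

The output has one row per partition, and the Owner column shows which consumer instance currently holds it.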

Aditya


From: Bharath Srinivasan [bharath...@gmail.com]
Sent: Tuesday, May 12, 2015 7:29 PM
To: users@kafka.apache.org
Subject: Kafka 0.8.2.1 - Listing partitions owned by consumers

Hi,

For monitorting purposes, is there a way to find the partitions for a topic
that are assigned to consumers in a group? We are using high level consumer
and the offsets are stored in kafka.

Tried searching for methods in ZKUtils, but could not find anything that
gives this information. Any pointers is appreciated.

Thanks.


Re: Kafka 0.8.2.1 - Listing partitions owned by consumers

2015-05-12 Thread Mayuresh Gharat
Well, there is no separate tool available for importing and exporting
offsets from Kafka that would also provide this functionality. We are
working on it.

You can try the consumerOffsetChecker as Aditya mentioned.

Thanks,

Mayuresh

On Tue, May 12, 2015 at 8:11 PM, Aditya Auradkar <
aaurad...@linkedin.com.invalid> wrote:

> Perhaps you could try the ConsumerOffsetChecker. The "Owner" field might
> be what you want..
>
> Aditya
>
> 
> From: Bharath Srinivasan [bharath...@gmail.com]
> Sent: Tuesday, May 12, 2015 7:29 PM
> To: users@kafka.apache.org
> Subject: Kafka 0.8.2.1 - Listing partitions owned by consumers
>
> Hi,
>
> For monitorting purposes, is there a way to find the partitions for a topic
> that are assigned to consumers in a group? We are using high level consumer
> and the offsets are stored in kafka.
>
> Tried searching for methods in ZKUtils, but could not find anything that
> gives this information. Any pointers is appreciated.
>
> Thanks.
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Compression and batching

2015-05-12 Thread Mayuresh Gharat
Well, the batch size is decided by the value set for the property :

 "batch.size";
 "The producer will attempt to batch records together into fewer requests
whenever multiple records are being sent to the same partition. This helps
performance on both the client and the server. This configuration controls
the  default batch size in bytes. No attempt will be made to batch records
larger than this size. Requests sent to brokers will contain multiple
batches, one for each partition with data available to be sent. A small
batch size will make batching less common and may reduce throughput (a
batch size of zero will disable batching entirely). A very large batch size
may use memory a bit more wastefully as we will always allocate a buffer of
the specified batch size in anticipation of additional records."

Also, it may happen that the message size increases due to compression, which
is kind of weird.

Thanks,

Mayuresh

On Tue, May 12, 2015 at 4:40 PM, Jamie X  wrote:

> Hi,
>
> I'm wondering when you call kafka.javaapi.Producer.send() with a list of
> messages, and also have compression on (snappy in this case), how does it
> decide how many messages to put together as one?
>
> The reason I'm asking is that even though my messages are only 70kb
> uncompressed, the broker complains that I'm hitting the 1mb message limit
> such as:
>
>
> kafka.common.MessageSizeTooLargeException: Message size is 1035608 bytes
> which exceeds the maximum configured message size of 112.
> at
> kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:378)
> at
> kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:361)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
> at kafka.log.Log.append(Log.scala:257)
> at
>
> kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
> at
>
> kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at kafka.utils.Utils$.inReadLock(Utils.scala:541)
> at
> kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
> at
>
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
> at
>
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>
> Thanks,
> Jamie
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka 0.8.2.1 - Listing partitions owned by consumers

2015-05-12 Thread Bharath Srinivasan
As we need to do this programmatically, I tried to strip out the relevant
parts from ConsumerOffsetChecker. It did work.

Thanks for the suggestions.


On Tue, May 12, 2015 at 8:58 PM, Mayuresh Gharat  wrote:

> Well, there is no separate tool available for importing and exporting
> offsets from kafka, which will also provide this functionality. We are
> working on it.
>
> You can try the consumerOffsetChecker as Aditya mentioned.
>
> Thanks,
>
> Mayuresh
>
> On Tue, May 12, 2015 at 8:11 PM, Aditya Auradkar <
> aaurad...@linkedin.com.invalid> wrote:
>
> > Perhaps you could try the ConsumerOffsetChecker. The "Owner" field might
> > be what you want..
> >
> > Aditya
> >
> > 
> > From: Bharath Srinivasan [bharath...@gmail.com]
> > Sent: Tuesday, May 12, 2015 7:29 PM
> > To: users@kafka.apache.org
> > Subject: Kafka 0.8.2.1 - Listing partitions owned by consumers
> >
> > Hi,
> >
> > For monitorting purposes, is there a way to find the partitions for a
> topic
> > that are assigned to consumers in a group? We are using high level
> consumer
> > and the offsets are stored in kafka.
> >
> > Tried searching for methods in ZKUtils, but could not find anything that
> > gives this information. Any pointers is appreciated.
> >
> > Thanks.
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>


Re: Message corruption with new Java client + snappy + broker restart

2015-05-12 Thread Roger Hoover
Oops.  I originally sent this to the dev list but meant to send it here.

Hi,
>
> When using Samza 0.9.0, which uses the new Java producer client, with snappy
> enabled, I see messages getting corrupted on the client side.  It never
> happens with the old producer and it never happens with lz4, gzip, or no
> compression.  It only happens when a broker gets restarted (or maybe just
> shut down).
>
> The error is not always the same.  I've noticed at least three types of
> errors on the Kafka brokers.
>
> 1) java.io.IOException: failed to read chunk
> at
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:356)
> http://pastebin.com/NZrrEHxU
> 2) java.lang.OutOfMemoryError: Java heap space
>at
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:346)
> http://pastebin.com/yuxk1BjY
> 3) java.io.IOException: PARSING_ERROR(2)
>   at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> http://pastebin.com/yq98Hx49
>
> I've noticed a couple different behaviors from the Samza producer/job
> A) It goes into a long retry loop where this message is logged.  I saw
> this with error #1 above.
>
> 2015-04-29 18:17:31 Sender [WARN] task[Partition 7]
> ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7] offset[253] Got
> error produce response with correlation id 4878 on topic-partition
> svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1, retrying (2147483646 attempts
> left). Error: CORRUPT_MESSAGE
>
> B) The job exits with
> org.apache.kafka.common.errors.UnknownServerException (at least when run as
> ThreadJob).  I saw this with error #3 above.
>
> org.apache.samza.SamzaException: Unable to send message from
> TaskName-Partition 6 to system kafka.
> org.apache.kafka.common.errors.UnknownServerException: The server
> experienced an unexpected error when processing the request
>
> There seem to be two issues here:
>
> 1) When leadership for a topic is transferred to another broker, the Java
> client (I think) has to move the data it was buffering for the original
> leader broker to the buffer for the new leader.  My guess is that the
> corruption is happening at this point.
>
> 2) When a producer has a corrupt message, it retries 2.1 billion times in a
> hot loop even though it's not a retriable error.  It probably shouldn't
> retry on such errors.  For retriable errors, it would be much safer to have
> a backoff scheme for retries.
>
> Thanks,
>
> Roger
>


Re: Could this be happening?

2015-05-12 Thread Jiangjie Qin
If you are using the new Java producer, reordering could happen if
max.in.flight.requests.per.connection is set to > 1 and retries are enabled
- which are both default settings.

Can you set max.in.flight.requests.per.connection to 1 and see if this
solves the issue?
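
A minimal sketch of that configuration, assuming the new Java producer (the broker address and the acks/retries values are placeholders for an otherwise normal setup):

# new Java producer config
bootstrap.servers=broker1:9092
acks=all
retries=3
# only one in-flight request per connection, so retried batches cannot jump ahead of later ones
max.in.flight.requests.per.connection=1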

Jiangjie (Becket) Qin

On 5/12/15, 12:57 PM, "Scott Chapman"  wrote:

>We are using the Java producer API (0.8.2.1 if I am not mistaken). We are
>using producer type of sync though.
>
>On Tue, May 12, 2015 at 3:50 PM Magnus Edenhill 
>wrote:
>
>> Hi Scott,
>>
>> what producer client are you using?
>>
>> Reordering is possible in async producers in the case of temporary
>>broker
>> failures
>> and the combination of request.required.acks != 0 and retries > 0.
>>
>> Consider the case where a producer has 20 messages in-flight to the
>>broker,
>> out of those
>> messages # 1-10 fails due to some temporary failure (?) on the broker
>>side,
>> but message # 11-20 are accepted.
>> When the producer receives error results from the broker for message #
>>1-10
>> it will try to resend
>> these 10 failed messages, that are now accepted, causing them to end up
>> after message #20 in the log - thus reordered.
>>
>> This failure scenario should be rather rare though.
>>
>>
>> Regards,
>> Magnus
>>
>> 2015-05-12 20:18 GMT+02:00 Scott Chapman :
>>
>> > We are basically using kafka as a transport mechanism for multi-line
>>log
>> > files.
>> >
>> > So, for this we are using single partition topics (with a replica for
>> good
>> > measure) writing to a multi-broker cluster.
>> >
>> > Our producer basically reads a file line-by-line (as it is being
>>written
>> > to) and publishes each line as a message to the topic. We are also
>> writing
>> > as quickly as we can (not waiting for ACK).
>> >
>> > What I am seeing is occasionally the messages in the topic appear to
>>be
>> > slightly out of order when compared to the source file they were based
>> on.
>> >
>> > I am wonder if this might happen when the producer switches brokers
>> because
>> > we are not waiting for the ACK before continuing to write.
>> >
>> > Does this make any sense??
>> >
>> > Thanks in advance!
>> >
>> > -Scott
>> >
>>



Re: New Producer Async - Metadata Fetch Timeout

2015-05-12 Thread Jiangjie Qin

The application will not block on each metadata refresh or when metadata is
expired.
The application will only be blocked when
1. It sends the first message to a topic (only for that single message), or
2. The topic has been deleted from the broker and thus the refreshed metadata
loses the topic info (which is pretty rare).

So I think the async here means something a little bit different. It means when
you send the first message to a topic, you wait till you know the topic exists;
after that point it is async.
There is a very low chance that your application will block on send. If it does,
then something probably really went wrong and needs immediate attention.

Thanks.

Jiangjie (Becket) Qin

On 5/12/15, 5:08 PM, "Rendy Bambang Junior" 
wrote:

>Thank you for the clarification.
>
>I think I agree with Mohit. Sometime blocking on logging is not acceptable
>by nature of application who uses kafka.
>
>Yes it is not blocking when metadata is still available. But application
>will be blocked once metada is expired.
>
>It might be handled by application, by implementing async call when do
>send() and manage buffer and async timeout internally, but it makes async
>feature in kafka producer has less meaning.
>
>Sorry if my understanding is incorrect.
>
>Rendy
>On May 13, 2015 6:59 AM, "Jiangjie Qin"  wrote:
>
>> Send() will only block if the metadata is *not available* for the topic.
>> It won’t block if metadata there is stale. The metadata refresh is async
>> to send(). However, if you send the message to a topic for the first
>>time,
>> send() will trigger a metadata refresh and block until it has metadata
>>for
>> that topic.
>>
>> Jiangjie (Becket) Qin
>>
>> On 5/12/15, 12:58 PM, "Magnus Edenhill"  wrote:
>>
>> >I completely agree with Mohit, an application should not have to know
>>or
>> >care about
>> >producer implementation internals.
>> >Given a message and its delivery constraints (produce retry count and
>> >timeout) the producer
>> >should hide any temporal failures until the message is succesfully
>> >delivered, a permanent
>> >error is encountered or the constraints are hit.
>> >This should also include internal start up sequencing, such as metadata
>> >retrieval.
>> >
>> >
>> >
>> >2015-05-12 21:22 GMT+02:00 Mohit Gupta :
>> >
>> >> I could not follow the reasoning behind blocking the send method if
>>the
>> >> metadata is not up-to-date. Though, I see that it as per design, it
>> >> requires the metadata to batch the message into appropriate
>> >>topicPartition
>> >> queue. Also, if the metadata could not be updated in the specified
>> >> interval, it throws an exception and the message is not queued to be
>> >> retried once the brokers are up.
>> >>
>> >> Should it not be that messages are buffered in another queue ( up-to
>>a
>> >> limit ) if the brokers are down and retried later?
>> >> Is it not a general use case to require producer to be asynchronous
>>in
>> >>all
>> >> the scenarios?
>> >>
>> >>
>> >> On Tue, May 12, 2015 at 10:54 PM, Mayuresh Gharat <
>> >> gharatmayures...@gmail.com> wrote:
>> >>
>> >> > The way it works I suppose is that, the producer will do
>> >>fetchMetadata,
>> >> if
>> >> > the last fetched metadata is stale (the refresh interval has
>>expired)
>> >>or
>> >> if
>> >> > it is not able to send data to a particular broker in its current
>> >> metadata
>> >> > (This might happen in some cases like if the leader moves).
>> >> >
>> >> > It cannot produce without having the right metadata.
>> >> >
>> >> > Thanks,
>> >> >
>> >> > Mayuresh
>> >> >
>> >> > On Tue, May 12, 2015 at 10:09 AM, Jiangjie Qin
>> >>> >> >
>> >> > wrote:
>> >> >
>> >> > > That's right. Send() will first try to get metadata of a topic,
>>that
>> >> is a
>> >> > > blocking operation.
>> >> > >
>> >> > > On 5/12/15, 2:48 AM, "Rendy Bambang Junior"
>> >>
>> >> > > wrote:
>> >> > >
>> >> > > >Hi, sorry if my understanding is incorrect.
>> >> > > >
>> >> > > >I am integrating kafka producer with application, when i try to
>> >> shutdown
>> >> > > >all kafka broker (preparing for prod env) I notice that 'send'
>> >>method
>> >> is
>> >> > > >blocking.
>> >> > > >
>> >> > > >Is new producer fetch metadata not async?
>> >> > > >
>> >> > > >Rendy
>> >> > >
>> >> > >
>> >> >
>> >> >
>> >> > --
>> >> > -Regards,
>> >> > Mayuresh R. Gharat
>> >> > (862) 250-7125
>> >> >
>> >>
>> >>
>> >>
>> >> --
>> >> Best Regards,
>> >>
>> >> Mohit Gupta
>> >>
>>
>>



Re: Compression and batching

2015-05-12 Thread Jiangjie Qin
Mayuresh, this is about the old producer instead of the new Java producer.

Jamie,
In the old producer, if you use sync mode, the list of messages will be
sent as a batch. On the other hand, if you are using async mode, the
messages are just put into the queue and batched with other messages.
Notice that the old producer uses the number of messages as the batch limit
instead of the number of bytes.
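
For reference, a sketch of the old-producer settings that control this batching, with illustrative values only (batch.num.messages and queue.buffering.max.ms only apply in async mode):

# old (Scala) producer config
producer.type=async
compression.codec=snappy
batch.num.messages=200
queue.buffering.max.ms=500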

But in your case, it seems you have a single message whose compressed size
is larger than the max message size the Kafka broker accepts. Any idea why?

Thanks.

Jiangjie (Becket) Qin


On 5/12/15, 9:11 PM, "Mayuresh Gharat"  wrote:

>Well, the batch size is decided by the value set for the property :
>
> "batch.size";
> "The producer will attempt to batch records together into fewer requests
>whenever multiple records are being sent to the same partition. This helps
>performance on both the client and the server. This configuration controls
>the  default batch size in bytes. No attempt will be made to batch records
>larger than this size. Requests sent to brokers will contain multiple
>batches, one for each partition with data available to be sent. A small
>batch size will make batching less common and may reduce throughput (a
>batch size of zero will disable batching entirely). A very large batch
>size
>may use memory a bit more wastefully as we will always allocate a buffer
>of
>the specified batch size in anticipation of additional records."
>
>Also it may happen that message size may increase due to compression which
>is kind of weird.
>
>Thanks,
>
>Mayuresh
>
>On Tue, May 12, 2015 at 4:40 PM, Jamie X  wrote:
>
>> Hi,
>>
>> I'm wondering when you call kafka.javaapi.Producer.send() with a list of
>> messages, and also have compression on (snappy in this case), how does
>>it
>> decide how many messages to put together as one?
>>
>> The reason I'm asking is that even though my messages are only 70kb
>> uncompressed, the broker complains that I'm hitting the 1mb message
>>limit
>> such as:
>>
>>
>> kafka.common.MessageSizeTooLargeException: Message size is 1035608 bytes
>> which exceeds the maximum configured message size of 112.
>> at
>> 
>>kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:378
>>)
>> at
>> 
>>kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:361
>>)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at 
>>kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>> at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
>> at kafka.log.Log.append(Log.scala:257)
>> at
>>
>> 
>>kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition
>>.scala:379)
>> at
>>
>> 
>>kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition
>>.scala:365)
>> at kafka.utils.Utils$.inLock(Utils.scala:535)
>> at kafka.utils.Utils$.inReadLock(Utils.scala:541)
>> at
>> kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
>> at
>>
>> 
>>kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:
>>291)
>> at
>>
>> 
>>kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:
>>282)
>> at
>>
>> 
>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sca
>>la:244)
>> at
>>
>> 
>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sca
>>la:244)
>> at
>> 
>>scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:9
>>8)
>> at
>> 
>>scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:9
>>8)
>> at
>> 
>>scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226
>>)
>> at 
>>scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>>
>> Thanks,
>> Jamie
>>
>
>
>
>-- 
>-Regards,
>Mayuresh R. Gharat
>(862) 250-7125



Re: Getting NotLeaderForPartitionException in kafka broker

2015-05-12 Thread tao xiao
Hi,

Any updates on this issue? I keep seeing this issue happening over and over
again

On Thu, May 7, 2015 at 7:28 PM, tao xiao  wrote:

> Hi team,
>
> I have a 12-node cluster that has 800 topics, each of which has only 1
> partition. I observed that one of the nodes keeps generating
> NotLeaderForPartitionException, which causes the node to be unresponsive to
> all requests. Below is the exception
>
> [2015-05-07 04:16:01,014] ERROR [ReplicaFetcherThread-1-12], Error for
> partition [topic1,0] to broker 12:class
> kafka.common.NotLeaderForPartitionException
> (kafka.server.ReplicaFetcherThread)
>
> All other nodes in the cluster generate lots of replication errors too, as
> shown below, due to the unresponsiveness of the above node.
>
> [2015-05-07 04:17:34,917] WARN [Replica Manager on Broker 1]: Fetch
> request with correlation id 3630911 from client ReplicaFetcherThread-0-1 on
> partition [topic1,0] failed due to Leader not local for partition
> [cg22_user.item_attr_info.lcr,0] on broker 1 (kafka.server.ReplicaManager)
>
> Any suggestion why the node runs into this unstable state, and any
> configuration I can set to prevent this?
>
> I use kafka 0.8.2.1
>
> And here is the server.properties
>
>
> broker.id=5
> port=9092
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=1048576
> socket.receive.buffer.bytes=1048576
> socket.request.max.bytes=104857600
> log.dirs=/mnt/kafka
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> log.retention.hours=1
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=30
> log.cleaner.enable=false
> zookeeper.connect=ip:2181
> zookeeper.connection.timeout.ms=6000
> unclean.leader.election.enable=false
> delete.topic.enable=true
> default.replication.factor=3
> num.replica.fetchers=3
> delete.topic.enable=true
> kafka.metrics.reporters=report.KafkaMetricsCollector
> straas.hubble.conf.file=/etc/kafka/report.conf
>
>
>
>
> --
> Regards,
> Tao
>



-- 
Regards,
Tao