I've updated KAFKA-3159 with my findings.

Thanks,
Rajiv

On Thu, Feb 4, 2016 at 10:25 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> I think I've found out when the problem happens. When a broker receives a
> fetch request and has no messages for any of the requested partitions, it
> returns immediately instead of waiting for the max wait time to elapse.
> Both the Kafka 0.9 consumer and my own handwritten consumer suffer from
> the broker returning immediately. Since my log retention time is only 10
> minutes, as soon as I stop all of my producers I only need to wait 10
> minutes for the logs to age out and for this problem to happen.
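One way to confirm this from the client side is to time each fetch round trip. The Java sketch below uses a hypothetical helper, isPrematureEmptyResponse (not part of any Kafka client), that flags a response which carried no message bytes and arrived well before the requested max wait time. The slackMs tolerance is an assumed parameter.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper, not part of any Kafka client: flags a fetch round trip
 * whose response carried no message bytes and arrived well before the
 * MaxWaitTime that was sent in the request.
 */
final class PrematureFetchDetector {
    private PrematureFetchDetector() {}

    /**
     * @param sentNanos     System.nanoTime() when the request was written
     * @param receivedNanos System.nanoTime() when the response was fully read
     * @param maxWaitMs     the MaxWaitTime value sent in the request
     * @param messageBytes  sum of MessageSetSize over all partitions in the response
     * @param slackMs       assumed tolerance for timer granularity and network time
     */
    static boolean isPrematureEmptyResponse(long sentNanos, long receivedNanos,
                                            int maxWaitMs, long messageBytes,
                                            long slackMs) {
        if (messageBytes > 0) {
            // Only empty responses are flagged; a response with data may
            // legitimately arrive before the max wait time.
            return false;
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(receivedNanos - sentNanos);
        return elapsedMs + slackMs < maxWaitMs;
    }
}
```

In a client loop, sentNanos and receivedNanos would come from System.nanoTime() taken around the socket write and the completed read.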
>
> Though it might not be a big problem for most people, it is quite
> conceivable that some users have low-frequency topic-partitions that a lot
> of clients listen to with large max_wait_time_ms values, for example an
> infrequently updated metadata topic. My guess is that such use cases
> would actually trigger the problem when that low-frequency topic-partition
> is isolated to a broker or two. It is especially harrowing because it goes
> against the intuition that a topic with no traffic should cause little to
> no overhead.
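To put rough numbers on this: if each client keeps one fetch outstanding against the broker and re-sends as soon as a response arrives, each client issues roughly 1000 / latencyMs requests per second. The sketch below is a back-of-the-envelope model under that assumption only; the class name and the client counts and latencies used with it are illustrative, not measurements.

```java
/**
 * Back-of-the-envelope model (assumption): every client keeps one fetch
 * outstanding against the broker and re-sends as soon as a response arrives,
 * so each client issues about 1000 / latencyMs requests per second.
 */
final class IdleTopicLoad {
    private IdleTopicLoad() {}

    /** Approximate fetch requests per second the broker sees for an idle topic. */
    static double requestsPerSecond(int clients, long responseLatencyMs) {
        // Clamp to 1 ms so an instantaneous response does not divide by zero.
        long latency = Math.max(1L, responseLatencyMs);
        return clients * (1000.0 / latency);
    }
}
```

Under this model, 100 clients whose 30,000 ms wait is honored generate about 3.3 requests/s in total, while the same 100 clients receiving empty responses after about 2 ms generate about 50,000 requests/s.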
>
> I'll update KAFKA-3159 with my findings, but it would be great to get
> confirmation that you can reproduce this, Jason.
>
> Thanks,
> Rajiv
>
> On Thu, Feb 4, 2016 at 8:58 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
>> I actually restarted my application with the consumer config I mentioned
>> at https://issues.apache.org/jira/browse/KAFKA-3159 and I can't get it
>> to use high CPU anymore :( I'm not quite sure how to proceed. I'll try
>> shutting down all producers and letting the logs age out to see if the
>> problem happens under those conditions.
>>
>> On Thu, Feb 4, 2016 at 8:40 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>>
>>> Hey Jason,
>>>
>>> Yes, I checked the error codes. There were none. The response was
>>> perfectly valid as parsed by my handwritten parser. I also verified the
>>> size of the response, which was exactly the size of a response with an
>>> empty message set per partition.
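For reference, the expected size of such a response can be worked out from the v0 FetchResponse layout in the protocol guide (the request in this thread uses API version 0). A minimal sketch, where the class name is hypothetical and the topic name "metrics" is only a placeholder:

```java
import java.nio.charset.StandardCharsets;

/**
 * Sketch: expected on-the-wire size of a v0 FetchResponse for a single topic
 * in which every partition carries an empty message set. Layout per the
 * protocol guide: CorrelationId, then
 * [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]].
 */
final class EmptyFetchResponseSize {
    private EmptyFetchResponseSize() {}

    static int bytesOnWire(String topic, int numPartitions) {
        int topicLen = topic.getBytes(StandardCharsets.UTF_8).length;
        int perPartition = 4    // Partition (int32)
                + 2             // ErrorCode (int16)
                + 8             // HighwaterMarkOffset (int64)
                + 4;            // MessageSetSize (int32); no MessageSet bytes follow
        return 4                // Size prefix (int32)
                + 4             // CorrelationId (int32)
                + 4             // topic array length (int32)
                + 2 + topicLen  // TopicName (int16 length + bytes)
                + 4             // partition array length (int32)
                + numPartitions * perPartition;
    }
}
```

With a 7-byte topic name and 128 partitions this gives 4 + 4 + 4 + (2 + 7) + 4 + 128 * 18 = 2,329 bytes, i.e. 18 bytes per empty partition.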
>>>
>>> The topic has 128 partitions, a retention of 10 minutes and a
>>> replication factor of 3. The 128 partitions are divided amongst 3
>>> brokers, but I managed to reproduce the problem of premature responses
>>> even when running my own code in a debugger connected to a locally
>>> running Kafka instance.
>>>
>>> I haven't made any changes to the topic configuration while running
>>> these tests. All the changes I have made are to the settings of my fetch
>>> request, i.e. min_bytes_per_fetch, max_wait_ms and max_bytes_per_partition.
>>> I didn't note down every change I made, but I can try to restore my
>>> original configuration and see if that reproduces the problem for both
>>> the consumer I wrote myself and the stock 0.9 consumer.
>>>
>>> I definitely saw empty responses being returned really quickly when
>>> running my own client locally (under a debugger), so it's just a theory
>>> that this might have been the cause of all those EOFExceptions.
>>>
>>> Rajiv
>>>
>>> On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io>
>>> wrote:
>>>
>>>> Hey Rajiv,
>>>>
>>>> Just to be clear, when you received the empty fetch response, did you
>>>> check the error codes? It would help to also include some more
>>>> information (such as broker and topic settings). If you can come up with
>>>> a way to reproduce it, that will help immensely.
>>>>
>>>> Also, would you mind updating KAFKA-3159 with your findings about the
>>>> high CPU issue? If the problem went away after a configuration change,
>>>> does it come back when those changes are reverted?
>>>>
>>>> Thanks,
>>>> Jason
>>>>
>>>> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com>
>>>> wrote:
>>>>
>>>> > Indeed, this seems to be the case. I am now running the client
>>>> > mentioned in https://issues.apache.org/jira/browse/KAFKA-3159 and it is
>>>> > no longer taking up high CPU. The high number of EOFExceptions is also
>>>> > gone. It is performing very well now. I can't tell whether the
>>>> > improvement is due to my config changes (min_bytes,
>>>> > max_bytes_per_partition, max wait time, etc.) or to a bug in the 0.9
>>>> > broker. Under a debugger, I have definitely seen the broker running
>>>> > locally return empty messages. It might be worth filing a bug for this.
>>>> >
>>>> > Thanks,
>>>> > Rajiv
>>>> >
>>>> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>>>> >
>>>> > > And just like that it stopped happening, even though I didn't change
>>>> > > any of my code. I had filed
>>>> > > https://issues.apache.org/jira/browse/KAFKA-3159, where the stock 0.9
>>>> > > Kafka consumer was using very high CPU and seeing a lot of
>>>> > > EOFExceptions on the same topic and partition. I wonder if it was
>>>> > > hitting the same problem (lots of empty messages) even though we
>>>> > > asked the broker to park the request until enough bytes came through.
>>>> > >
>>>> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>>>> > >
>>>> > >> I am writing a Kafka consumer client using the document at
>>>> > >> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>>>> > >>
>>>> > >> One place where I am having problems is the fetch request itself. I
>>>> > >> am able to send fetch requests and get fetch responses that I can
>>>> > >> parse properly, but it seems like the broker is not respecting my max
>>>> > >> wait time and min fetch bytes parameters.
>>>> > >>
>>>> > >> To test this part I am sending a fetch request for 128 partitions of
>>>> > >> a single topic that hasn't seen any messages for a while and is
>>>> > >> currently empty. All 128 partitions are on the same broker (running
>>>> > >> 0.9). I would expect the broker NOT to send me any reply until my
>>>> > >> max_wait_time_ms elapses, but it sends me a reply immediately. This
>>>> > >> reply is empty (as expected) since the partitions have no data, and I
>>>> > >> can parse it just fine, but I don't understand why the broker replies
>>>> > >> immediately instead of waiting long enough.
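The expectation here follows the protocol guide's description of the two fields: MinBytes is the minimum number of message bytes that must be available before the broker responds (0 means respond immediately), and MaxWaitTime bounds how long the broker blocks when insufficient data is available. Below is a simplified model of that contract, written as a hypothetical shouldRespond function rather than the broker's actual code. It deliberately ignores other conditions (such as partition errors) that may legitimately end the wait early.

```java
/**
 * Simplified model (not the broker's implementation) of the documented
 * MinBytes / MaxWaitTime contract for a single fetch request.
 */
final class FetchWaitModel {
    private FetchWaitModel() {}

    static boolean shouldRespond(long availableBytes, int minBytes,
                                 long elapsedMs, int maxWaitMs) {
        if (minBytes <= 0) {
            return true;                 // MinBytes 0: always respond immediately.
        }
        if (availableBytes >= minBytes) {
            return true;                 // Enough data has accumulated.
        }
        return elapsedMs >= maxWaitMs;   // Otherwise wait out MaxWaitTime.
    }
}
```

Under this model, with availableBytes = 0, minBytes = 1,000,000 and maxWaitMs = 30,000 (the values in the request below), an immediate empty response should not occur.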
>>>> > >>
>>>> > >> Here is how I make a request:
>>>> > >>
>>>> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
>>>> > >>     // Compute the total buffer size, including the 4-byte size prefix.
>>>> > >>     final int sizeRequired = numBytesRequiredForFetchRequest(numPartitions);
>>>> > >>     final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
>>>> > >>     // Size field: the length of everything after the field itself.
>>>> > >>     final int sizeField = sizeRequired - 4;
>>>> > >>     buffer.putInt(sizeField);
>>>> > >>     // API key.
>>>> > >>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
>>>> > >>     // API version.
>>>> > >>     buffer.putShort((short) 0);
>>>> > >>     // Correlation id.
>>>> > >>     buffer.putInt(-3);  // Just an arbitrary correlation id.
>>>> > >>     // Client id.
>>>> > >>     buffer.putShort(numClientStringBytes);  // Length of the client string as a short.
>>>> > >>     buffer.put(clientStringBytes);  // The client string bytes.
>>>> > >>     // Replica id.
>>>> > >>     buffer.putInt(-1);  // -1 for ordinary clients, as recommended.
>>>> > >>     // Max wait time in ms.
>>>> > >>     buffer.putInt(30 * 1000);  // 30 seconds.
>>>> > >>     // Min bytes.
>>>> > >>     buffer.putInt(1000000);  // A big number.
>>>> > >>     // Num topics.
>>>> > >>     buffer.putInt(1);  // A single topic.
>>>> > >>     // Topic string.
>>>> > >>     buffer.putShort(numTopicBytes);  // Length of the topic string as a short.
>>>> > >>     buffer.put(topicBytes);  // The topic string bytes.
>>>> > >>     // Num partitions field.
>>>> > >>     buffer.putInt(numPartitions);  // 128, as mentioned above.
>>>> > >>     for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
>>>> > >>       // Partition number.
>>>> > >>       buffer.putInt(partitionId);
>>>> > >>       // Fetch offset, from an array of longs indexed by partition.
>>>> > >>       buffer.putLong(partitionToOffset[partitionId]);
>>>> > >>       // Max bytes for this partition.
>>>> > >>       buffer.putInt(maxBytesPerPartition);
>>>> > >>     }
>>>> > >>
>>>> > >>     buffer.flip();
>>>> > >>
>>>> > >>     return buffer;
>>>> > >> }
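The numBytesRequiredForFetchRequest helper called above isn't shown. A hypothetical version for a v0 request with a single topic is sketched below. It takes the client id and topic as parameters (an assumption, since the original reads them from fields) and, like the original, includes the 4-byte size prefix in its result.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical sizing helper for a v0 FetchRequest with one topic. The result
 * includes the 4-byte size prefix, matching `sizeField = sizeRequired - 4`.
 */
final class FetchRequestSize {
    private FetchRequestSize() {}

    static int numBytesRequiredForFetchRequest(String clientId, String topic,
                                               int numPartitions) {
        int clientLen = clientId.getBytes(StandardCharsets.UTF_8).length;
        int topicLen = topic.getBytes(StandardCharsets.UTF_8).length;
        return 4                              // Size prefix
                + 2 + 2 + 4                   // ApiKey, ApiVersion, CorrelationId
                + 2 + clientLen               // ClientId (int16 length + bytes)
                + 4 + 4 + 4                   // ReplicaId, MaxWaitTime, MinBytes
                + 4                           // topic array length
                + 2 + topicLen                // TopicName (int16 length + bytes)
                + 4                           // partition array length
                + numPartitions * (4 + 8 + 4); // Partition, FetchOffset, MaxBytes
    }
}
```

For a 9-byte client id, a 7-byte topic name and 128 partitions, this comes to 36 + 9 + 7 + 128 * 16 = 2,100 bytes.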
>>>> > >>
>>>> > >> I get a response pretty much immediately when I write this request
>>>> > >> to the broker. The response parses just fine but contains no
>>>> > >> non-empty message sets.
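Because the key distinction in this thread is between an empty, error-free response and an error response, here is a sketch of a v0 FetchResponse checker that sums the MessageSetSize fields and fails on any non-zero ErrorCode. The class and method names are hypothetical, and the sketch assumes the buffer is positioned just after the 4-byte size prefix.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical v0 FetchResponse checker. Expects `buf` positioned right after
 * the 4-byte Size prefix; returns total message bytes across all partitions
 * and throws if any partition reports a non-zero ErrorCode.
 */
final class FetchResponseChecker {
    private FetchResponseChecker() {}

    static long totalMessageBytes(ByteBuffer buf) {
        buf.getInt();                                   // CorrelationId
        int numTopics = buf.getInt();
        long total = 0;
        for (int t = 0; t < numTopics; t++) {
            byte[] name = new byte[buf.getShort()];
            buf.get(name);                              // TopicName bytes
            String topic = new String(name, StandardCharsets.UTF_8);
            int numPartitions = buf.getInt();
            for (int p = 0; p < numPartitions; p++) {
                int partition = buf.getInt();
                short errorCode = buf.getShort();
                buf.getLong();                          // HighwaterMarkOffset
                int messageSetSize = buf.getInt();
                if (errorCode != 0) {
                    throw new IllegalStateException(
                            topic + "-" + partition + " returned error code " + errorCode);
                }
                buf.position(buf.position() + messageSetSize); // skip MessageSet
                total += messageSetSize;
            }
        }
        return total;
    }
}
```

A response like the one described above should yield 0 from totalMessageBytes without throwing.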
>>>> > >>
>>>> > >> Thanks in advance.
>>>> > >> Rajiv
>>>> > >>
>>>> > >>
>>>> > >
>>>> >
>>>>
>>>
>>>
>>
>
