I actually restarted my application with the consumer config I mentioned at
https://issues.apache.org/jira/browse/KAFKA-3159 and I can no longer get it
to use high CPU :( Not quite sure how to proceed. I'll try shutting down all
producers and letting the logs age out to see if the problem happens under
those conditions.

On Thu, Feb 4, 2016 at 8:40 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> Hey Jason,
>
> Yes, I checked for error codes. There were none. The message was perfectly
> legal as parsed by my hand-written parser. I also verified the size of the
> response, which was exactly the size of a response with an empty message
> set per partition.
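>
> For reference, the arithmetic behind that size check, assuming the v0
> response layout from the protocol guide (the topic name below is just a
> placeholder, not my real one):
>
>     // Expected body size of an all-empty v0 fetch response. The 4-byte
>     // size prefix on the wire is not counted in the size field itself.
>     int numPartitions = 128;
>     int topicLen = "my-topic".getBytes(StandardCharsets.UTF_8).length;
>     int perPartition = 4 /* partition */ + 2 /* error code */
>             + 8 /* high watermark */ + 4 /* empty message set size */;
>     int expectedBody = 4 /* correlation id */ + 4 /* topic array count */
>             + 2 + topicLen /* topic string */ + 4 /* partition count */
>             + numPartitions * perPartition;
>     System.out.println("expected body bytes: " + expectedBody);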
>
> The topic has 128 partitions, a retention of 10 minutes, and a replication
> factor of 3. The 128 partitions are divided amongst 3 brokers, but I
> managed to replicate the problem of premature responses even when running
> my own code in a debugger connected to a locally running Kafka instance.
>
> I haven't made any changes to the topic configuration while running these
> tests. All the changes I have made are to the settings of my fetch request,
> i.e. min_bytes_per_fetch, max_wait_ms and max_bytes_per_partition. I didn't
> note down exactly which changes I made, but I think I can restore my
> original configuration and see if that reproduces the problem, both for the
> consumer I wrote myself and for the stock 0.9 consumer.
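>
> For concreteness, these are the knobs in question as they appear on the
> stock 0.9 consumer (property names from the 0.9 docs; the values here are
> placeholders, not necessarily what I ran with):
>
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "test-group");
>     // Broker should hold the fetch until this many bytes are available...
>     props.put("fetch.min.bytes", "1000000");
>     // ...or until this much time has passed, whichever comes first.
>     props.put("fetch.max.wait.ms", "30000");
>     // Upper bound on the data returned per partition per fetch.
>     props.put("max.partition.fetch.bytes", "1048576");
>     KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
>         props, new ByteArrayDeserializer(), new ByteArrayDeserializer());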
>
> I definitely saw empty responses being returned really quickly when
> running my own client locally (under a debugger), so it's just a theory
> that that might have been the problem behind all those EOFExceptions.
>
> Rajiv
>
> On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io> wrote:
>
>> Hey Rajiv,
>>
>> Just to be clear, when you received the empty fetch response, did you
>> check the error codes? It would help to also include some more information
>> (such as broker and topic settings). If you can come up with a way to
>> reproduce it, that will help immensely.
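>>
>> For anyone checking by hand: in a v0 fetch response the per-partition
>> error code sits right after the partition id. A minimal check (assuming
>> the v0 layout from the protocol guide, with buffer positioned at the
>> start of a partition entry) looks like:
>>
>>     int partitionId = buffer.getInt();     // partition
>>     short errorCode = buffer.getShort();   // 0 means no error
>>     long highWatermark = buffer.getLong(); // end of the partition's log
>>     int messageSetSize = buffer.getInt();  // 0 for an empty message set
>>     if (errorCode != 0) {
>>       System.err.println("partition " + partitionId + " error " + errorCode);
>>     }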
>>
>> Also, would you mind updating KAFKA-3159 with your findings about the high
>> CPU issue? If the problem went away after a configuration change, does it
>> come back when those changes are reverted?
>>
>> Thanks,
>> Jason
>>
>> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>>
>> > Indeed this seems to be the case. I am now running the client mentioned
>> > in https://issues.apache.org/jira/browse/KAFKA-3159 and it is no longer
>> > taking up high CPU. The high number of EOFExceptions is also gone. It is
>> > performing very well now. I can't tell whether the improvement is because
>> > of my config changes (min_bytes, max_bytes_per_partition, max wait time,
>> > etc.) or because of a bug in the 0.9 broker. Under a debugger I have
>> > definitely seen a problem where I was getting back empty responses from
>> > the broker running locally. It might be worth creating a bug for this.
>> >
>> > Thanks,
>> > Rajiv
>> >
>> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>> >
>> > > And just like that it stopped happening, even though I didn't change
>> > > any of my code. I had filed
>> > > https://issues.apache.org/jira/browse/KAFKA-3159 where the stock 0.9
>> > > Kafka consumer was using very high CPU and seeing a lot of
>> > > EOFExceptions on the same topic and partition. I wonder if it was
>> > > hitting the same problem (lots of empty responses) even though we asked
>> > > the broker to park the request till enough bytes came through.
>> > >
>> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>> > >
>> > >> I am writing a Kafka consumer client using the document at
>> > >> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>> > >>
>> > >> One place where I am having problems is the fetch request itself. I am
>> > >> able to send fetch requests and can parse the fetch responses properly,
>> > >> but it seems like the broker is not respecting my max wait time and min
>> > >> fetch bytes parameters.
>> > >>
>> > >> To test this part I am sending a fetch request for 128 partitions of a
>> > >> single topic that hasn't seen any messages for a while and is currently
>> > >> empty. All 128 partitions are on the same broker (running 0.9). I would
>> > >> expect the broker NOT to send me any reply until my max_wait_time_ms
>> > >> elapses, but it is sending me a reply immediately. This reply is empty
>> > >> (as expected) since the partitions have no data, and I can parse it
>> > >> just fine, but I don't understand why the broker replies immediately
>> > >> instead of waiting long enough.
>> > >>
>> > >> Here is how I make a request:
>> > >>
>> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
>> > >>     // This does the math to get the size required.
>> > >>     final int sizeRequired = numBytesRequiredForFetchRequest(numPartitions);
>> > >>     final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
>> > >>     // Size field (excludes the 4 bytes of the size field itself).
>> > >>     int sizeField = sizeRequired - 4;
>> > >>     buffer.putInt(sizeField);
>> > >>     // API key.
>> > >>     buffer.putShort(FETCH_REQUEST_API_KEY); // 1 for fetch requests.
>> > >>     // API version.
>> > >>     buffer.putShort((short) 0);
>> > >>     // Correlation id.
>> > >>     buffer.putInt(-3); // Just a random correlation id.
>> > >>     // Client id.
>> > >>     buffer.putShort(numClientStringBytes); // Length of the client string as a short.
>> > >>     buffer.put(clientStringBytes); // The client string bytes.
>> > >>     // Replica id.
>> > >>     buffer.putInt(-1); // As per the recommendation for consumers.
>> > >>     // Max wait time in ms.
>> > >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
>> > >>     // Min bytes.
>> > >>     buffer.putInt(1000000); // A big number.
>> > >>     // Num topics.
>> > >>     buffer.putInt(1); // A single topic.
>> > >>     // Topic string.
>> > >>     buffer.putShort(numTopicBytes); // Length of the topic string as a short.
>> > >>     buffer.put(topicBytes); // The topic string bytes.
>> > >>     // Num partitions.
>> > >>     buffer.putInt(numPartitions); // 128 like I said.
>> > >>     for (int i = 0; i < numPartitions; i++) {
>> > >>       final int partitionId = i;
>> > >>       // Partition number.
>> > >>       buffer.putInt(partitionId);
>> > >>       // Fetch offset.
>> > >>       buffer.putLong(partitionToOffset[partitionId]); // From an array of longs I keep.
>> > >>       // Max bytes per partition.
>> > >>       buffer.putInt(maxBytesPerPartition);
>> > >>     }
>> > >>
>> > >>     buffer.flip();
>> > >>
>> > >>     return buffer;
>> > >> }
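>> > >>
>> > >> And this is roughly how I send it and time the reply (a sketch of my
>> > >> harness, assuming channel is a blocking SocketChannel already
>> > >> connected to the broker):
>> > >>
>> > >> ByteBuffer request = createFetchRequestBuffer(128);
>> > >> long start = System.nanoTime();
>> > >> while (request.hasRemaining()) {
>> > >>   channel.write(request);
>> > >> }
>> > >> // Read the 4-byte size prefix, then the response body.
>> > >> ByteBuffer sizeBuf = ByteBuffer.allocate(4);
>> > >> while (sizeBuf.hasRemaining()) {
>> > >>   if (channel.read(sizeBuf) < 0) throw new EOFException();
>> > >> }
>> > >> sizeBuf.flip();
>> > >> ByteBuffer response = ByteBuffer.allocate(sizeBuf.getInt());
>> > >> while (response.hasRemaining()) {
>> > >>   if (channel.read(response) < 0) throw new EOFException();
>> > >> }
>> > >> long elapsedMs = (System.nanoTime() - start) / 1_000_000;
>> > >> // With max_wait_time_ms = 30000 and an idle topic, elapsedMs should
>> > >> // come out close to 30000.
>> > >> System.out.println("fetch response after " + elapsedMs + " ms");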
>> > >>
>> > >> I get a response pretty much immediately when I write this request to
>> > >> the broker. The response parses just fine but contains no non-empty
>> > >> message sets.
>> > >>
>> > >> Thanks in advance.
>> > >> Rajiv
>> > >>
>> > >>
>> > >
>> >
>>
>
>
