Thanks Jason.

On Fri, Feb 5, 2016 at 10:13 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Rajiv,
>
> Thanks for all the updates. I think I've been able to reproduce this. The
> key seems to be waiting for the old log segment to be deleted. I'll
> investigate a bit more and report what I find on the JIRA.
>
> -Jason
>
> On Fri, Feb 5, 2016 at 9:50 AM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > I've updated Kafka-3159 with my findings.
> >
> > Thanks,
> > Rajiv
> >
> > On Thu, Feb 4, 2016 at 10:25 PM, Rajiv Kurian <ra...@signalfx.com>
> wrote:
> >
> > > I think I found out when the problem happens. When a broker that is
> > > sent a fetch request has no messages for any of the partitions it is
> > > being asked for, it returns immediately instead of waiting out the
> > > poll period. Both the Kafka 0.9 consumer and my own hand-written
> > > consumer suffer from the same problem, caused by the broker returning
> > > immediately. Since my log retention time is only 10 minutes, as soon
> > > as I stop all of my producers I only need to wait 10 minutes for the
> > > logs to age out and for this problem to happen.
> > >
> > > Though it might not be a big problem for most people, it is quite
> > > conceivable that some users have low-frequency topic-partitions that
> > > a lot of clients listen to with big max_wait_time_ms parameters, for
> > > example an infrequent metadata-update topic. My guess is that such
> > > use cases would actually trigger the problem when that low-frequency
> > > topic-partition is isolated to a broker or two. It is especially
> > > harrowing because it goes against all intuition that a topic with no
> > > traffic should cause little to no overhead.
> > >
> > > I'll update KAFKA-3159 with my findings, but it would be great to get
> > > confirmation that you can make this happen Jason.
> > >
> > > Thanks,
> > > Rajiv
> > >
> > > On Thu, Feb 4, 2016 at 8:58 PM, Rajiv Kurian <ra...@signalfx.com>
> wrote:
> > >
> > >> I actually restarted my application with the consumer config I
> > >> mentioned at https://issues.apache.org/jira/browse/KAFKA-3159 and I
> > >> can't get it to use high CPU any more :( Not quite sure how to
> > >> proceed. I'll try to shut down all producers and let the logs age
> > >> out to see if the problem happens under those conditions.
> > >>
> > >> On Thu, Feb 4, 2016 at 8:40 PM, Rajiv Kurian <ra...@signalfx.com>
> > wrote:
> > >>
> > >>> Hey Jason,
> > >>>
> > >>> Yes, I checked for error codes. There were none. The message was
> > >>> perfectly legal as parsed by my hand-written parser. I also
> > >>> verified the size of the response, which was exactly the size of a
> > >>> response with an empty message set per partition.
> > >>>
> > >>> The topic has 128 partitions, a retention of 10 minutes, and a
> > >>> replication factor of 3. The 128 partitions are divided amongst 3
> > >>> brokers, but I managed to replicate the problem of premature
> > >>> responses even running my own code in a debugger connected to a
> > >>> locally running Kafka instance.
> > >>>
> > >>> I haven't made any changes to the topic configuration while
> > >>> running these tests. All the changes I have made are to the
> > >>> settings of my fetch request, i.e. min_bytes_per_fetch,
> > >>> max_wait_ms and max_bytes_per_partition. I haven't noted down all
> > >>> the changes I made, but I think I can try to restore my original
> > >>> configuration and see if that reproduces the problem, both for the
> > >>> consumer I wrote myself and for the stock 0.9 consumer.
> > >>>
> > >>> I definitely saw empty responses being returned really quickly
> > >>> when running my own client locally (under a debugger), so it's
> > >>> just a theory that that might have been the problem behind all
> > >>> those EOFExceptions.
> > >>>
> > >>> Rajiv
> > >>>
> > >>> On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io>
> > >>> wrote:
> > >>>
> > >>>> Hey Rajiv,
> > >>>>
> > >>>> Just to be clear, when you received the empty fetch response, did
> > >>>> you check the error codes? It would help to also include some
> > >>>> more information (such as broker and topic settings). If you can
> > >>>> come up with a way to reproduce it, that will help immensely.
> > >>>>
> > >>>> Also, would you mind updating KAFKA-3159 with your findings about
> > >>>> the high CPU issue? If the problem went away after a
> > >>>> configuration change, does it come back when those changes are
> > >>>> reverted?
> > >>>>
> > >>>> Thanks,
> > >>>> Jason
> > >>>>
> > >>>> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com>
> > >>>> wrote:
> > >>>>
> > >>>> > Indeed this seems to be the case. I am now running the client
> > >>>> > mentioned in https://issues.apache.org/jira/browse/KAFKA-3159
> > >>>> > and it is no longer taking up high CPU. The high number of
> > >>>> > EOFExceptions is also gone. It is performing very well now. I
> > >>>> > can't tell whether the improvement is because of my config
> > >>>> > changes (min_bytes, max_bytes_per_partition, max wait time)
> > >>>> > etc. or because of a bug in the 0.9 broker. I have definitely,
> > >>>> > under a debugger, seen a problem where I was getting back empty
> > >>>> > messages from the broker running locally. It might be worth
> > >>>> > creating a bug for this.
> > >>>> >
> > >>>> > Thanks,
> > >>>> > Rajiv
> > >>>> >
> > >>>> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com>
> > >>>> wrote:
> > >>>> >
> > >>>> > > And just like that it stopped happening, even though I didn't
> > >>>> > > change any of my code. I had filed
> > >>>> > > https://issues.apache.org/jira/browse/KAFKA-3159 where the
> > >>>> > > stock 0.9 Kafka consumer was using very high CPU and seeing a
> > >>>> > > lot of EOFExceptions on the same topic and partition. I wonder
> > >>>> > > if it was hitting the same problem (lots of empty messages)
> > >>>> > > even though we asked the broker to park the request till
> > >>>> > > enough bytes came through.
> > >>>> > >
> > >>>> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <
> ra...@signalfx.com>
> > >>>> wrote:
> > >>>> > >
> > >>>> > >> I am writing a Kafka consumer client using the document at
> > >>>> > >> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >>>> > >>
> > >>>> > >> One place where I am having problems is the fetch request
> > >>>> > >> itself. I am able to send fetch requests and can get fetch
> > >>>> > >> responses that I can parse properly, but it seems like the
> > >>>> > >> broker is not respecting my max wait time and min fetch
> > >>>> > >> bytes parameters.
> > >>>> > >>
> > >>>> > >> To test this part I am sending a fetch request for 128
> > >>>> > >> partitions of a single topic that hasn't seen any messages
> > >>>> > >> for a while and is currently empty. All 128 partitions are
> > >>>> > >> on the same broker (running 0.9). I would expect the broker
> > >>>> > >> to NOT send me any reply till my max_wait_time_ms elapses,
> > >>>> > >> but it is sending me a reply immediately. This reply is
> > >>>> > >> empty (as expected) since the partitions have no data, and I
> > >>>> > >> can parse it just fine, but I don't understand why the
> > >>>> > >> broker is sending me a reply immediately instead of waiting
> > >>>> > >> long enough.
> > >>>> > >>
> > >>>> > >> Here is how I make a request:
> > >>>> > >>
> > >>>> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
> > >>>> > >>     // This does the math to get the size required.
> > >>>> > >>     final int sizeRequired = numBytesRequiredForFetchRequest(numPartitions);
> > >>>> > >>     final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
> > >>>> > >>     // Size field.
> > >>>> > >>     int sizeField = sizeRequired - 4;
> > >>>> > >>     buffer.putInt(sizeField);
> > >>>> > >>     // API key.
> > >>>> > >>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
> > >>>> > >>     // API version.
> > >>>> > >>     buffer.putShort((short) 0);
> > >>>> > >>     // Correlation id.
> > >>>> > >>     buffer.putInt(-3);  // Just a random correlation id.
> > >>>> > >>     // Client id.
> > >>>> > >>     buffer.putShort(numClientStringBytes); // The length of the client string as a short.
> > >>>> > >>     buffer.put(clientStringBytes); // The client string bytes.
> > >>>> > >>     // Replica id.
> > >>>> > >>     buffer.putInt(-1);  // As per the recommendation.
> > >>>> > >>     // Max wait time in ms.
> > >>>> > >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
> > >>>> > >>     // Min bytes field.
> > >>>> > >>     buffer.putInt(1000000);  // A big number.
> > >>>> > >>     // Num topics.
> > >>>> > >>     buffer.putInt(1); // A single topic.
> > >>>> > >>     // Topic string.
> > >>>> > >>     buffer.putShort(numTopicBytes); // The length of the topic string as a short.
> > >>>> > >>     buffer.put(topicBytes); // The topic string bytes.
> > >>>> > >>     // Num partitions field.
> > >>>> > >>     buffer.putInt(numPartitions); // 128 like I said.
> > >>>> > >>     for (int i = 0; i < numPartitions; i++) {
> > >>>> > >>       final int partitionId = i;
> > >>>> > >>       // Partition number.
> > >>>> > >>       buffer.putInt(partitionId);
> > >>>> > >>       // Offset.
> > >>>> > >>       buffer.putLong(partitionToOffset[partitionId]); // I have an array of longs to get this from.
> > >>>> > >>       // Max bytes per partition.
> > >>>> > >>       buffer.putInt(maxBytesPerPartition);
> > >>>> > >>     }
> > >>>> > >>
> > >>>> > >>     buffer.flip();
> > >>>> > >>
> > >>>> > >>     return buffer;
> > >>>> > >> }
> > >>>> > >>
> > >>>> > >> I get a response pretty much immediately when I write this
> > >>>> > >> request to the broker. The response parses just fine but has
> > >>>> > >> no actual non-zero-size message sets.
> > >>>> > >>
> > >>>> > >> Thanks in advance.
> > >>>> > >> Rajiv
> > >>>> > >>
> > >>>> > >>
> > >>>> > >
> > >>>> >
> > >>>>
> > >>>
> > >>>
> > >>
> > >
> >
>
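For anyone following along and parsing fetch responses by hand as described in the thread, Jason's suggestion to check the per-partition error codes can be sketched against the same v0 wire layout the request-building code above uses. This is only an illustrative sketch, not the actual client from the thread: the class and method names are invented, and it exercises the parser against a synthetic empty response rather than a live broker.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Fetch response v0 per the protocol guide (after the 4-byte size prefix):
// correlationId, then per topic: name, then per partition: partitionId,
// errorCode, highWatermarkOffset, messageSetSize, messageSet bytes.
public class FetchResponseV0Check {

    // Build a synthetic response with empty message sets and no errors,
    // so the parser can be exercised without a broker.
    static ByteBuffer syntheticEmptyResponse(String topic, int numPartitions) {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        int size = 4 + 4 + 2 + topicBytes.length + 4
                 + numPartitions * (4 + 2 + 8 + 4);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(-3);                      // correlation id, echoed back
        buf.putInt(1);                       // one topic
        buf.putShort((short) topicBytes.length);
        buf.put(topicBytes);
        buf.putInt(numPartitions);
        for (int p = 0; p < numPartitions; p++) {
            buf.putInt(p);                   // partition id
            buf.putShort((short) 0);         // error code: 0 = no error
            buf.putLong(42L);                // high watermark offset
            buf.putInt(0);                   // empty message set
        }
        buf.flip();
        return buf;
    }

    // Walk the response and count partitions carrying a non-zero error
    // code; checking this first is what the thread recommends.
    static int countErrors(ByteBuffer buf) {
        buf.getInt();                        // correlation id
        int errors = 0;
        int topicCount = buf.getInt();
        for (int t = 0; t < topicCount; t++) {
            short topicLen = buf.getShort();
            buf.position(buf.position() + topicLen);  // skip topic name
            int partitionCount = buf.getInt();
            for (int p = 0; p < partitionCount; p++) {
                buf.getInt();                // partition id
                if (buf.getShort() != 0) errors++;
                buf.getLong();               // high watermark
                int messageSetSize = buf.getInt();
                buf.position(buf.position() + messageSetSize);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        ByteBuffer buf = syntheticEmptyResponse("metrics", 128);
        System.out.println("errors=" + countErrors(buf));  // errors=0
    }
}
```

An empty-but-error-free response like the ones in the thread walks cleanly through countErrors and returns 0; a non-zero count would point at a broker-side error rather than the premature-return behavior discussed above.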
