I've updated KAFKA-3159 with my findings.

Thanks,
Rajiv
On Thu, Feb 4, 2016 at 10:25 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> I think I found out when the problem happens. When a broker that is sent a
> fetch request has no messages for any of the partitions it is being asked
> for, it returns immediately instead of waiting out the poll period. Both
> the Kafka 0.9 consumer and my own hand-written consumer suffer the same
> problem, caused by the broker returning immediately. Since my log
> retention time is only 10 minutes, as soon as I stop all of my producers I
> only need to wait 10 minutes for the logs to age out and for this problem
> to happen.
>
> Though it might not be a big problem for most people, it is quite
> conceivable that some users have low-frequency topic-partitions that a lot
> of clients listen to with big min_wait_time_ms parameters, for example
> some infrequent metadata-update topic. My guess is that such use cases
> would actually trigger the problem when that low-frequency topic-partition
> is isolated to a broker or two. It is especially harrowing because it goes
> against all intuition: a topic with no traffic should cause little to no
> overhead.
>
> I'll update KAFKA-3159 with my findings, but it would be great to get
> confirmation that you can make this happen, Jason.
>
> Thanks,
> Rajiv
>
> On Thu, Feb 4, 2016 at 8:58 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
>> I actually restarted my application with the consumer config I mentioned
>> at https://issues.apache.org/jira/browse/KAFKA-3159 and I can't get it
>> to use high CPU any more :( Not quite sure how to proceed. I'll try to
>> shut down all producers and let the logs age out to see if the problem
>> happens under those conditions.
>>
>> On Thu, Feb 4, 2016 at 8:40 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>>
>>> Hey Jason,
>>>
>>> Yes, I checked for error codes. There were none. The message was
>>> perfectly legal as parsed by my hand-written parser.
>>> I also verified the size of the response, which was exactly the size of
>>> a response with an empty message set per partition.
>>>
>>> The topic has 128 partitions, a retention of 10 minutes, and a
>>> replication factor of 3. The 128 partitions are divided amongst 3
>>> brokers, but I managed to replicate the problem of premature responses
>>> even running my own code in a debugger connected to a locally running
>>> Kafka instance.
>>>
>>> I haven't made any changes to the topic configuration while running
>>> these tests. All the changes I have made are to the settings of my
>>> fetch request, i.e. min_bytes_per_fetch, max_wait_ms, and
>>> max_bytes_per_partition. I haven't noted exactly which changes I made,
>>> but I think I can try to restore my original configuration and see if
>>> that reproduces the problem, both for the consumer I wrote myself and
>>> the stock 0.9 consumer.
>>>
>>> I definitely saw empty responses being returned really quickly when
>>> running my own client locally (under a debugger), so it's just a theory
>>> that that might have been the problem behind all those EOFExceptions.
>>>
>>> Rajiv
>>>
>>> On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io> wrote:
>>>
>>>> Hey Rajiv,
>>>>
>>>> Just to be clear, when you received the empty fetch response, did you
>>>> check the error codes? It would help to also include some more
>>>> information (such as broker and topic settings). If you can come up
>>>> with a way to reproduce it, that will help immensely.
>>>>
>>>> Also, would you mind updating KAFKA-3159 with your findings about the
>>>> high CPU issue? If the problem went away after a configuration change,
>>>> does it come back when those changes are reverted?
>>>>
>>>> Thanks,
>>>> Jason
>>>>
>>>> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>>>>
>>>> > Indeed this seems to be the case.
>>>> > I am now running the client mentioned in
>>>> > https://issues.apache.org/jira/browse/KAFKA-3159 and it is no longer
>>>> > taking up high CPU. The high number of EOFExceptions is also gone.
>>>> > It is performing very well now. I can't tell whether the improvement
>>>> > is because of my config changes (min_bytes, max_bytes_per_partition,
>>>> > max wait time, etc.) or because of a bug in the 0.9 broker. I have
>>>> > definitely, under a debugger, seen a problem where I was getting
>>>> > back empty messages from the broker running locally. It might be
>>>> > worth creating a bug for this.
>>>> >
>>>> > Thanks,
>>>> > Rajiv
>>>> >
>>>> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>>>> >
>>>> > > And just like that, it stopped happening even though I didn't
>>>> > > change any of my code. I had filed
>>>> > > https://issues.apache.org/jira/browse/KAFKA-3159 where the stock
>>>> > > 0.9 Kafka consumer was using very high CPU and seeing a lot of
>>>> > > EOFExceptions on the same topic and partition. I wonder if it was
>>>> > > hitting the same problem (lots of empty messages) even though we
>>>> > > asked the broker to park the request till enough bytes came
>>>> > > through.
>>>> > >
>>>> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>>>> > >
>>>> > >> I am writing a Kafka consumer client using the document at
>>>> > >> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>>>> > >>
>>>> > >> One place where I am having problems is the fetch request itself.
>>>> > >> I am able to send fetch requests and can get fetch responses that
>>>> > >> I can parse properly, but it seems like the broker is not
>>>> > >> respecting my max wait time and min fetch bytes parameters.
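[Editor's note: for readers following the thread, the fetch contract being discussed can be sketched in a few lines. This is not the broker's actual code, just the documented min-bytes/max-wait semantics as a minimal Java method; the method and parameter names are mine. The bug described above is that an all-empty fetch returns when neither condition holds.]

```java
// Sketch of the documented fetch semantics: the broker should answer a
// fetch only once at least minBytes of data are available, or once
// maxWaitMs has elapsed, whichever comes first. An immediate empty reply,
// as observed in this thread, satisfies neither condition.
final class FetchContract {
    static boolean shouldRespond(long availableBytes, long minBytes,
                                 long elapsedMs, long maxWaitMs) {
        return availableBytes >= minBytes || elapsedMs >= maxWaitMs;
    }

    public static void main(String[] args) {
        // Empty partitions, just after the request arrives: should wait.
        System.out.println(shouldRespond(0, 1000000, 0, 30000));      // false
        // Timeout reached with no data: should respond (empty).
        System.out.println(shouldRespond(0, 1000000, 30000, 30000));  // true
        // Enough data accumulated before the timeout: should respond.
        System.out.println(shouldRespond(1500000, 1000000, 5, 30000)); // true
    }
}
```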
>>>> > >>
>>>> > >> To test this part I am sending a fetch request for 128 partitions
>>>> > >> of a single topic that hasn't seen any messages for a while and
>>>> > >> is currently empty. All 128 partitions are on the same broker
>>>> > >> (running 0.9). I would expect the broker NOT to send me any reply
>>>> > >> till my max_wait_time_ms elapses, but it is sending me a reply
>>>> > >> immediately. This reply is empty (as expected) since the
>>>> > >> partitions have no data, and I can parse the data just fine, but
>>>> > >> I don't understand why the broker is sending me a reply
>>>> > >> immediately instead of waiting long enough.
>>>> > >>
>>>> > >> Here is how I make a request:
>>>> > >>
>>>> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
>>>> > >>   // This does the math to get the size required.
>>>> > >>   final int sizeRequired = numBytesRequiredForFetchRequest(numPartitions);
>>>> > >>   final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
>>>> > >>   // Size field (excludes the 4 bytes of the size field itself).
>>>> > >>   int sizeField = sizeRequired - 4;
>>>> > >>   buffer.putInt(sizeField);
>>>> > >>   // API key.
>>>> > >>   buffer.putShort(FETCH_REQUEST_API_KEY); // 1.
>>>> > >>   // API version.
>>>> > >>   buffer.putShort((short) 0);
>>>> > >>   // Correlation id.
>>>> > >>   buffer.putInt(-3); // Just a random correlation id.
>>>> > >>   // Client id.
>>>> > >>   buffer.putShort(numClientStringBytes); // The length of the client string as a short.
>>>> > >>   buffer.put(clientStringBytes); // The client string bytes.
>>>> > >>   // Replica id.
>>>> > >>   buffer.putInt(-1); // As per the recommendation.
>>>> > >>   // Max wait time in ms.
>>>> > >>   buffer.putInt(30 * 1000); // Should be 30 seconds.
>>>> > >>   // Min bytes field.
>>>> > >>   buffer.putInt(1000000); // A big number.
>>>> > >>   // Num topics.
>>>> > >>   buffer.putInt(1); // A single topic.
>>>> > >>   // Topic string.
>>>> > >>   buffer.putShort(numTopicBytes); // The length of the topic string as a short.
>>>> > >>   buffer.put(topicBytes); // The topic string bytes.
>>>> > >>   // Num partitions field.
>>>> > >>   buffer.putInt(numPartitions); // 128 like I said.
>>>> > >>   for (int i = 0; i < numPartitions; i++) {
>>>> > >>     final int partitionId = i;
>>>> > >>     // Partition number.
>>>> > >>     buffer.putInt(partitionId);
>>>> > >>     // Offset.
>>>> > >>     buffer.putLong(partitionToOffset[partitionId]); // I have an array of longs to get this from.
>>>> > >>     // Max bytes per partition.
>>>> > >>     buffer.putInt(maxBytesPerPartition);
>>>> > >>   }
>>>> > >>
>>>> > >>   buffer.flip();
>>>> > >>
>>>> > >>   return buffer;
>>>> > >> }
>>>> > >>
>>>> > >> I get a response pretty much immediately when I write this
>>>> > >> request to the broker. The response parses just fine but has no
>>>> > >> actual non-zero-size message sets.
>>>> > >>
>>>> > >> Thanks in advance.
>>>> > >> Rajiv
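[Editor's note: the size checks mentioned in this thread (the request's `numBytesRequiredForFetchRequest`, which the email does not show, and the "exactly the size of an empty response" verification) can be reproduced arithmetically. The sketch below follows my reading of the v0 wire format in the protocol guide linked above; the class, method names, and the client/topic strings are hypothetical, not from the thread.]

```java
import java.nio.charset.StandardCharsets;

// Size arithmetic for a v0 fetch exchange, per the protocol guide.
// Request:  Size(4) ApiKey(2) ApiVersion(2) CorrelationId(4) ClientId(2+len)
//           ReplicaId(4) MaxWaitTime(4) MinBytes(4) TopicCount(4)
//           TopicName(2+len) PartitionCount(4) + 16 per partition
//           (PartitionId(4) + FetchOffset(8) + MaxBytes(4)).
// Response: Size(4) CorrelationId(4) TopicCount(4) TopicName(2+len)
//           PartitionCount(4) + 18 per partition (PartitionId(4) +
//           ErrorCode(2) + HighwaterMarkOffset(8) + MessageSetSize(4)),
//           with each message set empty (0 bytes).
final class FetchSizes {
    static int fetchRequestBytes(String clientId, String topic, int numPartitions) {
        final int client = 2 + clientId.getBytes(StandardCharsets.UTF_8).length;
        final int name = 2 + topic.getBytes(StandardCharsets.UTF_8).length;
        return 4 + 2 + 2 + 4 + client + 4 + 4 + 4 + 4 + name + 4
                + numPartitions * (4 + 8 + 4);
    }

    static int emptyFetchResponseBytes(String topic, int numPartitions) {
        final int name = 2 + topic.getBytes(StandardCharsets.UTF_8).length;
        return 4 + 4 + 4 + name + 4 + numPartitions * (4 + 2 + 8 + 4);
    }

    public static void main(String[] args) {
        // "client" and "metrics" are placeholder strings; the thread's
        // actual client id and topic name are not given.
        System.out.println(fetchRequestBytes("client", "metrics", 128));      // 2097
        System.out.println(emptyFetchResponseBytes("metrics", 128));          // 2329
    }
}
```

An all-empty 128-partition response still costs 18 bytes per partition on the wire, which is why a broker answering immediately in a tight fetch loop shows up as measurable CPU and garbage rather than as an idle connection.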