Thanks Jason.

On Fri, Feb 5, 2016 at 10:13 AM, Jason Gustafson <ja...@confluent.io> wrote:
> Hey Rajiv,
>
> Thanks for all the updates. I think I've been able to reproduce this. The
> key seems to be waiting for the old log segment to be deleted. I'll
> investigate a bit more and report what I find on the JIRA.
>
> -Jason
>
> On Fri, Feb 5, 2016 at 9:50 AM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > I've updated KAFKA-3159 with my findings.
> >
> > Thanks,
> > Rajiv
> >
> > On Thu, Feb 4, 2016 at 10:25 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> >
> > > I think I found out when the problem happens. When a broker that is
> > > sent a fetch request has no messages for any of the partitions it is
> > > being asked for, it returns immediately instead of waiting out the
> > > poll period. Both the Kafka 0.9 consumer and my own hand-written
> > > consumer suffer from the same problem, caused by the broker returning
> > > immediately. Since my log retention time is only 10 minutes, as soon
> > > as I stop all of my producers I only need to wait 10 minutes for the
> > > logs to age out and for this problem to happen.
> > >
> > > Though it might not be a big problem for most people, it is quite
> > > conceivable that some users have low-frequency topic-partitions that
> > > a lot of clients listen to with big max_wait_time_ms parameters, for
> > > example an infrequent metadata-update topic. My guess is that such
> > > use cases would actually trigger the problem when that low-frequency
> > > topic-partition is isolated to a broker or two. It is especially
> > > harrowing because it goes against all intuition: a topic with no
> > > traffic should cause little to no overhead.
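> > >
> > > By the way, an easy way to check for the premature return directly is
> > > to time a single raw fetch against the idle topic. Below is a sketch
> > > (not my actual test code) that reuses the createFetchRequestBuffer()
> > > method from my original mail further down and assumes a broker on
> > > localhost:9092:
> > >
> > > SocketChannel channel = SocketChannel.open(
> > >     new InetSocketAddress("localhost", 9092));
> > > ByteBuffer request = createFetchRequestBuffer(128);
> > > long start = System.nanoTime();
> > > while (request.hasRemaining()) {
> > >     channel.write(request);
> > > }
> > > // Block until the first bytes of the response arrive.
> > > ByteBuffer sizeField = ByteBuffer.allocate(4);
> > > while (sizeField.hasRemaining()) {
> > >     channel.read(sizeField);
> > > }
> > > long elapsedMs = (System.nanoTime() - start) / 1000000;
> > > System.out.println("First response bytes after " + elapsedMs + " ms");
> > > channel.close();
> > >
> > > With max_wait_time_ms set to 30000 and min_bytes unsatisfied, a
> > > healthy broker should hold the request for close to 30000 ms; I see
> > > the print after just a few milliseconds.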
> > >
> > > I'll update KAFKA-3159 with my findings, but it would be great to get
> > > confirmation that you can make this happen, Jason.
> > >
> > > Thanks,
> > > Rajiv
> > >
> > > On Thu, Feb 4, 2016 at 8:58 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> > >
> > >> I actually restarted my application with the consumer config I
> > >> mentioned at https://issues.apache.org/jira/browse/KAFKA-3159 and I
> > >> can't get it to use high CPU any more :( Not quite sure how to
> > >> proceed. I'll try to shut down all producers and let the logs age
> > >> out to see if the problem happens under those conditions.
> > >>
> > >> On Thu, Feb 4, 2016 at 8:40 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> > >>
> > >>> Hey Jason,
> > >>>
> > >>> Yes, I checked for error codes. There were none. The message was
> > >>> perfectly legal as parsed by my hand-written parser. I also
> > >>> verified the size of the response, which was exactly the size of a
> > >>> response with an empty message set per partition.
> > >>>
> > >>> The topic has 128 partitions, a retention of 10 minutes, and a
> > >>> replication factor of 3. The 128 partitions are divided amongst 3
> > >>> brokers, but I managed to replicate the problem of premature
> > >>> responses even running my own code in a debugger connected to a
> > >>> locally running Kafka instance.
> > >>>
> > >>> I haven't made any changes to the topic configuration while running
> > >>> these tests. All the changes I have made are to the settings of my
> > >>> fetch request, i.e. min_bytes_per_fetch, max_wait_ms and
> > >>> max_bytes_per_partition. I haven't noted down all the changes I
> > >>> made, but I think I can try to get back to my original
> > >>> configuration and see if that reproduces the problem, both for the
> > >>> consumer I wrote myself and the stock 0.9 consumer.
> > >>>
> > >>> I definitely saw empty responses being returned really quickly when
> > >>> running my own client locally (under a debugger), so it's just a
> > >>> theory that that might have been the problem behind all those
> > >>> EOFExceptions.
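> > >>>
> > >>> For reference, an all-empty response is easy to recognize by size
> > >>> alone. Assuming the v0 fetch response layout from the protocol
> > >>> guide (correlation id, then an array of topics, each with a name
> > >>> and an array of per-partition headers whose message set size is 0),
> > >>> the expected byte count works out as below. This is a sketch of the
> > >>> arithmetic rather than my actual parser code:
> > >>>
> > >>> // Bytes after the 4-byte size field for a one-topic response where
> > >>> // every partition comes back with an empty message set.
> > >>> static int emptyFetchResponseSize(int numPartitions, int topicNameLen) {
> > >>>     int correlationId = 4;
> > >>>     int topicArrayCount = 4;
> > >>>     int topicName = 2 + topicNameLen; // short length + bytes
> > >>>     int partitionArrayCount = 4;
> > >>>     int perPartition = 4    // partition id
> > >>>                      + 2    // error code
> > >>>                      + 8    // high watermark offset
> > >>>                      + 4;   // message set size (0 => no set follows)
> > >>>     return correlationId + topicArrayCount + topicName
> > >>>          + partitionArrayCount + perPartition * numPartitions;
> > >>> }
> > >>>
> > >>> With 128 partitions that is 14 + topicNameLen + 18 * 128 bytes,
> > >>> which is exactly the size I saw.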
> > >>>
> > >>> Rajiv
> > >>>
> > >>> On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > >>>
> > >>>> Hey Rajiv,
> > >>>>
> > >>>> Just to be clear, when you received the empty fetch response, did
> > >>>> you check the error codes? It would help to also include some more
> > >>>> information (such as broker and topic settings). If you can come
> > >>>> up with a way to reproduce it, that will help immensely.
> > >>>>
> > >>>> Also, would you mind updating KAFKA-3159 with your findings about
> > >>>> the high CPU issue? If the problem went away after a configuration
> > >>>> change, does it come back when those changes are reverted?
> > >>>>
> > >>>> Thanks,
> > >>>> Jason
> > >>>>
> > >>>> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> > >>>>
> > >>>> > Indeed this seems to be the case. I am now running the client
> > >>>> > mentioned in https://issues.apache.org/jira/browse/KAFKA-3159
> > >>>> > and it is no longer taking up high CPU. The high number of
> > >>>> > EOFExceptions is also gone. It is performing very well now. I
> > >>>> > can't tell whether the improvement is because of my config
> > >>>> > changes (min_bytes, max_bytes_per_partition, max wait time,
> > >>>> > etc.) or because of a bug in the 0.9 broker. I have definitely,
> > >>>> > under a debugger, seen a problem where I was getting back empty
> > >>>> > messages from the broker running locally. It might be worth
> > >>>> > creating a bug for this.
> > >>>> >
> > >>>> > Thanks,
> > >>>> > Rajiv
> > >>>> >
> > >>>> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> > >>>> >
> > >>>> > > And just like that it stopped happening, even though I didn't
> > >>>> > > change any of my code. I had filed
> > >>>> > > https://issues.apache.org/jira/browse/KAFKA-3159 where the
> > >>>> > > stock 0.9 Kafka consumer was using very high CPU and seeing a
> > >>>> > > lot of EOFExceptions on the same topic and partition. I wonder
> > >>>> > > if it was hitting the same problem (lots of empty responses)
> > >>>> > > even though we asked the broker to park the request till
> > >>>> > > enough bytes came through.
> > >>>> > >
> > >>>> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> > >>>> > >
> > >>>> > >> I am writing a Kafka consumer client using the document at
> > >>>> > >> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >>>> > >>
> > >>>> > >> One place where I am having problems is the fetch request
> > >>>> > >> itself. I am able to send fetch requests and can get fetch
> > >>>> > >> responses that I can parse properly, but it seems like the
> > >>>> > >> broker is not respecting my max wait time and min fetch
> > >>>> > >> bytes parameters.
> > >>>> > >>
> > >>>> > >> To test this part I am sending a fetch request for 128
> > >>>> > >> partitions of a single topic that hasn't seen any messages
> > >>>> > >> for a while and is currently empty. All 128 partitions are on
> > >>>> > >> the same broker (running 0.9). I would expect the broker NOT
> > >>>> > >> to send me any replies till my max_wait_time_ms elapses, but
> > >>>> > >> it is sending me a reply immediately. This reply is empty (as
> > >>>> > >> expected) since the partitions have no data, and I can parse
> > >>>> > >> it just fine, but I don't understand why the broker is
> > >>>> > >> sending me a reply immediately instead of waiting long
> > >>>> > >> enough.
> > >>>> > >>
> > >>>> > >> Here is how I make a request:
> > >>>> > >>
> > >>>> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
> > >>>> > >>     // This does the math to get the size required.
> > >>>> > >>     final int sizeRequired =
> > >>>> > >>         numBytesRequiredForFetchRequest(numPartitions);
> > >>>> > >>     final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
> > >>>> > >>     // Size field (excludes its own 4 bytes).
> > >>>> > >>     buffer.putInt(sizeRequired - 4);
> > >>>> > >>     // API key.
> > >>>> > >>     buffer.putShort(FETCH_REQUEST_API_KEY); // 1 for fetch requests.
> > >>>> > >>     // API version.
> > >>>> > >>     buffer.putShort((short) 0);
> > >>>> > >>     // Correlation id.
> > >>>> > >>     buffer.putInt(-3); // Just a random correlation id.
> > >>>> > >>     // Client id.
> > >>>> > >>     buffer.putShort(numClientStringBytes); // Length as a short.
> > >>>> > >>     buffer.put(clientStringBytes); // The client string bytes.
> > >>>> > >>     // Replica id.
> > >>>> > >>     buffer.putInt(-1); // As per the recommendation for clients.
> > >>>> > >>     // Max wait time in ms.
> > >>>> > >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
> > >>>> > >>     // Min bytes field.
> > >>>> > >>     buffer.putInt(1000000); // A big number.
> > >>>> > >>     // Num topics.
> > >>>> > >>     buffer.putInt(1); // A single topic.
> > >>>> > >>     // Topic string.
> > >>>> > >>     buffer.putShort(numTopicBytes); // Length as a short.
> > >>>> > >>     buffer.put(topicBytes); // The topic string bytes.
> > >>>> > >>     // Num partitions field.
> > >>>> > >>     buffer.putInt(numPartitions); // 128 like I said.
> > >>>> > >>     for (int i = 0; i < numPartitions; i++) {
> > >>>> > >>         // Partition id.
> > >>>> > >>         buffer.putInt(i);
> > >>>> > >>         // Offset.
> > >>>> > >>         buffer.putLong(partitionToOffset[i]); // An array of longs.
> > >>>> > >>         // Max bytes per partition.
> > >>>> > >>         buffer.putInt(maxBytesPerPartition);
> > >>>> > >>     }
> > >>>> > >>
> > >>>> > >>     buffer.flip();
> > >>>> > >>     return buffer;
> > >>>> > >> }
> > >>>> > >>
> > >>>> > >> I get a response pretty much immediately when I write this
> > >>>> > >> request to the broker. The response parses just fine but has
> > >>>> > >> no actual non-zero-size message sets.
> > >>>> > >>
> > >>>> > >> Thanks in advance,
> > >>>> > >> Rajiv
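
PS: The numBytesRequiredForFetchRequest() helper referenced in the quoted
snippet above isn't shown there. It just sums the fields in the order they
are written, so it should work out to something like the following (a
sketch; my real helper may differ in details):

private int numBytesRequiredForFetchRequest(int numPartitions) {
    // Fixed-size header portion of the v0 fetch request.
    final int fixed = 4                             // size field
                    + 2 + 2                         // API key + API version
                    + 4                             // correlation id
                    + 2 + clientStringBytes.length  // client id string
                    + 4                             // replica id
                    + 4 + 4                         // max wait ms + min bytes
                    + 4                             // topic array count
                    + 2 + topicBytes.length         // topic string
                    + 4;                            // partition array count
    // Each partition entry: partition id + offset + max bytes.
    final int perPartition = 4 + 8 + 4;
    return fixed + perPartition * numPartitions;
}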