I think I've found out when the problem happens. When a broker that is sent
a fetch request has no messages for any of the partitions it is being asked
for, it returns immediately instead of waiting out the max wait period. Both
the Kafka 0.9 consumer and my own hand-written consumer suffer from the
broker returning immediately. Since my log retention time is only 10
minutes, as soon as I stop all of my producers I only need to wait 10
minutes for the logs to age out and for the problem to appear.
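[Editor's sketch, not part of the original mail: the effect described above
can be modeled in a few lines. When the broker honors max_wait_time_ms, an
idle consumer issues one fetch per wait period; when it answers an empty
fetch immediately, the poll loop degenerates into a busy spin, which matches
the high CPU and flood of EOFExceptions discussed in this thread. The broker
is faked with a plain function; all names here are made up.]

```java
// Hypothetical model, not Kafka client code: compares how many fetches an
// idle consumer issues when the broker parks the request vs. when it
// replies immediately with an empty message set.
public class FetchSpinDemo {

    // One fetch round-trip. A broker honoring max_wait blocks for
    // maxWaitMs when there is no data; a misbehaving one returns at once.
    static void fetchOnce(boolean brokerHonorsMaxWait, long maxWaitMs) {
        if (brokerHonorsMaxWait) {
            try {
                Thread.sleep(maxWaitMs); // request parked on the broker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // Either way the response carries only empty message sets.
    }

    // Counts how many fetch requests the client issues within windowMs.
    static int fetchesIn(long windowMs, boolean honorsMaxWait, long maxWaitMs) {
        long deadline = System.nanoTime() + windowMs * 1_000_000L;
        int count = 0;
        while (System.nanoTime() < deadline) {
            fetchOnce(honorsMaxWait, maxWaitMs);
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // 200 ms observation window, 50 ms max_wait.
        System.out.println("broker honors max_wait: "
                + fetchesIn(200, true, 50) + " fetches");
        System.out.println("broker replies at once: "
                + fetchesIn(200, false, 50) + " fetches");
    }
}
```

[The second count dwarfs the first: every immediate empty reply is answered
with another fetch request, so the consumer burns CPU doing nothing.]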
Though it might not be a big problem for most people, it is quite
conceivable that some users have low-frequency topic-partitions that a lot
of clients listen to with large max_wait_time_ms parameters, for example an
infrequent metadata-update topic. My guess is that such use cases would
actually trigger the problem when that low-frequency topic-partition is
isolated to a broker or two. It is especially harrowing because it goes
against the intuition that a topic with no traffic should cause little to
no overhead.

I'll update KAFKA-3159 with my findings, but it would be great to get
confirmation that you can make this happen, Jason.

Thanks,
Rajiv

On Thu, Feb 4, 2016 at 8:58 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> I actually restarted my application with the consumer config I mentioned
> at https://issues.apache.org/jira/browse/KAFKA-3159 and I can't get it to
> use high CPU any more :( Not quite sure how to proceed. I'll try to shut
> down all producers and let the logs age out to see if the problem happens
> under those conditions.
>
> On Thu, Feb 4, 2016 at 8:40 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
>> Hey Jason,
>>
>> Yes, I checked for error codes. There were none. The message was
>> perfectly legal as parsed by my hand-written parser. I also verified the
>> size of the response, which was exactly the size of a response with an
>> empty message set per partition.
>>
>> The topic has 128 partitions, a retention of 10 minutes, and a
>> replication factor of 3. The 128 partitions are divided amongst 3
>> brokers, but I managed to replicate the problem of premature responses
>> even running my own code in a debugger connected to a locally running
>> Kafka instance.
>>
>> I haven't made any changes to the topic configuration while running
>> these tests. All the changes I have made are to the settings of my fetch
>> request, i.e. min_bytes_per_fetch, max_wait_ms, and
>> max_bytes_per_partition.
>> I haven't exactly noted all the changes I made, but I think I can try to
>> get back to my original configuration and see if that reproduces the
>> problem, both for the consumer I wrote myself and the stock 0.9 consumer.
>>
>> I definitely saw empty responses being returned really quickly when
>> running my own client locally (under a debugger), so it's just a theory
>> that that might have been the problem behind all those EOFExceptions.
>>
>> Rajiv
>>
>> On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io>
>> wrote:
>>
>>> Hey Rajiv,
>>>
>>> Just to be clear, when you received the empty fetch response, did you
>>> check the error codes? It would help to also include some more
>>> information (such as broker and topic settings). If you can come up
>>> with a way to reproduce it, that will help immensely.
>>>
>>> Also, would you mind updating KAFKA-3159 with your findings about the
>>> high CPU issue? If the problem went away after a configuration change,
>>> does it come back when those changes are reverted?
>>>
>>> Thanks,
>>> Jason
>>>
>>> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>>>
>>> > Indeed this seems to be the case. I am now running the client
>>> > mentioned in https://issues.apache.org/jira/browse/KAFKA-3159 and it
>>> > is no longer taking up high CPU. The high number of EOFExceptions is
>>> > also gone. It is performing very well now. I can't tell whether the
>>> > improvement is because of my config changes (min_bytes,
>>> > max_bytes_per_partition, max wait time, etc.) or because of a bug in
>>> > the 0.9 broker. I have definitely, under a debugger, seen a problem
>>> > where I was getting back empty responses from the broker running
>>> > locally. It might be worth creating a bug for this.
>>> >
>>> > Thanks,
>>> > Rajiv
>>> >
>>> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com>
>>> > wrote:
>>> >
>>> > > And just like that it stopped happening, even though I didn't
>>> > > change any of my code. I had filed
>>> > > https://issues.apache.org/jira/browse/KAFKA-3159 where the stock
>>> > > 0.9 Kafka consumer was using very high CPU and seeing a lot of
>>> > > EOFExceptions on the same topic and partition. I wonder if it was
>>> > > hitting the same problem (lots of empty responses) even though we
>>> > > asked the broker to park the request till enough bytes came
>>> > > through.
>>> > >
>>> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com>
>>> > > wrote:
>>> > >
>>> > >> I am writing a Kafka consumer client using the document at
>>> > >> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>>> > >>
>>> > >> One place where I am having problems is the fetch request itself.
>>> > >> I am able to send fetch requests and get fetch responses that I
>>> > >> can parse properly, but it seems like the broker is not respecting
>>> > >> my max wait time and min fetch bytes parameters.
>>> > >>
>>> > >> To test this, I am sending a fetch request for 128 partitions of a
>>> > >> single topic that hasn't seen any messages for a while and is
>>> > >> currently empty. All 128 partitions are on the same broker
>>> > >> (running 0.9). I would expect the broker NOT to send me any reply
>>> > >> till my max_wait_time_ms elapses, but it is sending me a reply
>>> > >> immediately. This reply is empty (as expected) since the
>>> > >> partitions have no data, and I can parse it just fine, but I don't
>>> > >> understand why the broker replies immediately instead of waiting
>>> > >> long enough.
>>> > >>
>>> > >> Here is how I make a request:
>>> > >>
>>> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
>>> > >>     // This does the math to get the size required.
>>> > >>     final int sizeRequired = numBytesRequiredForFetchRequest(numPartitions);
>>> > >>     final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
>>> > >>     // Size field (does not count itself).
>>> > >>     int sizeField = sizeRequired - 4;
>>> > >>     buffer.putInt(sizeField);
>>> > >>     // API key.
>>> > >>     buffer.putShort(FETCH_REQUEST_API_KEY); // 1.
>>> > >>     // API version.
>>> > >>     buffer.putShort((short) 0);
>>> > >>     // Correlation id.
>>> > >>     buffer.putInt(-3); // Just a random correlation id.
>>> > >>     // Client id.
>>> > >>     buffer.putShort(numClientStringBytes); // Length of the client string as a short.
>>> > >>     buffer.put(clientStringBytes); // The client string bytes.
>>> > >>     // Replica id.
>>> > >>     buffer.putInt(-1); // As per the recommendation.
>>> > >>     // Max wait time in ms.
>>> > >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
>>> > >>     // Min bytes field.
>>> > >>     buffer.putInt(1000000); // A big number.
>>> > >>     // Num topics.
>>> > >>     buffer.putInt(1); // A single topic.
>>> > >>     // Topic string.
>>> > >>     buffer.putShort(numTopicBytes); // Length of the topic string as a short.
>>> > >>     buffer.put(topicBytes); // The topic string bytes.
>>> > >>     // Num partitions field.
>>> > >>     buffer.putInt(numPartitions); // 128 like I said.
>>> > >>     for (int i = 0; i < numPartitions; i++) {
>>> > >>         final int partitionId = i;
>>> > >>         // Partition number.
>>> > >>         buffer.putInt(partitionId);
>>> > >>         // Offset.
>>> > >>         buffer.putLong(partitionToOffset[partitionId]); // I have an array of longs for this.
>>> > >>         // maxBytesPerPartition.
>>> > >>         buffer.putInt(maxBytesPerPartition);
>>> > >>     }
>>> > >>
>>> > >>     buffer.flip();
>>> > >>
>>> > >>     return buffer;
>>> > >> }
>>> > >>
>>> > >> I get a response pretty much immediately when I write this request
>>> > >> to the broker. The response parses just fine but has no message
>>> > >> sets with a non-zero size.
>>> > >>
>>> > >> Thanks in advance.
>>> > >> Rajiv
>>> > >>
>>> > >>
>>> > >
>>> >
>>>
>>
>
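[Editor's sketch, not from the thread: the size check Rajiv describes (a
response exactly the size of an empty message set per partition) can be
reproduced from the v0 FetchResponse layout in the protocol guide linked
above: correlation_id, a topic count, a length-prefixed topic name, a
partition count, and per partition {partition:int32, error_code:int16,
high_watermark:int64, message_set_size:int32} followed by zero message-set
bytes. The topic name below is a stand-in.]

```java
import java.nio.charset.StandardCharsets;

// Sketch, not thread code: expected byte size of a v0 FetchResponse body
// (everything after the leading 4-byte size field) when every requested
// partition comes back with an empty message set.
public class EmptyFetchResponseSize {

    static int expectedBodySize(String topic, int numPartitions) {
        // partition:int32 + error_code:int16 + high_watermark:int64
        // + message_set_size:int32, followed by zero message-set bytes.
        int perPartition = 4 + 2 + 8 + 4;
        int topicNameLen = topic.getBytes(StandardCharsets.UTF_8).length;
        return 4                    // correlation id
                + 4                 // topic array count
                + 2 + topicNameLen  // short length prefix + topic name
                + 4                 // partition array count
                + numPartitions * perPartition;
    }

    public static void main(String[] args) {
        // "metrics" is a made-up topic name; the thread's topic has 128
        // partitions, so an all-empty reply body has one fixed size.
        System.out.println(expectedBodySize("metrics", 128) + " bytes");
    }
}
```

[Seeing a response of exactly this size arrive immediately after the fetch
is sent is what confirms the broker answered with empty message sets
instead of parking the request for max_wait_time_ms.]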