Hey Jason,

Yes, I checked for error codes. There were none. The message was perfectly legal as parsed by my hand-written parser. I also verified the size of the response: it was exactly the size of a response with an empty message set per partition.
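For context, that check boils down to walking the v0 fetch response roughly like this (a simplified sketch rather than the actual parser code, assuming the 4-byte size prefix has already been read off the socket; the method name is just for illustration). Each empty partition entry is a fixed 18 bytes (partition id, error code, high watermark, message set size), which is why the size of an all-empty response is easy to predict:

  private void checkFetchResponseErrorCodes(ByteBuffer response) {
    response.getInt(); // Correlation id.
    final int numTopics = response.getInt();
    for (int t = 0; t < numTopics; t++) {
      final short topicLength = response.getShort();
      response.position(response.position() + topicLength); // Skip the topic name bytes.
      final int numPartitions = response.getInt();
      for (int p = 0; p < numPartitions; p++) {
        final int partitionId = response.getInt();
        final short errorCode = response.getShort();  // 0 means no error.
        final long highWatermark = response.getLong();
        final int messageSetSize = response.getInt(); // 0 for an empty partition.
        response.position(response.position() + messageSetSize); // Skip over the message set.
        if (errorCode != 0) {
          System.out.println("Partition " + partitionId + " returned error " + errorCode +
              " (high watermark " + highWatermark + ")");
        }
      }
    }
  }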
The topic has 128 partitions, a retention of 10 minutes, and a replication factor of 3. The 128 partitions are divided amongst 3 brokers, but I managed to replicate the problem of premature responses even running my own code in a debugger connected to a locally running Kafka instance. I haven't made any changes to the topic configuration while running these tests. All the changes I have made are to the settings of my fetch request, i.e. min_bytes_per_fetch, max_wait_ms, and max_bytes_per_partition. I haven't noted down exactly which changes I made, but I think I can try to restore my original configuration and see if that reproduces the problem, both for the consumer I wrote myself and for the stock 0.9 consumer. I definitely saw empty responses being returned really quickly when running my own client locally (under a debugger), so it's just a theory that those empty responses might have been behind all those EOFExceptions.

Rajiv

On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Rajiv,
>
> Just to be clear, when you received the empty fetch response, did you check
> the error codes? It would help to also include some more information (such
> as broker and topic settings). If you can come up with a way to reproduce
> it, that will help immensely.
>
> Also, would you mind updating KAFKA-3159 with your findings about the high
> CPU issue? If the problem went away after a configuration change, does it
> come back when those changes are reverted?
>
> Thanks,
> Jason
>
> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > Indeed this seems to be the case. I am now running the client mentioned in
> > https://issues.apache.org/jira/browse/KAFKA-3159 and it is no longer
> > taking up high CPU. The high number of EOFExceptions is also gone. It is
> > performing very well now. I can't tell whether the improvement is because
> > of my config changes (min_bytes, max_bytes_per_partition, max wait time,
> > etc.) or because of a bug in the 0.9 broker. I have definitely, under a
> > debugger, seen a problem where I was getting back empty messages from the
> > broker running locally. It might be worth creating a bug for this.
> >
> > Thanks,
> > Rajiv
> >
> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> >
> > > And just like that it stopped happening, even though I didn't change any
> > > of my code. I had filed https://issues.apache.org/jira/browse/KAFKA-3159
> > > where the stock 0.9 Kafka consumer was using very high CPU and seeing a
> > > lot of EOFExceptions on the same topic and partition. I wonder if it was
> > > hitting the same problem (lots of empty messages) even though we asked
> > > the broker to park the request till enough bytes came through.
> > >
> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> > >
> > >> I am writing a Kafka consumer client using the document at
> > >> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >>
> > >> One place where I am having problems is the fetch request itself. I am
> > >> able to send fetch requests and can get fetch responses that I can parse
> > >> properly, but it seems like the broker is not respecting my max wait time
> > >> and min fetch bytes parameters.
> > >>
> > >> To test this part I am sending a fetch request for 128 partitions of a
> > >> single topic that hasn't seen any messages for a while and is currently
> > >> empty.
> > >> All 128 partitions are on the same broker (running 0.9). I would
> > >> expect the broker NOT to send me any reply till my max_wait_time_ms
> > >> elapses, but it is sending me a reply immediately. This reply is empty
> > >> (as expected) since the partitions have no data, and I can parse the
> > >> data just fine, but I don't understand why the broker is sending me a
> > >> reply immediately instead of waiting long enough.
> > >>
> > >> Here is how I make a request:
> > >>
> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
> > >>   // This does the math to get the size required.
> > >>   final int sizeRequired = numBytesRequiredForFetchRequest(numPartitions);
> > >>   final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
> > >>   // Size field (excludes the 4 bytes of the size field itself).
> > >>   int sizeField = sizeRequired - 4;
> > >>   buffer.putInt(sizeField);
> > >>   // API key.
> > >>   buffer.putShort(FETCH_REQUEST_API_KEY); // 1.
> > >>   // API version.
> > >>   buffer.putShort((short) 0);
> > >>   // Correlation id.
> > >>   buffer.putInt(-3); // Just a random correlation id.
> > >>   // Client id.
> > >>   buffer.putShort(numClientStringBytes); // The length of the client string as a short.
> > >>   buffer.put(clientStringBytes); // The client string bytes.
> > >>   // Replica id.
> > >>   buffer.putInt(-1); // As per the recommendation.
> > >>   // Max wait time in ms.
> > >>   buffer.putInt(30 * 1000); // Should be 30 seconds.
> > >>   // Min bytes field.
> > >>   buffer.putInt(1000000); // A big number.
> > >>   // Num topics.
> > >>   buffer.putInt(1); // A single topic.
> > >>   // Topic string.
> > >>   buffer.putShort(numTopicBytes); // The length of the topic string as a short.
> > >>   buffer.put(topicBytes); // The topic string bytes.
> > >>   // Num partitions field.
> > >>   buffer.putInt(numPartitions); // 128 like I said.
> > >>   for (int i = 0; i < numPartitions; i++) {
> > >>     final int partitionId = i;
> > >>     // Partition number.
> > >>     buffer.putInt(partitionId);
> > >>     // Offset.
> > >>     buffer.putLong(partitionToOffset[partitionId]); // I have an array of longs to get this from.
> > >>     // maxBytesPerPartition.
> > >>     buffer.putInt(maxBytesPerPartition);
> > >>   }
> > >>
> > >>   buffer.flip();
> > >>
> > >>   return buffer;
> > >> }
> > >>
> > >> I get a response pretty much immediately when I write this request to
> > >> the broker. The response parses just fine but has no actual non-zero-size
> > >> message sets.
> > >>
> > >> Thanks in advance.
> > >> Rajiv
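For comparison, the equivalent settings on the stock 0.9 consumer look roughly like the sketch below. The broker address, group id, and topic name are placeholders, and the values simply mirror the hand-built request above (30 second max wait, 1,000,000 min bytes, plus a per-partition cap).

  import java.util.Collections;
  import java.util.Properties;

  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class FetchWaitRepro {
    public static void main(String[] args) {
      final Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092"); // Placeholder broker address.
      props.put("group.id", "fetch-wait-repro");        // Placeholder group id.
      props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
      props.put("fetch.min.bytes", 1000000);            // Same "big number" as the hand-built request.
      props.put("fetch.max.wait.ms", 30 * 1000);        // 30 second max wait.
      props.put("max.partition.fetch.bytes", 1048576);  // Per-partition cap.

      try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList("my-topic")); // Placeholder topic name.
        for (int i = 0; i < 3; i++) {
          // Poll an idle topic and log how long each call takes and how many records it returns.
          final long start = System.currentTimeMillis();
          final ConsumerRecords<byte[], byte[]> records = consumer.poll(35 * 1000);
          System.out.println("poll returned " + records.count() + " records after "
              + (System.currentTimeMillis() - start) + " ms");
        }
      }
    }
  }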