Indeed this seems to be the case. I am now running the client mentioned in https://issues.apache.org/jira/browse/KAFKA-3159 and it is no longer taking up high CPU. The high number of EOF exceptions is also gone. It is performing very well now. I can't tell whether the improvement is due to my config changes (min_bytes, max_bytes_per_partition, max wait time, etc.) or due to a bug in the 0.9 broker. Under a debugger, I have definitely seen a problem where the broker running locally returned empty messages. It might be worth filing a bug for this.
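For context, the config changes referred to above map onto the 0.9 consumer settings fetch.min.bytes, fetch.max.wait.ms, and max.partition.fetch.bytes. A minimal sketch of such a configuration follows; the specific values and the class/broker names are illustrative, not the exact ones used here:

```java
import java.util.Properties;

public class FetchTuning {
    // Builds consumer properties that tune fetch behavior on the broker side.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("group.id", "example-group");           // illustrative group id
        // Broker should hold the fetch until at least this many bytes accumulate.
        props.put("fetch.min.bytes", "1000000");
        // Upper bound on how long the broker parks the fetch request, in ms.
        props.put("fetch.max.wait.ms", "30000");
        // Cap on the data returned per partition per fetch.
        props.put("max.partition.fetch.bytes", "1048576");
        return props;
    }

    public static void main(String[] args) {
        Properties p = consumerProps();
        System.out.println(p.getProperty("fetch.min.bytes"));   // prints 1000000
        System.out.println(p.getProperty("fetch.max.wait.ms")); // prints 30000
    }
}
```

These properties would be passed to a `KafkaConsumer` constructor; the min-bytes/max-wait pair is exactly the long-poll behavior discussed below.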
Thanks,
Rajiv

On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> And just like that it stopped happening even though I didn't change any
> of my code. I had filed https://issues.apache.org/jira/browse/KAFKA-3159,
> where the stock 0.9 Kafka consumer was using very high CPU and seeing a
> lot of EOFExceptions on the same topic and partition. I wonder if it was
> hitting the same problem (lots of empty messages) even though we asked
> the broker to park the request until enough bytes came through.
>
> On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
>> I am writing a Kafka consumer client using the document at
>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>>
>> One place where I am having problems is the fetch request itself. I am
>> able to send fetch requests and get fetch responses that I can parse
>> properly, but it seems like the broker is not respecting my max wait
>> time and min fetch bytes parameters.
>>
>> To test this part, I am sending a fetch request for 128 partitions of a
>> single topic that hasn't seen any messages for a while and is currently
>> empty. All 128 partitions are on the same broker (running 0.9). I would
>> expect the broker NOT to send me any reply until my max_wait_time_ms
>> elapses, but it is sending me a reply immediately. This reply is empty
>> (as expected) since the partitions have no data, and I can parse it
>> just fine, but I don't understand why the broker is replying
>> immediately instead of waiting long enough.
>>
>> Here is how I make a request:
>>
>> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
>>   // This does the math to get the size required.
>>   final int sizeRequired = numBytesRequiredForFetchRequest(numPartitions);
>>   final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
>>   // Size field (does not count itself).
>>   int sizeField = sizeRequired - 4;
>>   buffer.putInt(sizeField);
>>   // API key.
>>   buffer.putShort(FETCH_REQUEST_API_KEY); // 1.
>>   // API version.
>>   buffer.putShort((short) 0);
>>   // Correlation id.
>>   buffer.putInt(-3); // Just a random correlation id.
>>   // Client id.
>>   buffer.putShort(numClientStringBytes); // The length of the client string as a short.
>>   buffer.put(clientStringBytes); // The client string bytes.
>>   // Replica id.
>>   buffer.putInt(-1); // As per the recommendation.
>>   // Max wait time in ms.
>>   buffer.putInt(30 * 1000); // Should be 30 seconds.
>>   // Min bytes field.
>>   buffer.putInt(1000000); // A big number.
>>   // Num topics.
>>   buffer.putInt(1); // A single topic.
>>   // Topic string.
>>   buffer.putShort(numTopicBytes); // The length of the topic string as a short.
>>   buffer.put(topicBytes); // The topic string bytes.
>>   // Num partitions field.
>>   buffer.putInt(numPartitions); // 128 like I said.
>>   for (int i = 0; i < numPartitions; i++) {
>>     final int partitionId = i;
>>     // Partition number.
>>     buffer.putInt(partitionId);
>>     // Offset.
>>     buffer.putLong(partitionToOffset[partitionId]); // I have an array of longs to get this from.
>>     // maxBytesPerPartition.
>>     buffer.putInt(maxBytesPerPartition);
>>   }
>>
>>   buffer.flip();
>>
>>   return buffer;
>> }
>>
>> I get a response pretty much immediately when I write this request to
>> the broker. The response parses just fine but has no actual non-zero-size
>> message sets.
>>
>> Thanks in advance.
>> Rajiv
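One cheap sanity check on the wire exchange above: a Kafka response begins with a 4-byte size followed by the correlation id echoed from the request, so the id sent (here -3) can be verified on the way back. A minimal sketch, with the method name and the simulated buffer being my own invention:

```java
import java.nio.ByteBuffer;

public class ResponseHeader {
    // Reads the response header: a 4-byte size, then the echoed correlation id.
    static int readCorrelationId(ByteBuffer response) {
        int size = response.getInt(); // size of the remainder of the response
        return response.getInt();     // should match the id sent in the request
    }

    public static void main(String[] args) {
        // Simulate a broker response to a request sent with correlation id -3.
        ByteBuffer fake = ByteBuffer.allocate(8);
        fake.putInt(4);  // remaining size: just the correlation id
        fake.putInt(-3); // echoed correlation id
        fake.flip();
        System.out.println(readCorrelationId(fake)); // prints -3
    }
}
```

Checking the echoed id against the one sent is an easy way to catch framing bugs (e.g. a miscomputed size field) before blaming the broker's long-poll behavior.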