I am writing a Kafka consumer client using the document at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
One place where I am having problems is the fetch request itself. I am able to send fetch requests and can get fetch responses that I can parse properly, but it seems like the broker is not respecting my max wait time and min fetch bytes parameters. To test this part I am sending in a fetch request for 128 partitions of a single topic that hasn't seen any messages for a while and is currently empty. All 128 partitions are on the same broker (running 0.9). I would expect the broker to NOT send me any replies till my max_wait_time_ms elapses but it is sending me a reply immediately. This reply is empty (as expected) since the partitions have no data and I can parse the data just fine but I don't understand why the broker is sending me a reply immediately instead of waiting long enough. Here is how I make a request: private ByteBuffer createFetchRequestBuffer(int numPartitions) { // This does the math to get the size required. final int sizeRequired = numBytesRequiredForFetchRequest(numPartitions); final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired); // Size field int sizeField = sizeRequired - 4; buffer.putInt(sizeField); // API key. buffer.putShort(FECTH_REQUEST_API_KEY); // 1. // API version. buffer.putShort((short) 0); // Correlation id. buffer.putInt(-3); // Just a random correlation id. // Client id. buffer.putShort(numClientStringBytes); // The length of the client string as a short. buffer.put(clientStringBytes); // The client string bytes. // Replica id. buffer.putInt(-1); // As per the recommendation. // Max wait time in ms. buffer.putInt(30 * 1000); // Should be 30 seconds. // Min bytes field size. buffer.putInt(1000000); // A big number. // Num topics. buffer.putInt(1); // A single topic. // Topic string. buffer.putShort(numTopicBytes); // The length of the topic string as a short. buffer.put(topicBytes); // The topic string bytes. // Num partitions field. buffer.putInt(numPartitions); // 128 like I said. for (int i = 0; i < numPartitions; i++) { final int partitionId = i; // partition number. buffer.putInt(partitionId); // offset. buffer.putLong(partitionToOffset[partitionId]); // I have an array of longs to get this from. // maxBytesPerPartition. buffer.putInt(maxBytesPerPartition); } buffer.flip(); return buffer; } I get a response pretty much immediately when I write this request to the broker. The response parses just fine but has no actual non zero size message sets. Thanks in advance. Rajiv