I am writing a Kafka consumer client using the document at
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

One place where I am having problems is the fetch request itself. I am able
to send fetch requests and can get fetch responses that I can parse
properly, but it seems like the broker is not respecting my max wait time
and min fetch bytes parameters.

To test this, I send a fetch request for 128 partitions of a single topic
that hasn't seen any messages for a while and is currently empty. All 128
partitions are on the same broker (running 0.9). I would expect the broker
NOT to send me any reply until my max_wait_time_ms elapses, but it sends a
reply immediately. The reply is empty (as expected), since the partitions
have no data, and I can parse it just fine. What I don't understand is why
the broker replies immediately instead of waiting for the full max wait
time.

Here is how I make a request:

private ByteBuffer createFetchRequestBuffer(int numPartitions) {
    // This does the math to get the size required.
    final int sizeRequired = numBytesRequiredForFetchRequest(numPartitions);
    final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
    // Size field
    int sizeField = sizeRequired - 4;
    buffer.putInt(sizeField);
    // API key.
    buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
    // API version.
    buffer.putShort((short) 0);
    // Correlation id.
    buffer.putInt(-3);  // Just a random correlation id.
    // Client id.
    buffer.putShort(numClientStringBytes); // The length of the client string as a short.
    buffer.put(clientStringBytes); // The client string bytes.
    // Replica id.
    buffer.putInt(-1);  // As per the recommendation.
    // Max wait time in ms.
    buffer.putInt(30 * 1000); // Should be 30 seconds.
    // Min bytes field size.
    buffer.putInt(1000000);  // A big number.
    // Num topics.
    buffer.putInt(1); // A single topic.
    // Topic string.
    buffer.putShort(numTopicBytes); // The length of the topic string as a short.
    buffer.put(topicBytes); // The topic string bytes.
    // Num partitions field.
    buffer.putInt(numPartitions); // 128 like I said.
    for (int i = 0; i < numPartitions; i++) {
      final int partitionId = i;
      // partition number.
      buffer.putInt(partitionId);
      // offset.
      buffer.putLong(partitionToOffset[partitionId]); // I have an array of longs to get this from.
      // maxBytesPerPartition.
      buffer.putInt(maxBytesPerPartition);
    }

    buffer.flip();

    return buffer;
}
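
The size helper called at the top of that method is not shown here. As a
rough sketch only, it could add up the same fields the method writes, in the
same order. The class name and the two extra length parameters are
assumptions made for illustration; my real helper takes only numPartitions
and reads the lengths from fields.

```java
/** Hypothetical sizing helper mirroring the fields written by createFetchRequestBuffer. */
final class FetchRequestSizing {
    private FetchRequestSizing() {}

    // Assumed signature: the client id and topic lengths are passed in explicitly
    // so that the sketch is self-contained.
    static int numBytesRequiredForFetchRequest(
            int numClientStringBytes, int numTopicBytes, int numPartitions) {
        int size = 0;
        size += Integer.BYTES;                       // size field
        size += Short.BYTES;                         // API key
        size += Short.BYTES;                         // API version
        size += Integer.BYTES;                       // correlation id
        size += Short.BYTES + numClientStringBytes;  // client id: length + bytes
        size += Integer.BYTES;                       // replica id
        size += Integer.BYTES;                       // max wait time in ms
        size += Integer.BYTES;                       // min bytes
        size += Integer.BYTES;                       // number of topics (always 1 here)
        size += Short.BYTES + numTopicBytes;         // topic name: length + bytes
        size += Integer.BYTES;                       // number of partitions
        // Per partition: partition id (int) + fetch offset (long) + max bytes (int).
        size += numPartitions * (Integer.BYTES + Long.BYTES + Integer.BYTES);
        return size;
    }
}
```

For example, with a 6-byte client id, a 5-byte topic name and 128
partitions this comes to 2095 bytes, so the size field written first would
be 2091.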

I get a response pretty much immediately when I write this request to the
broker. The response parses just fine, but every message set in it is
empty (zero size).

Thanks in advance.
Rajiv
