I am planning to use the current Java API and have the following use case:

i) A single topic with about 1024 partitions.
ii) A number of processes that want to consume these partitions in a deterministic way. The machine -> partitions assignment is done outside of Kafka. During its lifetime a process might relinquish ownership of some partitions and gain ownership of others. This coordination is also done outside of Kafka and is guaranteed to work through other coordination channels.
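To make the ownership side of (ii) concrete, this is roughly how I picture the per-process bookkeeping. It is a minimal Java sketch, assuming the external coordinator simply calls into it whenever this process gains or relinquishes a partition. OwnedPartitions, gain(), relinquish() and snapshot() are hypothetical names of mine, not part of any Kafka API.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Kafka class): the set of partitions this process
// currently owns, updated by the external coordination layer.
final class OwnedPartitions {
    private final Set<Integer> owned = ConcurrentHashMap.newKeySet();
    private final int numPartitions;

    OwnedPartitions(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Called when the coordinator hands this process a partition.
    // Returns false if the partition was already owned.
    boolean gain(int partition) {
        if (partition < 0 || partition >= numPartitions) {
            throw new IllegalArgumentException("no such partition: " + partition);
        }
        return owned.add(partition);
    }

    // Called when this process gives a partition up.
    // Returns false if the partition was not owned.
    boolean relinquish(int partition) {
        return owned.remove(partition);
    }

    // Sorted, read-only copy, so a fetcher can iterate while ownership changes.
    Set<Integer> snapshot() {
        return Collections.unmodifiableSet(new TreeSet<>(owned));
    }
}
```

A fetcher thread would take a snapshot() before building each request, so a partition relinquished in the middle of a fetch is left out of the next one.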
Here are my questions:

1) Given this use case, it seems like the SimpleConsumer is the way to go. I ran into a problem: it seems that a single SimpleConsumer can connect to only a single broker. If my assumption is correct, then I need to maintain one SimpleConsumer per broker and ensure that each one is only asked to fetch partitions for which its broker is the leader. Further, I need to maintain this invariant while brokers go down and leadership shifts around.

Since the SimpleConsumer seems to make blocking network calls, I was planning to run one SimpleConsumer (responsible for a single broker) per thread, so num threads = num brokers. These threads then have to communicate with each other when leadership responsibilities shift. For example, say partition 1 is on Broker 1 (i.e. the SimpleConsumer on thread 1) and it moves to Broker 2 (the SimpleConsumer on thread 2). I need to tell thread 2 that it is now responsible for partition 1.

Does that seem about right? Is there an easier way to handle this scenario? From a previous mailing list question I asked, it seems that the only way to avoid head-of-line blocking in this case is to isolate each SimpleConsumer in its own thread and let the OS take care of scheduling.

2) Is it possible to multiplex multiple partition requests in a single FetchRequest, as long as I can guarantee that all of those partitions are led by the same broker (the one the SimpleConsumer is connected to)? From the SimpleConsumer example it seems like this should be possible:

    FetchRequestBuilder builder = new FetchRequestBuilder()
            .clientId(clientName)
            .maxWait(MAX_WAIT);
    for (Integer partition : partitions) {
        // Note: this fetchSize of 100000 might need to be increased if large batches are
        // written to Kafka.
        builder.addFetch(MY_TOPIC, partition, readOffsets.get(partition), 100000);
    }
    FetchResponse fetchResponse = simpleConsumer.fetch(builder.build());

3) If (2) is possible, I am still confused about how to find errors in the response. The error-code API seems to require both the topic and the partition, something like:

    short code = fetchResponse.errorCode(MY_TOPIC, somePartition);

In the example this code is examined to figure out whether a partition's leader has changed. My problem is that I am multiplexing multiple partition requests, so from this API I can't tell which partitions had an issue, short of querying the fetchResponse for every partition the consumer is responsible for. Is this the only way? Say I was doing this:

    for (Integer partition : myPartitions) {
        short code = fetchResponse.errorCode(MY_TOPIC, partition);
        if (code != ErrorMapping.NoError()) {
            handleErrorCode(code);
        }
    }

4) Is there any documentation, or are there guidelines, on how to handle the other error codes? Here are the ones I could find in ErrorMapping:

    ErrorMapping.BrokerNotAvailableCode();       // Should I try to query a new leader?
    ErrorMapping.EmptyByteBuffer();              // Probably not applicable for the consumer?
    ErrorMapping.InvalidFetchSizeCode();         // Something wrong with my request building logic?
    ErrorMapping.InvalidMessageCode();           // No idea.
    ErrorMapping.LeaderNotAvailableCode();       // Try to query a new leader?
    ErrorMapping.MessageSizeTooLargeCode();      // Again, probably formed too big of a request?
    ErrorMapping.NotLeaderForPartitionCode();    // Try to query a new leader?
    ErrorMapping.OffsetOutOfRangeCode();         // Refresh the last read offset, as in the example.
    ErrorMapping.ReplicaNotAvailableCode();      // Is this relevant to the SimpleConsumer?
    ErrorMapping.RequestTimedOutCode();          // Retry?
    ErrorMapping.StaleControllerEpochCode();     // No idea.
    ErrorMapping.UnknownTopicOrPartitionCode();  // Something terrible has happened :)

Given that my SimpleConsumer connects to a single broker and only requests partitions that this broker leads, what is the difference between ErrorMapping.BrokerNotAvailableCode() and ErrorMapping.LeaderNotAvailableCode()?

Thanks again.
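For (1) and (2), here is a minimal sketch of the routing table I have in mind, assuming leader information comes from a metadata lookup done elsewhere. LeaderRouter, setLeader(), remove() and partitionsFor() are hypothetical names; nothing here talks to Kafka.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical routing table (not a Kafka class): which broker, and therefore
// which per-broker fetcher thread, currently leads each owned partition.
// Leader ids are assumed to come from a metadata lookup done elsewhere.
final class LeaderRouter {
    private final Map<Integer, Integer> leaderOf = new HashMap<>();          // partition -> broker id
    private final Map<Integer, Set<Integer>> partitionsOf = new HashMap<>(); // broker id -> partitions

    // Routes a partition to its (possibly new) leader. Returns the previous
    // leader's broker id, or -1 if the partition was not routed before, so the
    // caller knows which thread has to stop fetching it.
    synchronized int setLeader(int partition, int brokerId) {
        Integer previous = leaderOf.put(partition, brokerId);
        if (previous != null) {
            unlink(partition, previous);
        }
        partitionsOf.computeIfAbsent(brokerId, b -> new TreeSet<>()).add(partition);
        return previous == null ? -1 : previous;
    }

    // Stops routing a partition, e.g. after this process relinquishes it.
    synchronized void remove(int partition) {
        Integer previous = leaderOf.remove(partition);
        if (previous != null) {
            unlink(partition, previous);
        }
    }

    // Partitions that one fetcher thread should put into a single FetchRequest.
    synchronized Set<Integer> partitionsFor(int brokerId) {
        Set<Integer> partitions = partitionsOf.get(brokerId);
        if (partitions == null) {
            return Collections.emptySet();
        }
        return Collections.unmodifiableSet(new TreeSet<>(partitions));
    }

    // Drops a partition from a broker's set, and the set itself once it is empty.
    private void unlink(int partition, int brokerId) {
        Set<Integer> partitions = partitionsOf.get(brokerId);
        partitions.remove(partition);
        if (partitions.isEmpty()) {
            partitionsOf.remove(brokerId);
        }
    }
}
```

Each per-broker thread would call partitionsFor(itsBrokerId) and make one addFetch(...) call per returned partition, so a single FetchRequest only ever covers partitions led by that broker. When setLeader() returns a broker id other than the new one, that is the point where the old thread has to stop fetching the partition and the new thread has to start.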
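For (3), this is the per-partition scan I described, pulled into a helper that returns only the failures, keyed by partition, so each one can be handled on its own. It is a sketch: ErrorCodeLookup and FetchErrors are hypothetical types standing in for the real fetchResponse.errorCode(topic, partition) call, and NO_ERROR assumes ErrorMapping.NoError() is 0.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the errorCode(topic, partition) lookup on a fetch response.
@FunctionalInterface
interface ErrorCodeLookup {
    short errorCode(String topic, int partition);
}

// Hypothetical helper, not part of the Kafka API.
final class FetchErrors {
    static final short NO_ERROR = 0; // assumed to match ErrorMapping.NoError()

    private FetchErrors() {}

    // Asks the response about every partition that went into the request and
    // keeps only the non-zero codes, keyed by partition.
    static Map<Integer, Short> failedPartitions(ErrorCodeLookup response, String topic,
                                                Iterable<Integer> requested) {
        Map<Integer, Short> failed = new TreeMap<>();
        for (int partition : requested) {
            short code = response.errorCode(topic, partition);
            if (code != NO_ERROR) {
                failed.put(partition, code);
            }
        }
        return failed;
    }
}
```

With a real response it would presumably be called as FetchErrors.failedPartitions((t, p) -> fetchResponse.errorCode(t, p), MY_TOPIC, partitions), reusing the same partitions collection that built the request, so no partition is forgotten.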
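For (4), here are my tentative guesses from the list above written down as a lookup table, so it is clear what I would like confirmed or corrected. FetchError, RecoveryAction and ErrorPolicy are hypothetical names, and the mapping encodes only my guesses, not documented behaviour. Translating the short returned by errorCode(...) into FetchError (by comparing it against the ErrorMapping.*Code() values) is left out, and so is EmptyByteBuffer(), since I suspect it is not an error code at all.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical names mirroring the ErrorMapping codes listed in question 4.
enum FetchError {
    BROKER_NOT_AVAILABLE, INVALID_FETCH_SIZE, INVALID_MESSAGE, LEADER_NOT_AVAILABLE,
    MESSAGE_SIZE_TOO_LARGE, NOT_LEADER_FOR_PARTITION, OFFSET_OUT_OF_RANGE,
    REPLICA_NOT_AVAILABLE, REQUEST_TIMED_OUT, STALE_CONTROLLER_EPOCH,
    UNKNOWN_TOPIC_OR_PARTITION
}

// What a fetcher thread would do next (hypothetical).
enum RecoveryAction { REFRESH_LEADER, RESET_OFFSET, RETRY, FIX_REQUEST, GIVE_UP, UNDECIDED }

// Encodes my guesses from question 4; every entry is a question, not an answer.
final class ErrorPolicy {
    private static final Map<FetchError, RecoveryAction> POLICY = new EnumMap<>(FetchError.class);

    static {
        POLICY.put(FetchError.BROKER_NOT_AVAILABLE, RecoveryAction.REFRESH_LEADER);
        POLICY.put(FetchError.LEADER_NOT_AVAILABLE, RecoveryAction.REFRESH_LEADER);
        POLICY.put(FetchError.NOT_LEADER_FOR_PARTITION, RecoveryAction.REFRESH_LEADER);
        POLICY.put(FetchError.OFFSET_OUT_OF_RANGE, RecoveryAction.RESET_OFFSET);
        POLICY.put(FetchError.REQUEST_TIMED_OUT, RecoveryAction.RETRY);
        POLICY.put(FetchError.INVALID_FETCH_SIZE, RecoveryAction.FIX_REQUEST);
        POLICY.put(FetchError.MESSAGE_SIZE_TOO_LARGE, RecoveryAction.FIX_REQUEST);
        POLICY.put(FetchError.UNKNOWN_TOPIC_OR_PARTITION, RecoveryAction.GIVE_UP);
        // INVALID_MESSAGE, REPLICA_NOT_AVAILABLE and STALE_CONTROLLER_EPOCH:
        // no guess yet, so they fall through to UNDECIDED.
    }

    private ErrorPolicy() {}

    static RecoveryAction actionFor(FetchError error) {
        return POLICY.getOrDefault(error, RecoveryAction.UNDECIDED);
    }
}
```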