I am planning to use the current Java API and have the following use case:

i) A single topic with about 1024 partitions.
ii) A number of processes that want to consume these partitions in a
deterministic way. The machine -> partitions assignment is done outside of
Kafka. During the lifetime of a process it might relinquish ownership of
some partitions and gain ownership of others. This coordination is also
done outside of Kafka and is guaranteed to work through other
coordination channels.

Here are my questions:

1) Given this use case it seems like the SimpleConsumer is the way to go. I
ran into a problem where it seems like a single SimpleConsumer can connect
to only a single broker. If my assumption is correct then I need to
maintain a SimpleConsumer per broker and ensure that it only receives
requests from partitions that it is the leader for. Further I need to
maintain this invariant amidst brokers falling down and the leader shifting
around. Given that the SimpleConsumer seems to be making blocking network
calls I was planning to have one SimpleConsumer (only responsible for a
single broker) per thread. So num threads = num brokers. I further have to
have these threads communicate with each other when leadership
responsibilities shift around. For example, say partition 1 is on Broker 1
(i.e. the SimpleConsumer on thread 1) and it moves to Broker 2 (the
SimpleConsumer on thread 2). I need to communicate to thread 2 that it is
now responsible for partition 1. Does that seem about right? Is there
anything else I could do to handle this scenario in an easier way? From a
previous mailing list question that I asked, it seems that the only way to
avoid head-of-line blocking in this case is to isolate each SimpleConsumer
to its own thread and let the OS take care of scheduling.
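
To make the hand-off concrete, here is a rough sketch of the bookkeeping I have in mind. It is plain Java with no Kafka calls; PartitionRouter and its method names are hypothetical names I made up for illustration, and the leader lookup that would drive assign() is not shown.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

/** Hypothetical bookkeeping shared by the per-broker fetch threads. */
final class PartitionRouter {
    // partition -> broker id currently believed to be its leader
    private final Map<Integer, Integer> leaderOf = new ConcurrentHashMap<>();
    // broker id -> partitions that broker's thread should fetch
    private final Map<Integer, Set<Integer>> owned = new ConcurrentHashMap<>();

    /** Record that `partition` is led by `brokerId`, removing it from any previous broker. */
    public synchronized void assign(int partition, int brokerId) {
        Integer previous = leaderOf.put(partition, brokerId);
        if (previous != null && previous != brokerId) {
            Set<Integer> old = owned.get(previous);
            if (old != null) {
                old.remove(partition);
            }
        }
        owned.computeIfAbsent(brokerId, b -> new ConcurrentSkipListSet<>()).add(partition);
    }

    /** Drop a partition that this process no longer owns. */
    public synchronized void release(int partition) {
        Integer previous = leaderOf.remove(partition);
        if (previous != null) {
            Set<Integer> old = owned.get(previous);
            if (old != null) {
                old.remove(partition);
            }
        }
    }

    /** Snapshot of the partitions the thread for `brokerId` should put in its next fetch. */
    public Set<Integer> partitionsFor(int brokerId) {
        Set<Integer> current = owned.get(brokerId);
        return current == null ? new TreeSet<>() : new TreeSet<>(current);
    }
}
```

Each broker thread would call partitionsFor(itsBrokerId) before building a request, so a leader move only requires some thread to call assign(partition, newBrokerId) once the new leader is known.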

2) Is it possible to multiplex multiple partition requests in a single
FetchRequest as long as I can guarantee that those partitions are owned by
the same broker (the one the SimpleConsumer is connecting to)? From the
SimpleConsumer example it seems like this should be possible:

FetchRequestBuilder builder = new FetchRequestBuilder()
        .clientId(clientName)
        .maxWait(MAX_WAIT);
for (Integer partition : partitions) {
    // Note: this fetchSize of 100000 might need to be increased if large
    // batches are written to Kafka.
    builder.addFetch(MY_TOPIC, partition, readOffsets.get(partition), 100000);
}
FetchResponse fetchResponse = simpleConsumer.fetch(builder.build());


3) If (2) is possible I am still confused about how to figure out errors in
the response. The API for errorcodes seems to require the partition and
topic. Something like:

short code = fetchResponse.errorCode(MY_TOPIC, somePartition);

In the example this code is examined to figure out whether a partition has
had its leader change. My problem is that I am multiplexing multiple
partition requests, so from this API I can't figure out which partitions had
an issue short of querying the fetchResponse for every one of the
partitions that the consumer is responsible for. Is this the only way? Say
I was doing this:

for (Integer partition : myPartitions) {
    short code = fetchResponse.errorCode(MY_TOPIC, partition);
    if (code == ErrorMapping.NoError()) continue;
    else handleErrorCode(code);
}
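
A slightly fuller sketch of that per-partition scan, written without Kafka types so it stands alone: the lookup argument stands in for fetchResponse.errorCode(MY_TOPIC, partition), NO_ERROR is assumed to equal ErrorMapping.NoError(), and FetchErrorScan and failedByCode are hypothetical names of my own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.IntFunction;

final class FetchErrorScan {
    // Assumption: matches the value returned by ErrorMapping.NoError().
    static final short NO_ERROR = 0;

    /**
     * Asks `lookup` for the error code of every partition and groups the
     * partitions that reported an error by that code.
     */
    static Map<Short, List<Integer>> failedByCode(Iterable<Integer> partitions,
                                                  IntFunction<Short> lookup) {
        Map<Short, List<Integer>> failed = new TreeMap<>();
        for (int partition : partitions) {
            short code = lookup.apply(partition);
            if (code != NO_ERROR) {
                failed.computeIfAbsent(code, c -> new ArrayList<>()).add(partition);
            }
        }
        return failed;
    }
}
```

Grouping by code would at least let me handle, say, all the partitions whose leader moved in one pass, but it still costs one lookup per partition per response.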


4) Are there any documentation/guidelines on how to handle the other error
codes? Here are the ones I could find from ErrorMapping:

ErrorMapping.BrokerNotAvailableCode();      // Should I try to query a new leader?
ErrorMapping.EmptyByteBuffer();             // Probably not applicable for the consumer?
ErrorMapping.InvalidFetchSizeCode();        // Something wrong with my request building logic?
ErrorMapping.InvalidMessageCode();          // No idea.
ErrorMapping.LeaderNotAvailableCode();      // Try to query new leader?
ErrorMapping.MessageSizeTooLargeCode();     // Again probably formed too big of a request?
ErrorMapping.NotLeaderForPartitionCode();   // Try to query a new leader?
ErrorMapping.OffsetOutOfRangeCode();        // Refresh last read offset as per the example.
ErrorMapping.ReplicaNotAvailableCode();     // Is this relevant to SimpleConsumer?
ErrorMapping.RequestTimedOutCode();         // Retry?
ErrorMapping.StaleControllerEpochCode();    // No idea.
ErrorMapping.UnknownTopicOrPartitionCode(); // Something terrible has happened :)

Given that my SimpleConsumer is connecting to a single broker with
partition requests that the broker is the leader for, what is the
difference between ErrorMapping.BrokerNotAvailableCode() and
ErrorMapping.LeaderNotAvailableCode()?


Thanks again.
