1), 2), 3) Yes. If you use SimpleConsumer, you have to figure out the
leader of each partition and connect to the right broker. Each fetch
request can send multiple partitions (if with same leader) and you need to
examine the error code per partition.

Not all those error codes are applicable to the consumers. For all leader
related errors, refresh metadata and retry. For OffsetOutOfRange, you need
to reset the offset to a valid one. The rest of the errors are not
recoverable.

You may want to take a look at
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Thanks,

Jun

On Wed, Oct 29, 2014 at 10:53 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> I am planning to use the current Java API and have the following use case:
>
> i) A single topic with about 1024 partitions.
> ii) A number of processes that want to consume these partitions in a
> deterministic way. The machine -> partitions assignment is done outside of
> kafka. During the lifetime of a process it might relinquish ownership of
> partitions and also gain ownership of partitions. This coordination again
> is done outside of kafka and is guaranteed to work through other
> coordination channels.
>
> Here are my questions:
>
> 1) Given this use case it seems like the SimpleConsumer is the way to go. I
> ran into a problem where it seems like a single SimpleConsumer can connect
> to only a single broker. If my assumption is correct then I need to
> maintain a SimpleConsumer per broker and ensure that it only receives
> requests from partitions that it is the leader for. Further I need to
> maintain this invariant amidst brokers falling down and the leader shifting
> around. Given that the SimpleConsumer seems to be making blocking network
> calls I was planning to have one SimpleConsumer (only responsible for a
> single broker) per thread. So num threads = num brokers. I further have to
> have these threads communicate with each other when leadership
> responsibilities shift around. For eg: Say partition 1 is on Broker 1 (i.e.
> SimpleConsumer on Thread 1) and it moves to Broker 2 (SimpleConsumer on
> thread 2). I need to communicate to thread 2 that it is responsible for
> partition 1 now. Does that seem about right? Is there anything else I could
> do to handle this scenario in an easier way? From a previous mailing list
> question that I asked it seems that the only way to avoid head of line
> blocking in this case is to isolate each SimpleConsumer to it's own thread
> and let the OS take care of scheduling.
>
> 2) Is it possible to multiplex multiple partition requests in a single
> FetchRequest as long as I can guarantee that those partitions are owned by
> the same broker (the one the SimpleConsumer is connecting to)? From the
> SimpleConsumer example it seems like this should be possible:
>
> FetchRequestBuilder builder = new FetchRequestBuilder().clientId(clientName
> ).maxWait(
>
>                     MAX_WAIT);
>
>             for (Integer partition : partitions) {
>
>                 // Note: this fetchSize of 100000 might need to be
> increased if large batches are
>
>                 // written to Kafka.
>
>                 builder.addFetchMY_TOPIC, partition,
>
>                         readOffsets.get(partition), 100000
>
>             }
>
> FetchResponse fetchResponse = simpleConsumer.fetch(builder.build());
>
>
> 3) If (2) is possible I am still confused about how to figure out errors in
> the response. The API for errorcodes seems to require the partition and
> topic. Something like:
>
> short code = fetchResponse.errorCode(MY_TOPIC, somePartition);
>
> In the example this code is examined to figure out whether a partition has
> had it's leader change. My problem is that I am multiplexing multiple
> partition requests so from this API I can't figure out which partitions had
> an issue short of trying to query the fetchResponse for every one of the
> partitions that the consumer is responsible for. Is this the only way?  Say
> I was doing this
>
> for (Integer partition : myPartitions) {
>
>   short code = fetchResponse.errorCode(MY_TOPIC, partition);
>
>   if (code == ErrorMapping.NoError()) continue;
>
>  else handleErrorCode(code);
>
> }
>
>
> 4) Is there any documentation/guidelines on how to handle the other error
> codes ?Here are the ones I could find from ErrorMapping:
>
>                 ErrorMapping.BrokerNotAvailableCode(); // Should I try to
> query a new leader?
>
>                 ErrorMapping.EmptyByteBuffer(); // Probably not applicable
> for the consumer?
>
>                 ErrorMapping.InvalidFetchSizeCode(); // Something wrong
> with my request building logic?
>
>                 ErrorMapping.InvalidMessageCode(); // No idea.
>
>                 ErrorMapping.LeaderNotAvailableCode(); // Try to query new
> leader?
>
>                 ErrorMapping.MessageSizeTooLargeCode(); // Again probably
> formed too big of a request?
>
>                 ErrorMapping.NotLeaderForPartitionCode(); // Try to query a
> new leader?
>
>                 ErrorMapping.OffsetOutOfRangeCode(); // Refresh last read
> offset as per the example.
>
>                 ErrorMapping.ReplicaNotAvailableCode(); // Is this relevant
> to SimpleConsumer?
>
>                 ErrorMapping.RequestTimedOutCode(); // Retry?
>
>                ErrorMapping.StaleControllerEpochCode(); // No Idea..
>
>
>                ErrorMapping.UnknownTopicOrPartitionCode(); // Something
> terrible has happened :)
>
> Given that my SimpleConsumer is connecting to a single broker with
> partition requests that the broker is the leader for, what is the
> difference between ErrorMapping.BrokerNotAvailableCode() and
> ErrorMapping.LeaderNotAvailableCode()?
>
>
> Thanks again.
>

Reply via email to