Thanks Jun, looks like I am on the right track.

On Wed, Oct 29, 2014 at 6:51 PM, Jun Rao <jun...@gmail.com> wrote:
> 1), 2), 3) Yes. If you use SimpleConsumer, you have to figure out the
> leader of each partition and connect to the right broker. Each fetch
> request can include multiple partitions (if they have the same leader)
> and you need to examine the error code per partition.
>
> Not all those error codes are applicable to the consumers. For all leader
> related errors, refresh metadata and retry. For OffsetOutOfRange, you need
> to reset the offset to a valid one. The rest of the errors are not
> recoverable.
>
> You may want to take a look at
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>
> Thanks,
>
> Jun
>
> On Wed, Oct 29, 2014 at 10:53 AM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > I am planning to use the current Java API and have the following use
> > case:
> >
> > i) A single topic with about 1024 partitions.
> > ii) A number of processes that want to consume these partitions in a
> > deterministic way. The machine -> partitions assignment is done outside
> > of kafka. During the lifetime of a process it might relinquish ownership
> > of partitions and also gain ownership of partitions. This coordination
> > again is done outside of kafka and is guaranteed to work through other
> > coordination channels.
> >
> > Here are my questions:
> >
> > 1) Given this use case it seems like the SimpleConsumer is the way to
> > go. I ran into a problem where it seems like a single SimpleConsumer can
> > connect to only a single broker. If my assumption is correct then I need
> > to maintain a SimpleConsumer per broker and ensure that it only sends
> > requests for partitions that its broker is the leader for. Further I
> > need to maintain this invariant amidst brokers going down and the leader
> > shifting around. Given that the SimpleConsumer seems to be making
> > blocking network calls I was planning to have one SimpleConsumer (only
> > responsible for a single broker) per thread. So num threads = num
> > brokers. I further have to have these threads communicate with each
> > other when leadership responsibilities shift around. For example: say
> > partition 1 is on Broker 1 (i.e. SimpleConsumer on thread 1) and it
> > moves to Broker 2 (SimpleConsumer on thread 2). I need to communicate to
> > thread 2 that it is responsible for partition 1 now. Does that seem
> > about right? Is there anything else I could do to handle this scenario
> > in an easier way? From a previous mailing list question that I asked it
> > seems that the only way to avoid head of line blocking in this case is
> > to isolate each SimpleConsumer to its own thread and let the OS take
> > care of scheduling.
> >
> > 2) Is it possible to multiplex multiple partition requests in a single
> > FetchRequest as long as I can guarantee that those partitions are owned
> > by the same broker (the one the SimpleConsumer is connecting to)? From
> > the SimpleConsumer example it seems like this should be possible:
> >
> > FetchRequestBuilder builder = new FetchRequestBuilder()
> >         .clientId(clientName)
> >         .maxWait(MAX_WAIT);
> > for (Integer partition : partitions) {
> >     // Note: this fetchSize of 100000 might need to be increased if
> >     // large batches are written to Kafka.
> >     builder.addFetch(MY_TOPIC, partition,
> >             readOffsets.get(partition), 100000);
> > }
> > FetchResponse fetchResponse = simpleConsumer.fetch(builder.build());
> >
> > 3) If (2) is possible I am still confused about how to figure out errors
> > in the response. The API for error codes seems to require the partition
> > and topic. Something like:
> >
> > short code = fetchResponse.errorCode(MY_TOPIC, somePartition);
> >
> > In the example this code is examined to figure out whether a partition
> > has had its leader change. My problem is that I am multiplexing multiple
> > partition requests so from this API I can't figure out which partitions
> > had an issue short of trying to query the fetchResponse for every one of
> > the partitions that the consumer is responsible for. Is this the only
> > way? Say I was doing this:
> >
> > for (Integer partition : myPartitions) {
> >     short code = fetchResponse.errorCode(MY_TOPIC, partition);
> >     if (code == ErrorMapping.NoError()) continue;
> >     else handleErrorCode(code);
> > }
> >
> > 4) Is there any documentation/guidelines on how to handle the other
> > error codes? Here are the ones I could find from ErrorMapping:
> >
> > ErrorMapping.BrokerNotAvailableCode();      // Should I try to query a new leader?
> > ErrorMapping.EmptyByteBuffer();             // Probably not applicable for the consumer?
> > ErrorMapping.InvalidFetchSizeCode();        // Something wrong with my request building logic?
> > ErrorMapping.InvalidMessageCode();          // No idea.
> > ErrorMapping.LeaderNotAvailableCode();      // Try to query new leader?
> > ErrorMapping.MessageSizeTooLargeCode();     // Again probably formed too big of a request?
> > ErrorMapping.NotLeaderForPartitionCode();   // Try to query a new leader?
> > ErrorMapping.OffsetOutOfRangeCode();        // Refresh last read offset as per the example.
> > ErrorMapping.ReplicaNotAvailableCode();     // Is this relevant to SimpleConsumer?
> > ErrorMapping.RequestTimedOutCode();         // Retry?
> > ErrorMapping.StaleControllerEpochCode();    // No idea.
> > ErrorMapping.UnknownTopicOrPartitionCode(); // Something terrible has happened :)
> >
> > Given that my SimpleConsumer is connecting to a single broker with
> > partition requests that the broker is the leader for, what is the
> > difference between ErrorMapping.BrokerNotAvailableCode() and
> > ErrorMapping.LeaderNotAvailableCode()?
> >
> > Thanks again.
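
For anyone following along, the per-partition handling Jun describes could look roughly like the sketch below. It is self-contained and deliberately does not call the Kafka client: the Code and Action enums and the classify/plan helpers are hypothetical stand-ins for the short codes returned by fetchResponse.errorCode(topic, partition). Treating only LeaderNotAvailable and NotLeaderForPartition as "leader related" is an assumption on my part, not something confirmed in the thread.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: stand-in types, not the Kafka client API. */
class FetchErrorPolicy {

    /** Hypothetical stand-in for a few of the codes exposed by ErrorMapping. */
    enum Code {
        NO_ERROR,
        LEADER_NOT_AVAILABLE,
        NOT_LEADER_FOR_PARTITION,
        OFFSET_OUT_OF_RANGE,
        UNKNOWN_TOPIC_OR_PARTITION,
        INVALID_MESSAGE
    }

    /** What the consumer thread should do next for one partition. */
    enum Action { CONTINUE, REFRESH_METADATA_AND_RETRY, RESET_OFFSET, FAIL }

    /** Maps one partition's error code to an action, following Jun's three buckets. */
    static Action classify(Code code) {
        switch (code) {
            case NO_ERROR:
                return Action.CONTINUE;
            // Assumption: these two are the "leader related" errors.
            case LEADER_NOT_AVAILABLE:
            case NOT_LEADER_FOR_PARTITION:
                return Action.REFRESH_METADATA_AND_RETRY;
            case OFFSET_OUT_OF_RANGE:
                return Action.RESET_OFFSET;
            // Everything else is treated as not recoverable.
            default:
                return Action.FAIL;
        }
    }

    /** Groups the partitions of one multiplexed fetch response by the action they need. */
    static Map<Action, List<Integer>> plan(Map<Integer, Code> codeByPartition) {
        Map<Action, List<Integer>> plan = new EnumMap<>(Action.class);
        for (Map.Entry<Integer, Code> e : codeByPartition.entrySet()) {
            plan.computeIfAbsent(classify(e.getValue()), a -> new ArrayList<>())
                .add(e.getKey());
        }
        return plan;
    }

    public static void main(String[] args) {
        // Pretend these came from checking every partition in one response.
        Map<Integer, Code> codes = new LinkedHashMap<>();
        codes.put(0, Code.NO_ERROR);
        codes.put(1, Code.NOT_LEADER_FOR_PARTITION);
        codes.put(2, Code.OFFSET_OUT_OF_RANGE);
        codes.put(3, Code.INVALID_MESSAGE);
        System.out.println(plan(codes));
    }
}
```

In a real consumer the map passed to plan would be filled by looping over the partitions that went into the FetchRequest, which is the "query every partition" loop from question 3. Partitions that land in REFRESH_METADATA_AND_RETRY are the ones that would be handed to whichever thread owns the new leader.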