Thanks Jun, looks like I am on the right track.

On Wed, Oct 29, 2014 at 6:51 PM, Jun Rao <jun...@gmail.com> wrote:

> 1), 2), 3) Yes. If you use SimpleConsumer, you have to figure out the
> leader of each partition and connect to the right broker. Each fetch
> request can send multiple partitions (if with same leader) and you need to
> examine the error code per partition.
>
> Not all those error codes are applicable to the consumers. For all leader
> related errors, refresh metadata and retry. For OffsetOutOfRange, you need
> to reset the offset to a valid one. The rest of the errors are not
> recoverable.
>
> You may want to take a look at
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>
> Thanks,
>
> Jun
>
> On Wed, Oct 29, 2014 at 10:53 AM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > I am planning to use the current Java API and have the following use
> case:
> >
> > i) A single topic with about 1024 partitions.
> > ii) A number of processes that want to consume these partitions in a
> > deterministic way. The machine -> partitions assignment is done outside
> of
> > kafka. During the lifetime of a process it might relinquish ownership of
> > partitions and also gain ownership of partitions. This coordination again
> > is done outside of kafka and is guaranteed to work through other
> > coordination channels.
> >
> > Here are my questions:
> >
> > 1) Given this use case it seems like the SimpleConsumer is the way to
> go. I
> > ran into a problem where it seems like a single SimpleConsumer can
> connect
> > to only a single broker. If my assumption is correct then I need to
> > maintain a SimpleConsumer per broker and ensure that it only receives
> > requests from partitions that it is the leader for. Further I need to
> > maintain this invariant amidst brokers falling down and the leader
> shifting
> > around. Given that the SimpleConsumer seems to be making blocking network
> > calls I was planning to have one SimpleConsumer (only responsible for a
> > single broker) per thread. So num threads = num brokers. I further have
> to
> > have these threads communicate with each other when leadership
> > responsibilities shift around. For eg: Say partition 1 is on Broker 1
> (i.e.
> > SimpleConsumer on Thread 1) and it moves to Broker 2 (SimpleConsumer on
> > thread 2). I need to communicate to thread 2 that it is responsible for
> > partition 1 now. Does that seem about right? Is there anything else I
> could
> > do to handle this scenario in an easier way? From a previous mailing list
> > question that I asked it seems that the only way to avoid head of line
> > blocking in this case is to isolate each SimpleConsumer to it's own
> thread
> > and let the OS take care of scheduling.
> >
> > 2) Is it possible to multiplex multiple partition requests in a single
> > FetchRequest as long as I can guarantee that those partitions are owned
> by
> > the same broker (the one the SimpleConsumer is connecting to)? From the
> > SimpleConsumer example it seems like this should be possible:
> >
> > FetchRequestBuilder builder = new
> FetchRequestBuilder().clientId(clientName
> > ).maxWait(
> >
> >                     MAX_WAIT);
> >
> >             for (Integer partition : partitions) {
> >
> >                 // Note: this fetchSize of 100000 might need to be
> > increased if large batches are
> >
> >                 // written to Kafka.
> >
> >                 builder.addFetchMY_TOPIC, partition,
> >
> >                         readOffsets.get(partition), 100000
> >
> >             }
> >
> > FetchResponse fetchResponse = simpleConsumer.fetch(builder.build());
> >
> >
> > 3) If (2) is possible I am still confused about how to figure out errors
> in
> > the response. The API for errorcodes seems to require the partition and
> > topic. Something like:
> >
> > short code = fetchResponse.errorCode(MY_TOPIC, somePartition);
> >
> > In the example this code is examined to figure out whether a partition
> has
> > had it's leader change. My problem is that I am multiplexing multiple
> > partition requests so from this API I can't figure out which partitions
> had
> > an issue short of trying to query the fetchResponse for every one of the
> > partitions that the consumer is responsible for. Is this the only way?
> Say
> > I was doing this
> >
> > for (Integer partition : myPartitions) {
> >
> >   short code = fetchResponse.errorCode(MY_TOPIC, partition);
> >
> >   if (code == ErrorMapping.NoError()) continue;
> >
> >  else handleErrorCode(code);
> >
> > }
> >
> >
> > 4) Is there any documentation/guidelines on how to handle the other error
> > codes ?Here are the ones I could find from ErrorMapping:
> >
> >                 ErrorMapping.BrokerNotAvailableCode(); // Should I try to
> > query a new leader?
> >
> >                 ErrorMapping.EmptyByteBuffer(); // Probably not
> applicable
> > for the consumer?
> >
> >                 ErrorMapping.InvalidFetchSizeCode(); // Something wrong
> > with my request building logic?
> >
> >                 ErrorMapping.InvalidMessageCode(); // No idea.
> >
> >                 ErrorMapping.LeaderNotAvailableCode(); // Try to query
> new
> > leader?
> >
> >                 ErrorMapping.MessageSizeTooLargeCode(); // Again probably
> > formed too big of a request?
> >
> >                 ErrorMapping.NotLeaderForPartitionCode(); // Try to
> query a
> > new leader?
> >
> >                 ErrorMapping.OffsetOutOfRangeCode(); // Refresh last read
> > offset as per the example.
> >
> >                 ErrorMapping.ReplicaNotAvailableCode(); // Is this
> relevant
> > to SimpleConsumer?
> >
> >                 ErrorMapping.RequestTimedOutCode(); // Retry?
> >
> >                ErrorMapping.StaleControllerEpochCode(); // No Idea..
> >
> >
> >                ErrorMapping.UnknownTopicOrPartitionCode(); // Something
> > terrible has happened :)
> >
> > Given that my SimpleConsumer is connecting to a single broker with
> > partition requests that the broker is the leader for, what is the
> > difference between ErrorMapping.BrokerNotAvailableCode() and
> > ErrorMapping.LeaderNotAvailableCode()?
> >
> >
> > Thanks again.
> >
>

Reply via email to