Hi Jun,

If I understand it correctly, the highlighted line is meant to skip
offline partitions, isn't it?

for (int i = 0; i < numPartitions; i++) {
    int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
    if (partitions.get(partition).leader() != null) {
        return partition; // <-- should this be changed to return the actual partition number?
    }
}
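To make the question concrete, here is a minimal, self-contained sketch of why returning the loop index can select an offline partition. It assumes the partition list from the metadata is not sorted by partition id. `PartitionStub`, `chooseIndex`, `chooseId`, `exampleMetadata` and the sample data (partitions 1 and 3 without a leader) are hypothetical stand-ins, not Kafka classes, and Kafka's `Utils.abs` is replaced by a local `toNonNegative` helper.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class PartitionIndexVsId {
    // Hypothetical stand-in for PartitionInfo: partition id plus "has a leader".
    static final class PartitionStub {
        final int id;
        final boolean hasLeader;

        PartitionStub(int id, boolean hasLeader) {
            this.id = id;
            this.hasLeader = hasLeader;
        }
    }

    // Local replacement for Utils.abs: maps any int to a non-negative int.
    static int toNonNegative(int n) {
        return n & 0x7fffffff;
    }

    // Mirrors the loop above: returns the POSITION in the list (suspected bug).
    static int chooseIndex(List<PartitionStub> partitions, AtomicInteger counter) {
        int n = partitions.size();
        for (int i = 0; i < n; i++) {
            int idx = toNonNegative(counter.getAndIncrement()) % n;
            if (partitions.get(idx).hasLeader) {
                return idx;
            }
        }
        return toNonNegative(counter.getAndIncrement()) % n;
    }

    // Same loop, but returns the id stored in the chosen entry (proposed fix).
    static int chooseId(List<PartitionStub> partitions, AtomicInteger counter) {
        int n = partitions.size();
        for (int i = 0; i < n; i++) {
            int idx = toNonNegative(counter.getAndIncrement()) % n;
            if (partitions.get(idx).hasLeader) {
                return partitions.get(idx).id;
            }
        }
        // No partition has a leader: fall back to an arbitrary one.
        return partitions.get(toNonNegative(counter.getAndIncrement()) % n).id;
    }

    // Hypothetical metadata: 4 partitions, 1 and 3 have no leader,
    // and the list is not ordered by partition id.
    static List<PartitionStub> exampleMetadata() {
        return Arrays.asList(
                new PartitionStub(1, false),
                new PartitionStub(0, true),
                new PartitionStub(3, false),
                new PartitionStub(2, true));
    }

    public static void main(String[] args) {
        List<PartitionStub> meta = exampleMetadata();
        System.out.println("index-based: partition " + chooseIndex(meta, new AtomicInteger()));
        System.out.println("id-based:    partition " + chooseId(meta, new AtomicInteger()));
    }
}
```

With this sample metadata the index-based version stops at list position 1 (which holds the online partition 0) but returns 1, a partition with no leader, while the id-based version returns 0.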


On Tue, Feb 24, 2015 at 11:30 AM, Jun Rao <j...@confluent.io> wrote:

> Hi, Xiaoyu,
>
> 1. Could you explain a bit more what the bug is? The code does try to avoid
> picking an unavailable partition. There does seem to be an issue when more
> than one thread is producing data to the same producer instance. This is
> being tracked in KAFKA-1984. How many producing threads do you have in
> your test?
>
> Thanks,
>
> Jun
>
> On Tue, Feb 24, 2015 at 7:56 AM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
>
> > Jun,
> >
> > I am trying to test how KafkaProducer behaves with topic replication
> > factor = 1:
> >
> >    1. One broker is offline BEFORE KafkaProducer starts sending messages.
> >    Because of the bug I mentioned, KafkaProducer sends to the offline
> >    partition and hangs forever.
> >    2. One broker goes offline WHILE KafkaProducer is sending messages.
> >    KafkaProducer seems to hang forever in this case as well. I am still
> >    looking into it.
> >
> > Would you mind taking a look?
> >
> > Thanks
> >
> >
> >
> >
> > On Mon, Feb 23, 2015 at 7:01 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > The logic in that code is to cycle through all partitions and return as
> > > soon as we see a partition with a leader. I do see an issue: if there
> > > are multiple threads sending messages to the same producer concurrently,
> > > we may not cycle through all partitions and could therefore return an
> > > unavailable partition even when available partitions are present.
> > >
> > > Do you see this issue with just a single thread producing messages? The
> > > current logic seems to work correctly in that case.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Feb 20, 2015 at 12:45 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
> > >
> > > > Found the problem: it is a bug in the Partitioner of the Kafka client.
> > > > Can you guys confirm and patch it in kafka-clients?
> > > >
> > > > for (int i = 0; i < numPartitions; i++) {
> > > >     int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
> > > >     if (partitions.get(partition).leader() != null) {
> > > >         return partitions.get(partition).partition();
> > > >     }
> > > > }
> > > >
> > > >
> > > >
> > > > On Fri, Feb 20, 2015 at 2:35 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
> > > >
> > > > > Update:
> > > > >
> > > > > I am using kafka.clients 0.8.2-beta. Below are the test steps:
> > > > >
> > > > >    1. Set up a local Kafka cluster with 2 brokers, 0 and 1.
> > > > >    2. Create topic X with replication factor 1 and 4 partitions.
> > > > >    3. Verify that each broker has two partitions.
> > > > >    4. Shut down broker 1.
> > > > >    5. Start a producer sending data to topic X using KafkaProducer
> > > > >    with required ack = 1.
> > > > >    6. The producer hangs and does not exit.
> > > > >
> > > > > Offline partitions are taken care of when a partition's leader is
> > > > > null (code attached below). However, the timeout setting does not
> > > > > seem to work. I am not sure what causes KafkaProducer to hang.
> > > > >
> > > > > // choose the next available node in a round-robin fashion
> > > > > for (int i = 0; i < numPartitions; i++) {
> > > > >     int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
> > > > >     if (partitions.get(partition).leader() != null)
> > > > >         return partition;
> > > > > }
> > > > > // no partitions are available, give a non-available partition
> > > > > return Utils.abs(counter.getAndIncrement()) % numPartitions;
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Feb 20, 2015 at 1:48 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
> > > > >
> > > > >> Hello,
> > > > >>
> > > > >> I am experimenting with sending data to Kafka using KafkaProducer and
> > > > >> found that when a partition is completely offline, e.g. a topic with
> > > > >> replication factor = 1 and some broker is down, KafkaProducer seems to
> > > > >> hang forever. It does not even exit after the configured timeout. Can
> > > > >> you take a look?
> > > > >>
> > > > >> I checked the code and found that the partitioner chooses a partition
> > > > >> based on the total number of partitions, including offline ones. Is it
> > > > >> possible to change ProducerClient to ignore offline partitions?
> > > > >>
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> -Xiaoyu
> > > > >>
> > > > >>
> > > > >
> > > >
> > >
> >
>
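Jun's concern about concurrent senders (KAFKA-1984) can be reproduced deterministically in a small sketch. It assumes a second sender thread increments the shared counter between each of our reads (simulated by `contendedCounter`, which advances by 2 per read) and that only one of four partitions has a leader. `PartitionStub`, `drawPerIteration`, `drawOnce`, `contendedCounter` and `NONE_FOUND` are hypothetical names; `drawOnce` is just one possible remedy and not necessarily the change made for KAFKA-1984.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

final class SharedCounterScan {
    // Hypothetical stand-in for PartitionInfo.
    static final class PartitionStub {
        final int id;
        final boolean hasLeader;

        PartitionStub(int id, boolean hasLeader) {
            this.id = id;
            this.hasLeader = hasLeader;
        }
    }

    // Hypothetical sentinel, used here instead of the real code's fallback.
    static final int NONE_FOUND = -1;

    // Pattern from the thread: a fresh shared-counter value on every iteration.
    static int drawPerIteration(List<PartitionStub> parts, IntSupplier next) {
        int n = parts.size();
        for (int i = 0; i < n; i++) {
            int idx = (next.getAsInt() & 0x7fffffff) % n;
            if (parts.get(idx).hasLeader) {
                return parts.get(idx).id;
            }
        }
        return NONE_FOUND;
    }

    // Alternative: read the counter once, then probe start, start+1, ...
    // locally, so every entry is checked regardless of other threads.
    static int drawOnce(List<PartitionStub> parts, IntSupplier next) {
        int n = parts.size();
        int start = (next.getAsInt() & 0x7fffffff) % n;
        for (int i = 0; i < n; i++) {
            int idx = (start + i) % n;
            if (parts.get(idx).hasLeader) {
                return parts.get(idx).id;
            }
        }
        return NONE_FOUND;
    }

    // Only partition 1 has a leader.
    static List<PartitionStub> exampleMetadata() {
        return Arrays.asList(
                new PartitionStub(0, false),
                new PartitionStub(1, true),
                new PartitionStub(2, false),
                new PartitionStub(3, false));
    }

    // Simulates another sender taking every other counter value.
    static IntSupplier contendedCounter() {
        AtomicInteger shared = new AtomicInteger();
        return () -> shared.getAndAdd(2);
    }

    public static void main(String[] args) {
        List<PartitionStub> meta = exampleMetadata();
        System.out.println("per-iteration draw: " + drawPerIteration(meta, contendedCounter()));
        System.out.println("single draw:        " + drawOnce(meta, contendedCounter()));
    }
}
```

Under this interleaving the per-iteration version only ever lands on positions 0 and 2 and finds nothing, which in the real code means falling through to the "give a non-available partition" branch; the single-draw version finds partition 1 from any starting value. Reading the counter once per call keeps the round-robin spread across calls while making each call's scan independent of other threads.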
