Xiaoyu,

For 1, I have a patch for 0.8.2 in
https://issues.apache.org/jira/browse/KAFKA-1984. Could you test it out and
see if it fixes your issue?
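For reference, here is issue 1 as the thread below narrows it down. The round-robin loop returns the index into the `partitions` list, but the caller needs the partition id stored at that index. The two agree only when the list happens to be ordered by partition id. The following is a minimal, self-contained Java sketch of both variants. It illustrates the idea under stated assumptions and is not the KAFKA-1984 patch. `PartitionStub`, `RoundRobinSketch`, and `toNonNegative` are hypothetical names standing in for the client's `PartitionInfo`, partitioner, and `Utils.abs`, and the list order in the example is made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the client's PartitionInfo: a partition id plus the id of
// its leader broker, or null when the partition is offline.
final class PartitionStub {
    final int partition;
    final Integer leader;

    PartitionStub(int partition, Integer leader) {
        this.partition = partition;
        this.leader = leader;
    }
}

final class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Masks off the sign bit so the result is never negative (assumed stand-in for Utils.abs).
    static int toNonNegative(int n) {
        return n & 0x7fffffff;
    }

    // Variant matching the quoted loop: returns the LIST INDEX of the first entry with a leader.
    int chooseReturningIndex(List<PartitionStub> partitions) {
        int numPartitions = partitions.size();
        for (int i = 0; i < numPartitions; i++) {
            int idx = toNonNegative(counter.getAndIncrement()) % numPartitions;
            if (partitions.get(idx).leader != null) {
                return idx; // bug: an index into the list, not a partition id
            }
        }
        // No partition has a leader: give a non-available one.
        return toNonNegative(counter.getAndIncrement()) % numPartitions;
    }

    // Variant with the change proposed in the thread: return the partition id at that index.
    int chooseReturningPartitionId(List<PartitionStub> partitions) {
        int numPartitions = partitions.size();
        for (int i = 0; i < numPartitions; i++) {
            int idx = toNonNegative(counter.getAndIncrement()) % numPartitions;
            PartitionStub p = partitions.get(idx);
            if (p.leader != null) {
                return p.partition;
            }
        }
        // No partition has a leader: give a non-available partition id.
        return partitions.get(toNonNegative(counter.getAndIncrement()) % numPartitions).partition;
    }

    public static void main(String[] args) {
        // Made-up layout: broker 1 (leader of partitions 1 and 3) is down; list not sorted by id.
        List<PartitionStub> parts = Arrays.asList(
                new PartitionStub(1, null), new PartitionStub(0, 0),
                new PartitionStub(3, null), new PartitionStub(2, 0));
        RoundRobinSketch buggy = new RoundRobinSketch();
        RoundRobinSketch fixed = new RoundRobinSketch();
        for (int call = 0; call < 4; call++) {
            System.out.println(buggy.chooseReturningIndex(parts) + " vs "
                    + fixed.chooseReturningPartitionId(parts));
        }
    }
}
```

In this made-up layout, `chooseReturningIndex` alternates between 1 and 3, which are exactly the offline partitions. `chooseReturningPartitionId` alternates between 0 and 2.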

For 2, I did some local testing. The only issue I saw is that the producer can
block on close because there are still unsent messages in the buffer pool.
This is a known issue and is being tracked in
https://issues.apache.org/jira/browse/KAFKA-1788. Could you confirm whether
your producer blocks during send or during close? You can tell by taking a
thread dump.
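Besides running `jstack <pid>` against the hung test JVM, you can answer the send-versus-close question from inside the test process. Here is a minimal Java sketch that uses only `Thread.getAllStackTraces()`. `ThreadDumpCheck` and `threadsIn` are hypothetical helper names, and the sketch assumes the producer class is `org.apache.kafka.clients.producer.KafkaProducer`, as in the 0.8.2 clients jar.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ThreadDumpCheck {
    // Assumed class name of the new producer in the 0.8.2 clients jar.
    static final String PRODUCER = "org.apache.kafka.clients.producer.KafkaProducer";

    // Names of live threads whose current stack has a frame in className.methodName.
    static List<String> threadsIn(String className, String methodName) {
        List<String> hits = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            for (StackTraceElement frame : entry.getValue()) {
                if (frame.getClassName().equals(className)
                        && frame.getMethodName().equals(methodName)) {
                    hits.add(entry.getKey().getName());
                    break;
                }
            }
        }
        return hits;
    }

    // Prints every thread's stack, a rough in-process analogue of running jstack on the JVM.
    static void printAllStacks() {
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            System.out.println("\"" + entry.getKey().getName() + "\" " + entry.getKey().getState());
            for (StackTraceElement frame : entry.getValue()) {
                System.out.println("    at " + frame);
            }
        }
    }

    public static void main(String[] args) {
        // Intended to be called from a watchdog thread once the test appears to hang.
        System.out.println("in send:  " + threadsIn(PRODUCER, "send"));
        System.out.println("in close: " + threadsIn(PRODUCER, "close"));
    }
}
```

A non-empty result for `close` combined with an empty one for `send` would point at the KAFKA-1788 behavior rather than at the partitioner.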

Thanks,

Jun



On Tue, Feb 24, 2015 at 10:14 AM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:

> Jun,
>
> Can you also take a look at the second problem I am having?
>
> > > > I am trying to test how KafkaProducer behaves with topic replication
> > > > factor = 1:
> > > >
> > > >    1. One broker is offline BEFORE KafkaProducer starts sending
> > > >    messages. Because of the bug I mentioned, KafkaProducer sends to
> > > >    the offline partition and hangs forever.
>
> > > >    2. One broker goes offline WHILE KafkaProducer is sending
> > > >    messages. KafkaProducer seems to be hanging forever in this case.
> > > >    I am still looking.
>
> On Tue, Feb 24, 2015 at 12:03 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Ah, yes. You are right. That's a more obvious bug. Will fix that in
> > KAFKA-1984.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Feb 24, 2015 at 8:37 AM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
> >
> > > Hi Jun,
> > >
> > > If I understand it correctly, the highlighted line is meant to avoid
> > > offline partitions, right?
> > >
> > > for (int i = 0; i < numPartitions; i++) {
> > >     int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
> > >     if (partitions.get(partition).leader() != null) {
> > >         return partition; // <-- should this return the actual partition number?
> > >     }
> > > }
> > >
> > >
> > > On Tue, Feb 24, 2015 at 11:30 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Xiaoyu,
> > > >
> > > > 1. Could you explain a bit more what the bug is? The code does try to
> > > > avoid picking an unavailable partition. There does seem to be an issue
> > > > when there is more than one thread producing data to the same producer
> > > > instance. This is being tracked in KAFKA-1984. How many producing
> > > > threads do you have in your test?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Feb 24, 2015 at 7:56 AM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
> > > >
> > > > > Jun,
> > > > >
> > > > > I am trying to test how KafkaProducer behaves with topic replication
> > > > > factor = 1:
> > > > >
> > > > >    1. One broker is offline BEFORE KafkaProducer starts sending
> > > > >    messages. Because of the bug I mentioned, KafkaProducer sends to
> > > > >    the offline partition and hangs forever.
> > > > >    2. One broker goes offline WHILE KafkaProducer is sending
> > > > >    messages. KafkaProducer seems to be hanging forever in this case.
> > > > >    I am still looking.
> > > > >
> > > > > Would you mind taking a look?
> > > > >
> > > > > Thanks
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Feb 23, 2015 at 7:01 PM, Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > > The logic in that code is to cycle through all partitions and
> > > > > > return as soon as we see a partition with a leader. I do see an
> > > > > > issue: if multiple threads send messages to the same producer
> > > > > > concurrently, we may not cycle through all partitions, and therefore
> > > > > > we could return an unavailable partition even when available
> > > > > > partitions are present.
> > > > > >
> > > > > > Do you see this issue with just a single thread producing messages?
> > > > > > The current logic seems to work correctly in that case.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
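The multi-threaded case described above can be reproduced deterministically without real threads. The following is a single-threaded simulation, assuming the loop shape quoted in this thread. `SharedCounterSimulation`, `choose`, and the `otherSender` hook are hypothetical names. The hook simply advances the shared counter between this caller's reads, as a concurrent sender using the same producer would.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedCounterSimulation {
    // Round-robin loop modeled on the one quoted in this thread. hasLeader.get(i) says whether
    // the partition at list index i has a leader. otherSender is a hypothetical hook run just
    // before each counter read; it stands in for another thread sending through the same producer.
    static int choose(List<Boolean> hasLeader, AtomicInteger counter, Runnable otherSender) {
        int numPartitions = hasLeader.size();
        for (int i = 0; i < numPartitions; i++) {
            otherSender.run();
            int idx = (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
            if (hasLeader.get(idx)) {
                return idx;
            }
        }
        // Nothing available was seen in numPartitions reads: fall back to any index.
        otherSender.run();
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        List<Boolean> hasLeader = Arrays.asList(true, false); // index 0 up, index 1 offline

        AtomicInteger alone = new AtomicInteger();
        System.out.println("single sender picks " + choose(hasLeader, alone, () -> { }));

        AtomicInteger shared = new AtomicInteger();
        // The other sender takes one counter value before each of ours, so we only see odd values.
        System.out.println("with interleaving picks "
                + choose(hasLeader, shared, shared::getAndIncrement));
    }
}
```

With index 0 available and index 1 offline, a lone caller reads counter value 0 and returns index 0. Now suppose another sender takes one value before each of this caller's reads. The caller then sees only odd values, so every read maps to index 1, and it returns the offline index 1 even though index 0 has a leader.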
> > > > > >
> > > > > > On Fri, Feb 20, 2015 at 12:45 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
> > > > > >
> > > > > > > Found the problem: it is a bug in the partitioner of the Kafka
> > > > > > > client. Can you guys confirm and patch it in the Kafka clients?
> > > > > > >
> > > > > > > for (int i = 0; i < numPartitions; i++) {
> > > > > > >     int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
> > > > > > >     if (partitions.get(partition).leader() != null) {
> > > > > > >         return partitions.get(partition).partition();
> > > > > > >     }
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Feb 20, 2015 at 2:35 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
> > > > > > >
> > > > > > > > Update:
> > > > > > > >
> > > > > > > > I am using kafka.clients 0.8.2-beta. Below are the test steps:
> > > > > > > >
> > > > > > > >    1. Set up a local Kafka cluster with 2 brokers, 0 and 1.
> > > > > > > >    2. Create topic X with replication factor 1 and 4 partitions.
> > > > > > > >    3. Verify that each broker has two partitions.
> > > > > > > >    4. Shut down broker 1.
> > > > > > > >    5. Start a producer sending data to topic X using KafkaProducer
> > > > > > > >    with required acks = 1.
> > > > > > > >    6. The producer hangs and does not exit.
> > > > > > > >
> > > > > > > > Offline partitions are taken care of when a partition's leader is
> > > > > > > > null (code attached below). However, the timeout setting does not
> > > > > > > > seem to work. I am not sure what caused KafkaProducer to hang.
> > > > > > > >
> > > > > > > > // choose the next available node in a round-robin fashion
> > > > > > > > for (int i = 0; i < numPartitions; i++) {
> > > > > > > >     int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
> > > > > > > >     if (partitions.get(partition).leader() != null)
> > > > > > > >         return partition;
> > > > > > > > }
> > > > > > > > // no partitions are available, give a non-available partition
> > > > > > > > return Utils.abs(counter.getAndIncrement()) % numPartitions;
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Feb 20, 2015 at 1:48 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
> > > > > > > >
> > > > > > > >> Hello,
> > > > > > > >>
> > > > > > > >> I am experimenting with sending data to Kafka using KafkaProducer
> > > > > > > >> and found that when a partition is completely offline, e.g. a
> > > > > > > >> topic with replication factor = 1 and some broker is down,
> > > > > > > >> KafkaProducer seems to hang forever. It does not exit even with
> > > > > > > >> the timeout setting. Can you take a look?
> > > > > > > >>
> > > > > > > >> I checked the code and found that the partitioner chooses a
> > > > > > > >> partition based on the total number of partitions, including the
> > > > > > > >> offline ones. Is it possible to change ProducerClient to ignore
> > > > > > > >> offline partitions?
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >>
> > > > > > > >> -Xiaoyu
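One way to do what is asked here, ignoring offline partitions, is to first filter the partition list down to the partitions that have a leader. The partitioner then round-robins only over that subset and falls back to the full list when nothing is available. Below is a minimal Java sketch under that assumption. `AvailableOnlyPartitioner` and its nested `Partition` type are hypothetical. They model only a partition id and whether the partition has a leader, not the client's real classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class AvailableOnlyPartitioner {
    // Hypothetical minimal partition metadata.
    static final class Partition {
        final int id;
        final boolean hasLeader;

        Partition(int id, boolean hasLeader) {
            this.id = id;
            this.hasLeader = hasLeader;
        }
    }

    private final AtomicInteger counter = new AtomicInteger(0);

    // Round-robin over partitions that currently have a leader; if none do, round-robin over
    // all of them, mirroring the original "give a non-available partition" fallback.
    int choose(List<Partition> all) {
        List<Partition> available = new ArrayList<>();
        for (Partition p : all) {
            if (p.hasLeader) {
                available.add(p);
            }
        }
        List<Partition> candidates = available.isEmpty() ? all : available;
        // One counter read per call, so other callers advancing the counter cannot make this
        // call skip the available subset.
        int idx = (counter.getAndIncrement() & 0x7fffffff) % candidates.size();
        return candidates.get(idx).id; // the partition id, not the list index
    }

    public static void main(String[] args) {
        AvailableOnlyPartitioner partitioner = new AvailableOnlyPartitioner();
        List<Partition> topicX = Arrays.asList(
                new Partition(0, true), new Partition(1, false),
                new Partition(2, true), new Partition(3, false));
        for (int i = 0; i < 4; i++) {
            System.out.print(partitioner.choose(topicX) + " ");
        }
        System.out.println();
    }
}
```

Take the topic X layout from the test steps, assuming broker 1 led partitions 1 and 3. Successive calls then return 0, 2, 0, 2. Because the filtering happens on each call, this approach also sidesteps the multi-thread concern raised in the thread, at the cost of building a small list on every send.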
> > > > > > > >>
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
