Ah, yes. You are right. That's a more obvious bug. Will fix that in KAFKA-1984.
Thanks,

Jun

On Tue, Feb 24, 2015 at 8:37 AM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:

> Hi Jun,
>
> If I understand it correctly, the highlighted line is for avoiding
> offline partitions, isn't it?
>
> for (int i = 0; i < numPartitions; i++) {
>     int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
>     if (partitions.get(partition).leader() != null) {
>         return partition; --> should be changed to return the actual partition number?
>     }
> }
>
> On Tue, Feb 24, 2015 at 11:30 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Xiaoyu,
> >
> > 1. Could you explain a bit more what the bug is? The code does try to
> > avoid picking an unavailable partition. There does seem to be an issue
> > when more than one thread is producing data to the same producer
> > instance. This is being tracked in KAFKA-1984. How many producing
> > threads do you have in your test?
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Feb 24, 2015 at 7:56 AM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
> >
> > > Jun,
> > >
> > > I am trying to test how KafkaProducer behaves with topic replication
> > > factor = 1.
> > >
> > > 1. One broker is offline BEFORE KafkaProducer starts sending messages.
> > >    Because of the bug I mentioned, KafkaProducer sends to the offline
> > >    partition and hangs forever.
> > > 2. One broker goes offline WHILE KafkaProducer is sending messages.
> > >    KafkaProducer seems to hang forever in this case. I am still
> > >    looking.
> > >
> > > Do you mind taking a look?
> > >
> > > Thanks
> > >
> > > On Mon, Feb 23, 2015 at 7:01 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > The logic in that code is to cycle through all partitions and return
> > > > as soon as we see a partition with the leader.
> > > > I do see an issue: if multiple threads are sending messages to
> > > > the same producer concurrently, we may not cycle through all
> > > > partitions, and therefore we could return an unavailable
> > > > partition even when available partitions are present.
> > > >
> > > > Do you see this issue with just a single thread producing messages?
> > > > The current logic seems to work correctly in that case.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Fri, Feb 20, 2015 at 12:45 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
> > > >
> > > > > Found the problem - it is a bug in the Partitioner of the kafka
> > > > > client. Can you guys confirm and patch it in kafka clients?
> > > > >
> > > > > for (int i = 0; i < numPartitions; i++) {
> > > > >     int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
> > > > >     if (partitions.get(partition).leader() != null) {
> > > > >         return partitions.get(partition).partition();
> > > > >     }
> > > > > }
> > > > >
> > > > > On Fri, Feb 20, 2015 at 2:35 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
> > > > >
> > > > > > Update:
> > > > > >
> > > > > > I am using kafka.clients 0.8.2-beta. Below are the test steps:
> > > > > >
> > > > > > 1. Set up a local kafka cluster with 2 brokers, 0 and 1.
> > > > > > 2. Create topic X with replication factor 1 and 4 partitions.
> > > > > > 3. Verify that each broker has two partitions.
> > > > > > 4. Shut down broker 1.
> > > > > > 5. Start a producer sending data to topic X using KafkaProducer
> > > > > >    with required ack = 1.
> > > > > > 6. The producer hangs and does not exit.
> > > > > >
> > > > > > Offline partitions are taken care of when the partition's leader
> > > > > > is null (code attached below). However, the timeout setting does
> > > > > > not seem to work. Not sure what caused KafkaProducer to hang.
> > > > > >
> > > > > > // choose the next available node in a round-robin fashion
> > > > > > for (int i = 0; i < numPartitions; i++) {
> > > > > >     int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
> > > > > >     if (partitions.get(partition).leader() != null)
> > > > > >         return partition;
> > > > > > }
> > > > > > // no partitions are available, give a non-available partition
> > > > > > return Utils.abs(counter.getAndIncrement()) % numPartitions;
> > > > > >
> > > > > > On Fri, Feb 20, 2015 at 1:48 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
> > > > > >
> > > > > >> Hello,
> > > > > >>
> > > > > >> I am experimenting with sending data to kafka using KafkaProducer
> > > > > >> and found that when a partition is completely offline, e.g. a
> > > > > >> topic with replication factor = 1 and some broker is down,
> > > > > >> KafkaProducer seems to hang forever. It does not even exit with
> > > > > >> the timeout setting. Can you take a look?
> > > > > >>
> > > > > >> I checked the code and found that the partitioner chooses a
> > > > > >> partition based on the total partition number, including the
> > > > > >> offline partitions. Is it possible to change ProducerClient to
> > > > > >> ignore offline partitions?
> > > > > >>
> > > > > >> Thanks,
> > > > > >>
> > > > > >> -Xiaoyu
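
For reference, the index-versus-partition-number mix-up discussed in this thread can be sketched in isolation. The following is only an illustration and not the actual patch for KAFKA-1984: `RoundRobinSketch`, `PartitionMeta`, `toPositive` and the broker names are hypothetical stand-ins (for the client's partition metadata and for `Utils.abs`), and the sketch assumes the partition list is not ordered by partition number. It returns the partition number of the chosen list entry rather than the list index, as proposed in the Feb 20 12:45 message.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    /** Hypothetical stand-in for partition metadata: partition number plus leader (null when offline). */
    static final class PartitionMeta {
        final int id;
        final String leader;

        PartitionMeta(int id, String leader) {
            this.id = id;
            this.leader = leader;
        }
    }

    /** Maps any int to a non-negative value (stand-in for Utils.abs in the quoted code). */
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    /** Round-robin over the list; returns the partition number, not the list index. */
    static int choosePartition(List<PartitionMeta> partitions, AtomicInteger counter) {
        int numPartitions = partitions.size();
        for (int i = 0; i < numPartitions; i++) {
            int index = toPositive(counter.getAndIncrement()) % numPartitions;
            PartitionMeta candidate = partitions.get(index);
            if (candidate.leader != null) {
                return candidate.id;
            }
        }
        // No partition has a leader: fall back to a partition that is not available.
        int index = toPositive(counter.getAndIncrement()) % numPartitions;
        return partitions.get(index).id;
    }

    public static void main(String[] args) {
        // List order deliberately differs from partition numbers; partitions 1 and 3 are offline.
        List<PartitionMeta> partitions = Arrays.asList(
            new PartitionMeta(2, "broker-0"),
            new PartitionMeta(0, "broker-0"),
            new PartitionMeta(3, null),
            new PartitionMeta(1, null));
        AtomicInteger counter = new AtomicInteger(0);
        for (int i = 0; i < 4; i++) {
            System.out.println(choosePartition(partitions, counter));
        }
    }
}
```

With a single caller, every call here lands on a partition that has a leader. The sketch does not address the separate concern raised above about several threads sharing one counter, where a single call may skip some list entries because other threads advance the counter in between.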