FWIW, this intensely confusing behavior is fixed in the new producer, which
should give the expected result by default.

-Jay
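For comparison, here is a hypothetical sketch of one way a producer can spread null-key messages evenly: cycle through the available partitions with a counter instead of caching a single random pick. The class and method names are invented for illustration and are not the new producer's API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: round-robin assignment for messages without a key.
// Names are illustrative only; this is not the real Kafka producer API.
final class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Returns the next partition id from the partitions that currently have a
    // leader, advancing the shared counter on every call.
    int nextPartition(List<Integer> availablePartitions) {
        if (availablePartitions.isEmpty()) {
            throw new IllegalStateException("No leader for any partition");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), availablePartitions.size());
        return availablePartitions.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        List<Integer> partitions = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        int[] counts = new int[10];
        for (int i = 0; i < 1000; i++) {
            counts[rr.nextPartition(partitions)]++;
        }
        // Prints [100, 100, 100, 100, 100, 100, 100, 100, 100, 100]
        System.out.println(Arrays.toString(counts));
    }
}
```

Unlike a cached random choice, every message advances the counter, so the spread stays even regardless of how long the test runs.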

On Mon, Mar 2, 2015 at 6:36 PM, Yang <teddyyyy...@gmail.com> wrote:

> Thanks. This is indeed the reason.
> On Mar 2, 2015 4:38 PM, "Christian Csar" <christ...@csar.us> wrote:
>
> > I believe you are seeing the behavior where the random partitioner is
> > sticky: with no key, the producer picks one partition at random and keeps
> > using it until the next metadata refresh.
> >
> > http://mail-archives.apache.org/mod_mbox/kafka-users/201309.mbox/%3ccahwhrrxax5ynimqnacsk7jcggnhjc340y4qbqoqcismm43u...@mail.gmail.com%3E
> > has details. So with the default 10-minute refresh, if your test is only an
> > hour or two with a single producer, you would not expect to see all
> > partitions hit.
> >
> > Christian
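The arithmetic behind this can be sketched as follows, assuming one uniform, independent pick among the 10 partitions per 10-minute interval, a single producer, and all partitions available. After k picks from n partitions, a given partition is still empty with probability (1 - 1/n)^k, so the expected number of empty partitions is n(1 - 1/n)^k.

```java
// Expected number of partitions that never receive a message when a single
// producer re-picks one partition uniformly at random `picks` times.
final class StickyPartitionMath {
    static double expectedEmpty(int numPartitions, int picks) {
        // Each pick misses a given partition with probability (1 - 1/n); by
        // linearity of expectation, sum that miss probability over all n partitions.
        return numPartitions * Math.pow(1.0 - 1.0 / numPartitions, picks);
    }

    public static void main(String[] args) {
        int partitions = 10;
        int refreshMinutes = 10; // the default refresh interval mentioned above
        for (int testMinutes : new int[] {60, 120}) {
            int picks = testMinutes / refreshMinutes; // assumes one pick per interval
            System.out.printf("%d min test, %d picks: ~%.1f of %d partitions empty%n",
                    testMinutes, picks, expectedEmpty(partitions, picks), partitions);
        }
    }
}
```

For n = 10 this gives about 5.3 empty partitions after an hour (6 picks) and about 2.8 after two hours (12 picks), in line with the 3 to 5 empty partitions reported in the original question.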
> >
> > On Mon, Mar 2, 2015 at 4:23 PM, Yang <teddyyyy...@gmail.com> wrote:
> >
> > > Thanks. I just checked the code below. The line that calls
> > > Random.nextInt seems to be reached only *a few times*; on all the other
> > > calls to getPartition(), the cached value from
> > > sendPartitionPerTopicCache.get(topic) is used instead, so apparently you
> > > won't get an even partition distribution?
> > >
> > > The code I got is from commit 7847e9c703f3a0b70519666cdb8a6e4c8e37c3a7,
> > > file ./core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala:
> > >
> > >
> > >   private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
> > >     val numPartitions = topicPartitionList.size
> > >     if(numPartitions <= 0)
> > >       throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
> > >     val partition =
> > >       if(key == null) {
> > >         // If the key is null, we don't really need a partitioner
> > >         // So we look up in the send partition cache for the topic to decide the target partition
> > >         val id = sendPartitionPerTopicCache.get(topic)
> > >         id match {
> > >           case Some(partitionId) =>
> > >             // directly return the partitionId without checking availability of the leader,
> > >             // since we want to postpone the failure until the send operation anyways
> > >             partitionId
> > >           case None =>
> > >             val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
> > >             if (availablePartitions.isEmpty)
> > >               throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
> > >             val index = Utils.abs(Random.nextInt) % availablePartitions.size
> > >             val partitionId = availablePartitions(index).partitionId
> > >             sendPartitionPerTopicCache.put(topic, partitionId)
> > >             partitionId
> > >         }
> > >       } else
> > >         partitioner.partition(key, numPartitions)
> > >     if(partition < 0 || partition >= numPartitions)
> > >       throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
> > >         "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
> > >     trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
> > >     partition
> > >   }
> > >
> > >
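A simplified, hypothetical Java rendering of the null-key branch quoted above makes the stickiness easy to see. It models only the per-topic cache: leader lookup is reduced to a list of available partition ids, `Random.nextInt(bound)` stands in for `Utils.abs(Random.nextInt) % size`, and `onMetadataRefresh()` is an invented hook that assumes the periodic metadata refresh empties the cache, as the reply about the 10-minute refresh suggests.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the cached null-key path in getPartition(); not Kafka code.
final class StickyPartitionerSketch {
    private final Map<String, Integer> sendPartitionPerTopicCache = new HashMap<>();
    private final Random random;

    StickyPartitionerSketch(long seed) {
        this.random = new Random(seed);
    }

    int partitionForNullKey(String topic, List<Integer> availablePartitions) {
        Integer cached = sendPartitionPerTopicCache.get(topic);
        if (cached != null) {
            return cached; // every later call reuses the cached choice
        }
        if (availablePartitions.isEmpty()) {
            throw new IllegalStateException("No leader for any partition in topic " + topic);
        }
        // The random pick only happens when the cache has no entry for this topic.
        int partitionId = availablePartitions.get(random.nextInt(availablePartitions.size()));
        sendPartitionPerTopicCache.put(topic, partitionId);
        return partitionId;
    }

    // Invented hook standing in for a metadata refresh that clears the cache.
    void onMetadataRefresh() {
        sendPartitionPerTopicCache.clear();
    }

    public static void main(String[] args) {
        StickyPartitionerSketch p = new StickyPartitionerSketch(42L);
        List<Integer> partitions = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        Set<Integer> used = new TreeSet<>();
        for (int i = 0; i < 1000; i++) {
            used.add(p.partitionForNullKey("mytopic", partitions));
        }
        // Without a refresh, all 1000 messages land on a single partition.
        System.out.println("partitions used without a refresh: " + used);
    }
}
```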
> > > On Mon, Mar 2, 2015 at 3:58 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > >
> > > > Probably your keys are getting hashed to only those partitions. I don't
> > > > think anything is wrong here.
> > > > You can check how the default hash partitioner is used in the code, apply
> > > > the same computation to your keys before you send them, and check which
> > > > partitions they go to.
> > > >
> > > > The default hash partitioner does something like this:
> > > >
> > > > hash(key) % numPartitions
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
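Following this suggestion, a sketch like the one below predicts the partition for each key before sending. It assumes the partitioner reduces `key.hashCode()` modulo the partition count; masking the sign bit is one safe way to avoid a negative index, but the exact function in your Kafka version may differ. This path only applies when a key is supplied: the `getPartition()` code quoted in this thread sends null-key messages through the partition cache instead.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of "hash(key) % numPartitions" to see where keys would land.
// Assumes a hashCode()-based partitioner; verify against your client version.
final class HashPartitionCheck {
    static int partitionFor(Object key, int numPartitions) {
        // Clear the sign bit so negative hash codes (even Integer.MIN_VALUE)
        // still yield an index in [0, numPartitions).
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 10;
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int i = 0; i < 1000; i++) {
            String key = "user-" + i; // hypothetical keys for illustration
            counts.merge(partitionFor(key, numPartitions), 1, Integer::sum);
        }
        System.out.println(counts); // partition -> number of keys mapped to it
    }
}
```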
> > > >
> > > > On Mon, Mar 2, 2015 at 3:52 PM, Yang <teddyyyy...@gmail.com> wrote:
> > > >
> > > > > We have 10 partitions for a topic, and we omit the explicit partition
> > > > > param in the message creation:
> > > > >
> > > > > KeyedMessage<String, String> data = new KeyedMessage<String, String>(mytopic, myMessageContent);  // partition key needs to be polished
> > > > > producer.send(data);
> > > > >
> > > > > But on average 3 to 5 of the partitions are empty.
> > > > >
> > > > > What went wrong?
> > > > >
> > > > > thanks
> > > > > Yang
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -Regards,
> > > > Mayuresh R. Gharat
> > > > (862) 250-7125
> > > >
> > >
> >
>
