I believe you are seeing the behavior where the random partitioner is
sticky.
http://mail-archives.apache.org/mod_mbox/kafka-users/201309.mbox/%3ccahwhrrxax5ynimqnacsk7jcggnhjc340y4qbqoqcismm43u...@mail.gmail.com%3E
has details. So with the default 10-minute refresh, if your test runs for only
an hour or two with a single producer, you should not expect every partition
to be hit.
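Here is a rough back-of-the-envelope check of this explanation. It assumes that each refresh causes exactly one new, independent, uniformly random choice among all 10 partitions, and that every partition has a leader. Under that assumption, after n picks the expected number of partitions that were never chosen is 10 * (1 - 1/10)^n. A one-hour run gives about 6 picks, or about 5.3 empty partitions. A two-hour run gives about 12 picks, or about 2.8 empty partitions. Both figures fall near the 3 to 5 empty partitions reported in the original question. The class and method names are illustrative only.

```java
// Hypothetical helper: expected number of partitions left empty when one
// producer makes `picks` independent, uniformly random partition choices
// (assumed: one sticky choice per metadata refresh interval).
public class StickyPartitionCoverage {
    static double expectedEmpty(int numPartitions, int picks) {
        // Probability that a given partition is missed by all picks.
        double missProbability = Math.pow(1.0 - 1.0 / numPartitions, picks);
        return numPartitions * missProbability;
    }

    public static void main(String[] args) {
        int numPartitions = 10;
        int refreshMinutes = 10; // the default refresh interval mentioned above
        for (int runMinutes : new int[] {60, 120}) {
            int picks = runMinutes / refreshMinutes;
            System.out.printf("%d min run, %d picks: %.2f empty partitions expected%n",
                    runMinutes, picks, expectedEmpty(numPartitions, picks));
        }
    }
}
```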

Christian

On Mon, Mar 2, 2015 at 4:23 PM, Yang <teddyyyy...@gmail.com> wrote:

> Thanks. I just checked the code below. The line that calls
> Random.nextInt() seems to be reached only *a few times*; in every other
> call to getPartition(), the cached value from
> sendPartitionPerTopicCache.get(topic) is returned instead, so
> apparently you won't get an even partition distribution?
>
> The code is from commit 7847e9c703f3a0b70519666cdb8a6e4c8e37c3a7.
>
>
> From ./core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala:
>
>
>   private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
>     val numPartitions = topicPartitionList.size
>     if(numPartitions <= 0)
>       throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
>     val partition =
>       if(key == null) {
>         // If the key is null, we don't really need a partitioner
>         // So we look up in the send partition cache for the topic to decide the target partition
>         val id = sendPartitionPerTopicCache.get(topic)
>         id match {
>           case Some(partitionId) =>
>             // directly return the partitionId without checking availability of the leader,
>             // since we want to postpone the failure until the send operation anyways
>             partitionId
>           case None =>
>             val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
>             if (availablePartitions.isEmpty)
>               throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
>             val index = Utils.abs(Random.nextInt) % availablePartitions.size
>             val partitionId = availablePartitions(index).partitionId
>             sendPartitionPerTopicCache.put(topic, partitionId)
>             partitionId
>         }
>       } else
>         partitioner.partition(key, numPartitions)
>     if(partition < 0 || partition >= numPartitions)
>       throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
>         "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
>     trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
>     partition
>   }
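The sketch below models the null-key branch of getPartition() quoted above. A random partition is drawn only when the per-topic cache is empty; every later message reuses the cached choice until the cache is cleared. The class is a hypothetical simplification, not Kafka code. It assumes all partitions have leaders, so it uses Random.nextInt(bound) in place of the availablePartitions filter. It also assumes, following Christian's description, that the cache is cleared once per periodic metadata refresh (the onMetadataRefresh() method).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical model of the sticky null-key path: one random draw per topic,
// reused until the cache is cleared.
public class StickyNullKeyPartitioner {
    private final Map<String, Integer> sendPartitionPerTopicCache = new HashMap<>();
    private final Random random;

    StickyNullKeyPartitioner(long seed) {
        this.random = new Random(seed);
    }

    int partition(String topic, int numPartitions) {
        Integer cached = sendPartitionPerTopicCache.get(topic);
        if (cached != null) {
            return cached; // sticky: no new random draw
        }
        int chosen = random.nextInt(numPartitions); // assumes every partition has a leader
        sendPartitionPerTopicCache.put(topic, chosen);
        return chosen;
    }

    // Assumed to run on each periodic metadata refresh.
    void onMetadataRefresh() {
        sendPartitionPerTopicCache.clear();
    }

    // Sends `messagesPerInterval` null-key messages in each of `intervals`
    // refresh intervals and returns the set of partitions that received data.
    static Set<Integer> simulate(long seed, int numPartitions, int intervals, int messagesPerInterval) {
        StickyNullKeyPartitioner p = new StickyNullKeyPartitioner(seed);
        Set<Integer> hit = new HashSet<>();
        for (int i = 0; i < intervals; i++) {
            for (int m = 0; m < messagesPerInterval; m++) {
                hit.add(p.partition("mytopic", numPartitions));
            }
            p.onMetadataRefresh();
        }
        return hit;
    }

    public static void main(String[] args) {
        // 12 ten-minute intervals, roughly a two-hour test with one producer.
        Set<Integer> hit = simulate(42L, 10, 12, 1000);
        System.out.println("partitions hit: " + hit.size() + " of 10 " + hit);
    }
}
```

However many messages are sent, the number of distinct partitions can never exceed the number of refresh intervals. That is why message volume alone does not even out the distribution.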
>
>
> On Mon, Mar 2, 2015 at 3:58 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
>
> > Probably your keys are being hashed to only those partitions; I don't
> > think anything is wrong here.
> > You can check how the default hash partitioner is used in the code, apply
> > the same computation to your keys before you send them, and check which
> > partitions they go to.
> >
> > The default hash partitioner does something like this:
> >
> > hash(key) % numPartitions.
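The suggested check can be sketched as follows: compute hash(key) % numPartitions for a sample of keys and count how many land on each partition. This is a minimal illustration, not Kafka's partitioner source. It assumes the hash is Java's hashCode() with the sign bit masked off so the result is non-negative, and the class and method names are hypothetical. Per the getPartition() code quoted above, this hashing path is used only for non-null keys, so it does not apply to messages created without a key.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of "hash(key) % numPartitions" (sign bit masked so the
// result is never negative); not Kafka's exact implementation.
public class KeyPartitionCheck {
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Counts how many of the given (non-null) keys map to each partition.
    static Map<Integer, Integer> histogram(List<?> keys, int numPartitions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (Object key : keys) {
            counts.merge(partitionFor(key, numPartitions), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sampleKeys = List.of("user-1", "user-2", "user-3", "a", "b");
        System.out.println(histogram(sampleKeys, 10));
    }
}
```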
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Mon, Mar 2, 2015 at 3:52 PM, Yang <teddyyyy...@gmail.com> wrote:
> >
> > > We have 10 partitions for a topic, and we omit the explicit partition
> > > parameter in the message creation:
> > >
> > > KeyedMessage<String, String> data = new KeyedMessage<String, String>(mytopic, myMessageContent);   // partition key needs to be polished
> > > producer.send(data);
> > >
> > > But on average, 3 to 5 of the partitions are empty.
> > >
> > > What went wrong?
> > >
> > > Thanks
> > > Yang
> > >
> >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> >
>
