Thanks. I just checked the code below. In it, the line that calls
Random.nextInt() seems to run only *a few times* (on a cache miss). On every
other call to getPartition() with a null key, the cached value from
sendPartitionPerTopicCache.get(topic) is returned instead, so apparently you
won't get an even partition distribution?
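A rough way to quantify this, assuming each cache miss picks one of the P available partitions uniformly at random and every null-key message until the next miss goes to that same partition: after k such picks, a given partition has never been chosen with probability (1 - 1/P)^k, so

$$
\mathbb{E}[\text{empty partitions}] = P\left(1 - \frac{1}{P}\right)^{k}
$$

For P = 10 this is about 5.9 empty partitions after k = 5 picks, about 4.8 after k = 7, and about 3.5 after k = 10, the same range as the 3 to 5 empty partitions Yang reports. How many picks actually happen depends on how often the cache is cleared, which the excerpt does not show.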

The code is from commit 7847e9c703f3a0b70519666cdb8a6e4c8e37c3a7, file
./core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala:


  private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
    val numPartitions = topicPartitionList.size
    if(numPartitions <= 0)
      throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
    val partition =
      if(key == null) {
        // If the key is null, we don't really need a partitioner
        // So we look up in the send partition cache for the topic to decide the target partition
        val id = sendPartitionPerTopicCache.get(topic)
        id match {
          case Some(partitionId) =>
            // directly return the partitionId without checking availability of the leader,
            // since we want to postpone the failure until the send operation anyways
            partitionId
          case None =>
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId)
            partitionId
        }
      } else
        partitioner.partition(key, numPartitions)
    if(partition < 0 || partition >= numPartitions)
      throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
        "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
    trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
    partition
  }
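To see what this does to the distribution, here is a small stand-alone Scala simulation. It is a sketch under stated assumptions, not Kafka code: `StickyPartitionSim`, `sticky` and `perMessage` are hypothetical names, and it assumes the per-topic cache is cleared after a fixed number of messages (the excerpt above does not show when the cache is cleared). It uses `rng.nextInt(n)` in place of `Utils.abs(Random.nextInt) % n`; both pick a partition roughly uniformly.

```scala
import scala.util.Random

// Hypothetical simulation: sticky null-key assignment (one random pick per
// cache miss, as in getPartition()) versus a fresh random pick per message.
object StickyPartitionSim {

  // Assumption: the cached partition is dropped every `messagesPerWindow`
  // messages; until then every message goes to the cached partition.
  def sticky(numPartitions: Int, numMessages: Int, messagesPerWindow: Int, rng: Random): Array[Int] = {
    val counts = Array.fill(numPartitions)(0)
    var cached: Option[Int] = None
    for (i <- 0 until numMessages) {
      if (i % messagesPerWindow == 0) cached = None // simulate the cache being cleared
      val partition = cached.getOrElse {
        val p = rng.nextInt(numPartitions) // cache miss: pick a random partition
        cached = Some(p)
        p
      }
      counts(partition) += 1
    }
    counts
  }

  // Every message independently gets a random partition.
  def perMessage(numPartitions: Int, numMessages: Int, rng: Random): Array[Int] = {
    val counts = Array.fill(numPartitions)(0)
    for (_ <- 0 until numMessages) {
      val p = rng.nextInt(numPartitions)
      counts(p) += 1
    }
    counts
  }

  def main(args: Array[String]): Unit = {
    val rng = new Random(42)
    // 10 partitions, 70,000 messages, cache cleared every 10,000 messages => 7 picks
    val s = sticky(10, 70000, 10000, rng)
    val r = perMessage(10, 70000, rng)
    println("sticky:      " + s.mkString(" ") + "  empty=" + s.count(_ == 0))
    println("per-message: " + r.mkString(" ") + "  empty=" + r.count(_ == 0))
  }
}
```

With 7 windows of 10,000 messages, the sticky version can fill at most 7 of the 10 partitions (often fewer, since picks can repeat), while the per-message version spreads messages across all of them.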


On Mon, Mar 2, 2015 at 3:58 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

> Probably your keys are getting hashed to only those partitions. I don't
> think anything is wrong here.
> You can check how the default hash partitioner is used in the code, apply
> the same computation to your keys before you send them, and check which
> partitions they go to.
>
> The default hash partitioner does something like this:
>
> hash(key) % numPartitions
>
> Thanks,
>
> Mayuresh
>
> On Mon, Mar 2, 2015 at 3:52 PM, Yang <teddyyyy...@gmail.com> wrote:
>
> > We have 10 partitions for a topic, and we omit the explicit partition key
> > parameter in the message creation:
> >
> > KeyedMessage<String, String> data = new KeyedMessage<String, String>(mytopic, myMessageContent);   // partition key needs to be polished
> > producer.send(data);
> >
> > But on average, 3 to 5 of the partitions are empty.
> >
> > What went wrong?
> >
> > Thanks,
> > Yang
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>
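Mayuresh's suggestion (compute hash(key) % numPartitions for your own keys and see where they land) can be sketched as below. This is an illustration under assumptions, not Kafka's partitioner class: `KeyPartitionCheck` and `partitionFor` are hypothetical names, and the hash is masked to a non-negative value because `math.abs(Int.MinValue)` is still negative. Note that in the `getPartition()` excerpt above, a null key never reaches the partitioner at all, and Yang's two-argument `KeyedMessage` constructor does not appear to pass a key, so this check only applies once keys are actually supplied.

```scala
// Hypothetical helper approximating "hash(key) % numPartitions"; not Kafka's own class.
object KeyPartitionCheck {

  // Mask off the sign bit so the result is never negative.
  def partitionFor(key: Any, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  // Counts how many sample keys map to each partition; partitions absent from
  // the map would receive none of these keys.
  def distribution(keys: Seq[Any], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(partitionFor(_, numPartitions)).map { case (p, ks) => p -> ks.size }

  def main(args: Array[String]): Unit = {
    val numPartitions = 10
    val sampleKeys = (1 to 1000).map(i => "user-" + i) // made-up keys for illustration
    val dist = distribution(sampleKeys, numPartitions)
    for (p <- 0 until numPartitions)
      println(s"partition $p -> ${dist.getOrElse(p, 0)} keys")
  }
}
```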
