Thanks. I just checked the code below. In it, the line that calls Random.nextInt() seems to be reached only *a few times*. In all the other cases where getPartition() is called, the cached value from sendPartitionPerTopicCache.get(topic) is returned instead. So apparently you won't get an even partition distribution?
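To make the concern concrete, here is a minimal Java sketch of only the null-key branch of getPartition(). It is not Kafka code, and the class and method names are hypothetical. It models a per-topic cache that gets one random partition on a miss and reuses it until cleared. It assumes the cache is cleared now and then (for example on a metadata refresh, which the excerpt does not show). It also uses Random.nextInt(bound) in place of Utils.abs(Random.nextInt) % size and skips the leader-availability filter. If each clear leads to a new, independent, uniform pick, the expected number of never-chosen partitions after n picks over P partitions is P(1 - 1/P)^n. For P = 10 and, say, n = 10 picks that comes to about 3.49, so a few empty partitions would not be surprising.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical model of the null-key path in getPartition(); NOT Kafka's implementation.
class StickyPartitionSketch {
    private final Map<String, Integer> sendPartitionPerTopicCache = new HashMap<>();
    private final Random random;

    StickyPartitionSketch(long seed) {
        this.random = new Random(seed);
    }

    // Reuse the cached partition if there is one; otherwise pick a random
    // partition once, cache it, and return it.
    int partitionForNullKey(String topic, int numPartitions) {
        Integer cached = sendPartitionPerTopicCache.get(topic);
        if (cached != null) {
            return cached;
        }
        int id = random.nextInt(numPartitions);
        sendPartitionPerTopicCache.put(topic, id);
        return id;
    }

    // Assumption: something like this happens periodically (e.g. on metadata refresh).
    void clearCache() {
        sendPartitionPerTopicCache.clear();
    }

    // Expected number of partitions never chosen after `picks` independent uniform picks.
    static double expectedEmptyPartitions(int numPartitions, int picks) {
        return numPartitions * Math.pow(1.0 - 1.0 / numPartitions, picks);
    }

    public static void main(String[] args) {
        StickyPartitionSketch sketch = new StickyPartitionSketch(42L);
        int numPartitions = 10;
        int windows = 10; // hypothetical number of cache clears
        int[] counts = new int[numPartitions];
        for (int w = 0; w < windows; w++) {
            // Every message in one window lands on the same cached partition.
            for (int i = 0; i < 1000; i++) {
                counts[sketch.partitionForNullKey("mytopic", numPartitions)]++;
            }
            sketch.clearCache();
        }
        int empty = 0;
        for (int c : counts) {
            if (c == 0) empty++;
        }
        System.out.println("empty partitions in this run: " + empty);
        System.out.println("expected empty partitions: "
                + expectedEmptyPartitions(numPartitions, windows));
    }
}
```

The exact count in a given run depends on the seed and on how often the cache is really cleared, and the thread doesn't say how often that is.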
The code I checked is from commit 7847e9c703f3a0b70519666cdb8a6e4c8e37c3a7, ./core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala:

    private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
      val numPartitions = topicPartitionList.size
      if(numPartitions <= 0)
        throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
      val partition =
        if(key == null) {
          // If the key is null, we don't really need a partitioner
          // So we look up in the send partition cache for the topic to decide the target partition
          val id = sendPartitionPerTopicCache.get(topic)
          id match {
            case Some(partitionId) =>
              // directly return the partitionId without checking availability of the leader,
              // since we want to postpone the failure until the send operation anyways
              partitionId
            case None =>
              val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
              if (availablePartitions.isEmpty)
                throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
              val index = Utils.abs(Random.nextInt) % availablePartitions.size
              val partitionId = availablePartitions(index).partitionId
              sendPartitionPerTopicCache.put(topic, partitionId)
              partitionId
          }
        } else
          partitioner.partition(key, numPartitions)
      if(partition < 0 || partition >= numPartitions)
        throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
          "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
      trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
      partition
    }

On Mon, Mar 2, 2015 at 3:58 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> Probably your keys are getting hashed to only those partitions. I don't
> think anything is wrong here.
> You can check how the default hashPartitioner is used in the code and try
> to do the same for your keys before you send them, to see which
> partitions they are going to.
>
> The default hashPartitioner does something like this:
>
> hash(key) % numPartitions
>
> Thanks,
>
> Mayuresh
>
> On Mon, Mar 2, 2015 at 3:52 PM, Yang <teddyyyy...@gmail.com> wrote:
>
> > We have 10 partitions for a topic, and we omit the explicit partition param in
> > the message creation:
> >
> > KeyedMessage<String, String> data = new KeyedMessage<String, String>
> > (mytopic, myMessageContent); // partition key need to be polished
> > producer.send(data);
> >
> > But on average 3-5 of the partitions are empty.
> >
> > What went wrong?
> >
> > Thanks
> > Yang
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
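Following Mayuresh's suggestion, one quick check is to apply the same formula to your keys before sending and see where they land. The sketch below is hypothetical (the class and key names are made up). It assumes the default partitioner behaves like abs(key.hashCode()) % numPartitions. It clears the sign bit rather than calling Kafka's Utils.abs, which may handle Integer.MIN_VALUE differently. This only matters when a key is actually set. As far as I can tell, the two-argument KeyedMessage(topic, message) constructor used in the original question leaves the key null, so those messages take the cached random-partition path and never reach the hash partitioner.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper that predicts partitions using abs(hashCode) % numPartitions.
class KeyPartitionCheck {
    // Masking the sign bit keeps the result non-negative, even for Integer.MIN_VALUE.
    // Assumption: Kafka's own abs helper may differ on that edge case.
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Counts how many of the given keys map to each partition (sorted by partition id).
    static Map<Integer, Integer> histogram(List<?> keys, int numPartitions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (Object key : keys) {
            counts.merge(partitionFor(key, numPartitions), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        int numPartitions = 10;
        List<String> sampleKeys = List.of("user-1", "user-2", "user-3", "user-4", "user-5");
        Map<Integer, Integer> counts = histogram(sampleKeys, numPartitions);
        System.out.println("keys per partition: " + counts);
        System.out.println("partitions with no keys: " + (numPartitions - counts.size()));
    }
}
```

Running this over a realistic sample of your keys shows whether they cluster on a few partitions. Running it over a handful of keys shows little, since five keys can cover at most five of ten partitions.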