I believe you are seeing the behavior where the random partitioner is sticky: when messages have no key, the producer picks one random partition and keeps using it until the next topic metadata refresh. http://mail-archives.apache.org/mod_mbox/kafka-users/201309.mbox/%3ccahwhrrxax5ynimqnacsk7jcggnhjc340y4qbqoqcismm43u...@mail.gmail.com%3E has the details. So with the default 10-minute refresh, if your test only runs an hour or two with a single producer, you would not expect to see all partitions get hit.
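For anyone who wants to work around that, here is a minimal sketch against the old 0.8.x producer API. The property names are the stock kafka.producer.ProducerConfig keys; the broker address and interval value are purely illustrative, not recommendations. The idea is either to shorten the metadata refresh interval so the random partition is re-picked more often, or to attach a key so the partitioner is used instead of the cached partition:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class PartitionSpreadSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");              // illustrative broker
            props.put("serializer.class", "kafka.serializer.StringEncoder");

            // With a null key the producer rolls Random.nextInt() once, caches
            // the chosen partition, and reuses it until the next topic metadata
            // refresh. Lowering the refresh interval (default 600000 ms = 10
            // minutes) makes the random partition get re-picked more often.
            props.put("topic.metadata.refresh.interval.ms", "60000");

            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

            // Alternatively, supplying a key bypasses the cache entirely and
            // goes through partitioner.partition(key, numPartitions).
            producer.send(new KeyedMessage<String, String>("mytopic", "some-key", "payload"));
            producer.close();
        }
    }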
Christian

On Mon, Mar 2, 2015 at 4:23 PM, Yang <teddyyyy...@gmail.com> wrote:

> thanks. I just checked the code below. The line that calls
> Random.nextInt() seems to be called only *a few times*; in all the other
> calls to getPartition(), the cached sendPartitionPerTopicCache.get(topic)
> value seems to be used, so apparently you won't get an even partition
> distribution?
>
> the code I got is from commit 7847e9c703f3a0b70519666cdb8a6e4c8e37c3a7
>
> ./core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala:
>
>   private def getPartition(topic: String, key: Any,
>                            topicPartitionList: Seq[PartitionAndLeader]): Int = {
>     val numPartitions = topicPartitionList.size
>     if(numPartitions <= 0)
>       throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
>     val partition =
>       if(key == null) {
>         // If the key is null, we don't really need a partitioner
>         // So we look up in the send partition cache for the topic to decide
>         // the target partition
>         val id = sendPartitionPerTopicCache.get(topic)
>         id match {
>           case Some(partitionId) =>
>             // directly return the partitionId without checking availability
>             // of the leader, since we want to postpone the failure until the
>             // send operation anyways
>             partitionId
>           case None =>
>             val availablePartitions =
>               topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
>             if (availablePartitions.isEmpty)
>               throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
>             val index = Utils.abs(Random.nextInt) % availablePartitions.size
>             val partitionId = availablePartitions(index).partitionId
>             sendPartitionPerTopicCache.put(topic, partitionId)
>             partitionId
>         }
>       } else
>         partitioner.partition(key, numPartitions)
>     if(partition < 0 || partition >= numPartitions)
>       throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition +
>         " for topic " + topic + "; Valid values are in the inclusive range of [0, " +
>         (numPartitions-1) + "]")
>     trace("Assigning message of topic %s and key %s to a selected partition %d"
>       .format(topic, if (key == null) "[none]" else key.toString, partition))
>     partition
>   }
>
>
> On Mon, Mar 2, 2015 at 3:58 PM, Mayuresh Gharat <gharatmayures...@gmail.com>
> wrote:
>
> > Probably your keys are getting hashed to only those partitions. I don't
> > think anything is wrong here.
> > You can check how the default hashPartitioner is used in the code, try to
> > do the same for your keys before you send them, and check which
> > partitions they are going to.
> >
> > The default hashPartitioner does something like this:
> >
> >   hash(key) % numPartitions
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Mon, Mar 2, 2015 at 3:52 PM, Yang <teddyyyy...@gmail.com> wrote:
> >
> > > we have 10 partitions for a topic, and omit the explicit partition
> > > param in the message creation:
> > >
> > >   KeyedMessage<String, String> data = new KeyedMessage<String, String>
> > >       (mytopic, myMessageContent); // partition key needs to be polished
> > >   producer.send(data);
> > >
> > > but on average 3--5 of the partitions are empty.
> > >
> > > what went wrong?
> > >
> > > thanks
> > > Yang
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
>
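If you do end up supplying keys and want to sanity-check where they land, as Mayuresh suggests above, a rough sketch that mimics the old DefaultPartitioner, which is assumed here to be Utils.abs(key.hashCode) % numPartitions; the sample keys are made up for illustration:

    public class KeyDistributionCheck {
        // Approximation of kafka.utils.Utils.abs: unlike Math.abs it never
        // returns a negative value (it masks off the sign bit, which also
        // handles Integer.MIN_VALUE).
        static int abs(int n) {
            return n & 0x7fffffff;
        }

        public static void main(String[] args) {
            int numPartitions = 10;
            String[] sampleKeys = {"user-1", "user-2", "user-3", "user-4"}; // hypothetical keys

            // Mirror what the default partitioner does: hash(key) % numPartitions.
            for (String key : sampleKeys) {
                int partition = abs(key.hashCode()) % numPartitions;
                System.out.println(key + " -> partition " + partition);
            }
        }
    }

Note that this check only applies when messages carry a key; if the key is null (as in the original snippet above), the sticky random path in getPartition() is what decides the partition.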