FWIW, this intensely confusing behavior is fixed in the new producer, which should give the expected result by default.
-Jay

On Mon, Mar 2, 2015 at 6:36 PM, Yang <teddyyyy...@gmail.com> wrote:

> Thanks. This is indeed the reason.
> On Mar 2, 2015 4:38 PM, "Christian Csar" <christ...@csar.us> wrote:
>
> > I believe you are seeing the behavior where the random partitioner is
> > sticky.
> >
> > http://mail-archives.apache.org/mod_mbox/kafka-users/201309.mbox/%3ccahwhrrxax5ynimqnacsk7jcggnhjc340y4qbqoqcismm43u...@mail.gmail.com%3E
> > has details. So with the default 10 minute refresh if your test is only an
> > hour or two with a single producer you would not expect to see all
> > partitions be hit.
> >
> > Christian
> >
> > On Mon, Mar 2, 2015 at 4:23 PM, Yang <teddyyyy...@gmail.com> wrote:
> >
> > > thanks. just checked code below. in the code below, the line that calls
> > > Random.nextInt() seems to be called only *a few times* , and all the rest
> > > of the cases getPartition() is called, the
> > > cached sendPartitionPerTopicCache.get(topic) seems to be called, so
> > > apparently you won't get an even partition distribution ?????
> > >
> > > the code I got is from commit 7847e9c703f3a0b70519666cdb8a6e4c8e37c3a7
> > >
> > > "./core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala" 336 lines --66%--
> > > 222,46 73%
> > >
> > > private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
> > >   val numPartitions = topicPartitionList.size
> > >   if(numPartitions <= 0)
> > >     throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
> > >   val partition =
> > >     if(key == null) {
> > >       // If the key is null, we don't really need a partitioner
> > >       // So we look up in the send partition cache for the topic to decide the target partition
> > >       val id = sendPartitionPerTopicCache.get(topic)
> > >       id match {
> > >         case Some(partitionId) =>
> > >           // directly return the partitionId without checking availability of the leader,
> > >           // since we want to postpone the failure until the send operation anyways
> > >           partitionId
> > >         case None =>
> > >           val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
> > >           if (availablePartitions.isEmpty)
> > >             throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
> > >           val index = Utils.abs(Random.nextInt) % availablePartitions.size
> > >           val partitionId = availablePartitions(index).partitionId
> > >           sendPartitionPerTopicCache.put(topic, partitionId)
> > >           partitionId
> > >       }
> > >     } else
> > >       partitioner.partition(key, numPartitions)
> > >   if(partition < 0 || partition >= numPartitions)
> > >     throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
> > >       "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
> > >   trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
> > >   partition
> > > }
> > >
> > > On Mon, Mar 2, 2015 at 3:58 PM, Mayuresh Gharat <gharatmayures...@gmail.com>
> > > wrote:
> > >
> > > > Probably your keys are getting hashed to only those partitions. I don't
> > > > think anything is wrong here.
> > > > You can check how the default hashPartitioner is used in the code and try
> > > > to do the same for your keys before you send them and check which
> > > > partitions are those going to.
> > > >
> > > > The default hashpartitioner does something like this :
> > > >
> > > > hash(key) % numPartitions.
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > > On Mon, Mar 2, 2015 at 3:52 PM, Yang <teddyyyy...@gmail.com> wrote:
> > > >
> > > > > we have 10 partitions for a topic, and omit the explicit partition param in
> > > > > the message creation:
> > > > >
> > > > > KeyedMessage<String, String> data = new KeyedMessage<String, String>
> > > > > (mytopic, myMessageContent); // partition key need to be polished
> > > > > producer.send(data);
> > > > >
> > > > > but on average 3--5 of the partitions are empty.
> > > > >
> > > > > what went wrong?
> > > > >
> > > > > thanks
> > > > > Yang
> > > >
> > > > --
> > > > -Regards,
> > > > Mayuresh R. Gharat
> > > > (862) 250-7125
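
For anyone who finds this thread later, here is a rough stand-alone sketch of the sticky behavior Christian describes. It is not the Kafka code and uses no Kafka classes: StickyPartitionDemo, partitionForNullKey and distinctPartitionsHit are made-up names. It assumes the cached partition is dropped once per refresh interval and that one null-key message is sent per second.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical model of the old producer's null-key partition choice.
// Assumption: the per-topic cache is cleared once per refresh interval.
class StickyPartitionDemo {
    private final Map<String, Integer> sendPartitionPerTopicCache = new HashMap<>();
    private final Random random;
    private final long refreshIntervalMs;
    private long lastRefreshMs = 0L;

    StickyPartitionDemo(long refreshIntervalMs, long seed) {
        this.refreshIntervalMs = refreshIntervalMs;
        this.random = new Random(seed);
    }

    // Returns the partition for a message with a null key sent at time nowMs.
    int partitionForNullKey(String topic, int numPartitions, long nowMs) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("Topic " + topic + " has no partitions");
        }
        // Simulated refresh: forget the cached choice once the interval has elapsed.
        if (nowMs - lastRefreshMs >= refreshIntervalMs) {
            sendPartitionPerTopicCache.clear();
            lastRefreshMs = nowMs;
        }
        Integer cached = sendPartitionPerTopicCache.get(topic);
        if (cached != null) {
            return cached; // sticky: reuse the earlier random pick
        }
        int partitionId = random.nextInt(numPartitions);
        sendPartitionPerTopicCache.put(topic, partitionId);
        return partitionId;
    }

    // Sends one null-key message per simulated second and counts distinct partitions used.
    static int distinctPartitionsHit(long durationMs, long refreshIntervalMs,
                                     int numPartitions, long seed) {
        StickyPartitionDemo producer = new StickyPartitionDemo(refreshIntervalMs, seed);
        Set<Integer> hit = new HashSet<>();
        for (long now = 0; now < durationMs; now += 1000) {
            hit.add(producer.partitionForNullKey("mytopic", numPartitions, now));
        }
        return hit.size();
    }

    public static void main(String[] args) {
        long tenMinutesMs = 10 * 60 * 1000L;
        long oneHourMs = 60 * 60 * 1000L;
        int hit = distinctPartitionsHit(oneHourMs, tenMinutesMs, 10, 42L);
        System.out.println("Distinct partitions hit in one hour: " + hit + " of 10");
    }
}
```

In this model a one-hour run with a 10 minute refresh makes only six random picks, so at most six of the ten partitions can receive data no matter how many messages are sent. That would fit with several partitions staying empty in a short test.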