The issue is probably that the hashCode of a byte[] is based on the array's reference (identity), not on its contents, so two keys with identical bytes can hash to different partitions. You can try binding the producer to [Array[Byte], Array[Byte]] and using ByteArrayPartitioner. Alternatively, you can bind the producer to [String, Array[Byte]], and the default partitioner should work.
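Here is a minimal standalone Scala sketch (no Kafka dependency) of the problem Jun describes. It assumes, for illustration only, that the default partitioner takes a non-negative `key.hashCode` modulo the partition count. `hashPartition`, `contentPartition` and `intKey` are hypothetical helpers. `contentPartition` only imitates the idea behind ByteArrayPartitioner by hashing the array's contents with `java.util.Arrays.hashCode`. `intKey` builds keys the same way as Haoming's test loop.

```scala
import java.nio.ByteBuffer
import java.util.Arrays

// Standalone sketch of why Array[Byte] keys break hash-based partitioning.
// The partitioning rule below is an assumption, not Kafka's actual class.
object ByteArrayKeyDemo {
  // Assumed rule: non-negative hashCode modulo the number of partitions.
  def hashPartition(key: Any, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  // Hypothetical content-based variant for byte-array keys, in the spirit of
  // ByteArrayPartitioner: hash the bytes rather than the array reference.
  def contentPartition(key: Array[Byte], numPartitions: Int): Int =
    (Arrays.hashCode(key) & 0x7fffffff) % numPartitions

  // Same key construction as the quoted test loop: a new 4-byte array per call.
  def intKey(a: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(a).array()

  def main(args: Array[String]): Unit = {
    val first = intKey(1)
    val second = intKey(1)
    // Identical bytes, but two distinct array objects.
    println(s"same bytes: ${Arrays.equals(first, second)}, same object: ${first eq second}")
    // Arrays inherit Object.hashCode (identity-based), so these two results may differ.
    println(s"identity-based: ${hashPartition(first, 3)} vs ${hashPartition(second, 3)}")
    // Content hashing and String keys give the same result on every call.
    println(s"content-based:  ${contentPartition(first, 3)} vs ${contentPartition(second, 3)}")
    println(s"String key:     ${hashPartition("1", 3)} vs ${hashPartition("1", 3)}")
  }
}
```

The identity-based line may or may not show matching partitions, and it can change from run to run. That is consistent with the "random" placement described in the thread. Switching to String keys in a real producer may also require a key encoder that accepts strings. Check the producer configuration docs for your Kafka version; this sketch does not cover that.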
Thanks,
Jun

On Wed, Nov 26, 2014 at 12:54 PM, Haoming Zhang <haoming.zh...@outlook.com> wrote:

> Hi François,
>
> I agree with you and Svante, so I think my logic is correct. But I really
> can't find out why my problem happens, and I have been stuck on it for weeks.
> I think posting some of my code might help; could you please give it a
> quick check?
>
> Here is the producer code. I didn't set a custom partitioner.class:
>
>     val props = new Properties()
>
>     val codec = if (compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
>
>     props.put("compression.codec", codec.toString)
>     props.put("producer.type", if (synchronously) "sync" else "async")
>     props.put("metadata.broker.list", brokerList)
>     props.put("batch.num.messages", batchSize.toString)
>     props.put("message.send.max.retries", messageSendMaxRetries.toString)
>     props.put("request.required.acks", requestRequiredAcks.toString)
>     props.put("client.id", clientId.toString)
>
>     val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
>
>     def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
>       if (partition == null) {
>         new KeyedMessage(topic, message)
>       } else {
>         new KeyedMessage(topic, partition, message)
>       }
>     }
>
>     def send(message: String, partition: String = null): Unit =
>       send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))
>
>     def send(message: Array[Byte], partition: Array[Byte]): Unit = {
>       try {
>         producer.send(kafkaMesssage(message, partition))
>       } catch {
>         case e: Exception =>
>           e.printStackTrace
>           System.exit(1)
>       }
>     }
>
> And here is how I use the producer: I create a producer instance, then use
> this instance to send three messages.
> Currently I create the partition key as an Integer, then convert it to a
> byte array:
>
>     val testMessage = UUID.randomUUID().toString
>     val testTopic = "sample1"
>     val groupId_1 = "testGroup"
>
>     var testStatus = false
>
>     print("starting sample broker testing")
>     val producer = new KafkaProducer(testTopic, "localhost:9092")
>
>     val numList = List(0, 1, 2)
>     for (a <- numList) {
>       var key = java.nio.ByteBuffer.allocate(4).putInt(a).array() // Create a partition key as Byte Array
>       producer.send(testMessage.getBytes("UTF8"), key)
>     }
>
> I appreciate all the suggestions and help!
>
> Thanks,
> Haoming
>
> > From: f.langel...@gmail.com
> > Date: Wed, 26 Nov 2014 19:57:13 +0000
> > Subject: Re: Partition key not working properly
> > To: users@kafka.apache.org
> >
> > Hi Haoming,
> >
> > As far as I know, Svante is right.
> >
> > Maybe you modified your default partitioner?
> >
> > Or are you sure the same key goes to different partitions? Maybe it's just
> > two different keys that are going to the same partition?
> >
> > Because it's possible that you have something like this:
> > - key "1" -> partition 3
> > - key "2" -> partition 2
> > - key "3" -> partition 3
> >
> > and nothing in partition 1.
> >
> > On Wed Nov 26 2014 at 02:35:33 Haoming Zhang <haoming.zh...@outlook.com> wrote:
> >
> > > Hi Svante,
> > >
> > > Thanks for your reply!
> > >
> > > As you said, my goal is that "all messages with the same key go to the
> > > same partition", but in practice, even when I hard-code the same partition
> > > key (let's say "1") for three messages, the messages still go to
> > > different partitions.
> > >
> > > Regards,
> > > Haoming
> > >
> > > > Date: Wed, 26 Nov 2014 08:03:04 +0100
> > > > Subject: Re: Partition key not working properly
> > > > From: s...@csi.se
> > > > To: users@kafka.apache.org
> > > >
> > > > By default, the partition key is hashed, and the message is placed in
> > > > the partition that owns that part of the hashed keyspace.
> > > >
> > > > If you have three physical partitions and give the partition key "5",
> > > > it has nothing to do with physical partition 5 (which does not exist).
> > > > It is roughly: physical partition = hash("5") mod 3
> > > >
> > > > The only guarantee is that all messages with the same key go to the
> > > > same partition. This is useful to make sure that, for example, all logs
> > > > from the same IP go to the same partition, which means that they can be
> > > > read by the same consumer.
> > > >
> > > > /svante
> > > >
> > > > 2014-11-26 2:42 GMT+01:00 Haoming Zhang <haoming.zh...@outlook.com>:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I'm struggling with how to use the partition key mechanism properly. My
> > > > > logic is: set the number of partitions to 3, then create three partition
> > > > > keys "0", "1", "2", then use the partition keys to create three
> > > > > KeyedMessages such as
> > > > > KeyedMessage(topic, "0", message),
> > > > > KeyedMessage(topic, "1", message),
> > > > > KeyedMessage(topic, "2", message)
> > > > >
> > > > > After this, I create a producer instance to send out all the
> > > > > KeyedMessages.
> > > > >
> > > > > I expected each KeyedMessage to go to a different partition according
> > > > > to its partition key, which means
> > > > > KeyedMessage(topic, "0", message) goes to Partition 0,
> > > > > KeyedMessage(topic, "1", message) goes to Partition 1,
> > > > > KeyedMessage(topic, "2", message) goes to Partition 2
> > > > >
> > > > > I'm using Kafka-web-console to watch the topic status, but the result
> > > > > is not what I expected. The KeyedMessages still go to partitions
> > > > > randomly; sometimes two KeyedMessages enter the same partition even
> > > > > though they have different partition keys.
> > > > >
> > > > > I'm not sure whether my logic is incorrect or I didn't understand the
> > > > > partition key mechanism correctly. Any sample code or explanation
> > > > > would be great!
> > > > >
> > > > > Thanks,
> > > > > Haoming
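Svante's formula and François's point about collisions can be illustrated with a short standalone Scala sketch. It assumes, for illustration, that the 0.8-era default partitioner takes a non-negative `String.hashCode` modulo the partition count. `HashedKeyDemo` and `partitionFor` are hypothetical names, not Kafka classes.

```scala
// Sketch of Svante's rule, partition = hash(key) mod numPartitions, assuming
// the hash is a non-negative String.hashCode (not Kafka's actual class).
object HashedKeyDemo {
  def partitionFor(key: String, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  def main(args: Array[String]): Unit = {
    val numPartitions = 3
    // Key "5" is not "partition 5", and different keys may share a partition.
    for (key <- Seq("0", "1", "2", "4", "5"))
      println(s"key '$key' -> partition ${partitionFor(key, numPartitions)}")
  }
}
```

With three partitions, this prints partitions 0, 1, 2, 1, 2 for the keys "0", "1", "2", "4", "5". Key "5" lands on partition 2, and keys "1" and "4" share partition 1. The keys "0", "1", "2" land on partitions 0, 1, 2 only because their hash codes (48, 49, 50) happen to leave those remainders, not because the key names a partition. For String keys the mapping is deterministic, and that is the property a freshly allocated Array[Byte] key loses. If a literal key-to-partition mapping is really required, the `partitioner.class` setting Haoming mentions is where a custom mapping would be plugged in.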