The issue is probably that the hashCode of byte[] is based on the object
reference rather than on the array's contents. You can try binding the
producer to [Array[Byte],Array[Byte]] and using ByteArrayPartitioner.
Alternatively, you can bind the producer to [String,Array[Byte]], and the
default partitioner should work.
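
To illustrate the point above, here is a minimal sketch that assumes a
partitioner which simply reduces a hash code modulo the partition count.
The object KeyHashSketch and the helpers toPartition and intKey are
hypothetical names made up for this example; they are not part of Kafka
and this is not Kafka's partitioner source.

```scala
import java.nio.ByteBuffer
import java.util.{Arrays => JArrays}

object KeyHashSketch {
  // Hypothetical helper, not Kafka's source: reduce a hash code to a
  // partition index in [0, numPartitions). Masking the sign bit keeps the
  // result non-negative.
  def toPartition(hash: Int, numPartitions: Int): Int =
    (hash & 0x7fffffff) % numPartitions

  // Build a fresh 4-byte key from an Int, as the quoted test code does.
  def intKey(i: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(i).array()

  def main(args: Array[String]): Unit = {
    val numPartitions = 3
    val a = intKey(1)
    val b = intKey(1) // same contents as `a`, but a different object

    // Array hashCode is inherited from java.lang.Object (identity-based), so
    // two arrays with equal contents will usually hash differently.
    println(s"identity: ${toPartition(a.hashCode, numPartitions)} vs " +
      s"${toPartition(b.hashCode, numPartitions)}")

    // java.util.Arrays.hashCode hashes the contents, so equal keys agree.
    println(s"contents: ${toPartition(JArrays.hashCode(a), numPartitions)} vs " +
      s"${toPartition(JArrays.hashCode(b), numPartitions)}")

    // String.hashCode is also content-based, so String keys are stable too.
    println(s"string:   ${toPartition("1".hashCode, numPartitions)} vs " +
      s"${toPartition(new String("1").hashCode, numPartitions)}")
  }
}
```

The "contents" and "string" lines always print two equal partition
numbers. The "identity" line may or may not, which would match the
seemingly random placement described in the quoted message.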

Thanks,

Jun

On Wed, Nov 26, 2014 at 12:54 PM, Haoming Zhang <haoming.zh...@outlook.com>
wrote:

> Hi François,
>
> I agree with you and Svante, so I think my logic is correct. But I really
> can't find why my problem happens, and I have been stuck here for weeks. I
> think posting some of my code might be helpful; could you please give it a
> quick check?
>
> Here is the producer code. I didn't set a custom partitioner.class:
>   val props = new Properties()
>
>   val codec =
>     if (compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
>
>   props.put("compression.codec", codec.toString)
>   props.put("producer.type", if(synchronously) "sync" else "async")
>   props.put("metadata.broker.list", brokerList)
>   props.put("batch.num.messages", batchSize.toString)
>   props.put("message.send.max.retries", messageSendMaxRetries.toString)
>   props.put("request.required.acks",requestRequiredAcks.toString)
>   props.put("client.id",clientId.toString)
>
>   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
>
>   def kafkaMesssage(message: Array[Byte],
>                     partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
>      if (partition == null) {
>        new KeyedMessage(topic,message)
>      } else {
>        new KeyedMessage(topic,partition,message)
>      }
>   }
>
>   def send(message: String, partition: String = null): Unit =
>     send(message.getBytes("UTF8"),
>          if (partition == null) null else partition.getBytes("UTF8"))
>
>   def send(message: Array[Byte], partition: Array[Byte]): Unit = {
>     try {
>       producer.send(kafkaMesssage(message, partition))
>     } catch {
>       case e: Exception =>
>         e.printStackTrace
>         System.exit(1)
>     }
>   }
>
> And here is how I use the producer: I create a producer instance, then use
> this instance to send three messages. Currently I create the partition key
> as an Integer, then convert it to a byte array:
>   val testMessage = UUID.randomUUID().toString
>   val testTopic = "sample1"
>   val groupId_1 = "testGroup"
>
>   var testStatus = false
>
>   print("starting sample broker testing")
>   val producer = new KafkaProducer(testTopic, "localhost:9092")
>
>   val numList = List(0, 1, 2)
>   for (a <- numList) {
>     // Create a partition key as a byte array
>     val key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
>     producer.send(testMessage.getBytes("UTF8"), key)
>   }
>
> I appreciate all the suggestions and help!
>
> Thanks,
> Haoming
>
> > From: f.langel...@gmail.com
> > Date: Wed, 26 Nov 2014 19:57:13 +0000
> > Subject: Re: Partition key not working properly
> > To: users@kafka.apache.org
> >
> > Hi haoming,
> >
> > As far as I know, svante is right.
> >
> > Maybe you modified your default partitioner?
> >
> > Or are you sure that the same key goes to different partitions? Maybe
> > it's just two different keys going to the same partition?
> >
> > Because it's possible that you have something like this:
> > - key "1" -> partition 3
> > - key "2" -> partition 2
> > - key "3" -> partition 3
> >
> > and nothing in partition 1
> >
> > On Wed Nov 26 2014 at 02:35:33 Haoming Zhang <haoming.zh...@outlook.com>
> > wrote:
> >
> > > Hi Svante,
> > >
> > > Thanks for your reply!
> > >
> > > As you said, my goal is that "all messages with the same key go to the
> > > same partition", but in practice, even when I hard-code the same
> > > partition key (say "1") for three messages, the messages still go to
> > > different partitions.
> > >
> > > Regards,
> > > Haoming
> > >
> > > > Date: Wed, 26 Nov 2014 08:03:04 +0100
> > > > Subject: Re: Partition key not working properly
> > > > From: s...@csi.se
> > > > To: users@kafka.apache.org
> > > >
> > > > > By default, the partition key is hashed, and the message is placed
> > > > > in the partition that owns the corresponding part of the hashed
> > > > > keyspace.
> > > > >
> > > > > If you have three physical partitions and use the partition key "5",
> > > > > the key has nothing to do with physical partition 5 (which does not
> > > > > exist). The mapping is something like: partition = hash("5") mod 3
> > > > >
> > > > > The only guarantee is that all messages with the same key go to the
> > > > > same partition. This is useful to make sure that, for example, all
> > > > > logs from the same IP go to the same partition, which means that they
> > > > > can be read by the same consumer.
> > > >
> > > > /svante
> > > >
> > > >
> > > >
> > > > 2014-11-26 2:42 GMT+01:00 Haoming Zhang <haoming.zh...@outlook.com>:
> > > >
> > > > >
> > > > >
> > > > >
> > > > > Hi all,
> > > > >
> > > > > > I'm struggling with how to use the partition key mechanism
> > > > > > properly. My logic is to set the number of partitions to 3, then
> > > > > > create three partition keys "0", "1", "2", and use them to create
> > > > > > three KeyedMessages such as
> > > > > > KeyedMessage(topic, "0", message),
> > > > > > KeyedMessage(topic, "1", message),
> > > > > > KeyedMessage(topic, "2", message)
> > > > > >
> > > > > > After this, I create a producer instance to send out all the
> > > > > > KeyedMessages.
> > > > > >
> > > > > > I expect each KeyedMessage to enter a different partition
> > > > > > according to its partition key, which means
> > > > > > KeyedMessage(topic, "0", message) goes to Partition 0,
> > > > > > KeyedMessage(topic, "1", message) goes to Partition 1,
> > > > > > KeyedMessage(topic, "2", message) goes to Partition 2
> > > > > >
> > > > > > I'm using Kafka-web-console to watch the topic status, but the
> > > > > > result is not what I expect. The KeyedMessages still go to
> > > > > > partitions randomly; sometimes two KeyedMessages enter the same
> > > > > > partition even though they have different partition keys.
> > > > > >
> > > > > > I'm not sure whether my logic is incorrect or I didn't understand
> > > > > > the partition key mechanism correctly. It would be great if anyone
> > > > > > could provide some sample code or an explanation!
> > > > >
> > > > > Thanks,
> > > > > Haoming
> > > > >
> > > > >
> > >
>
>
