Hi Harsha,

I just tried hard-coding a string message and converting it to a byte array, but no luck...

The following is how my program works: create a hard-coded key as a String, convert it to a byte array, then iterate over the network messages and send them one by one:

    networkelements foreach {
      case networkelement =>
        val bytes = Injection(networkelement)
        logger.info(s"Synchronously sending Tweet $networkelement to topic ${producerApp.defaultTopic}")
        val hardKey = "2"
        val parkey = hardKey.getBytes("UTF8")
        val topic = producerApp.defaultTopic
        producerApp.send(parkey, bytes, topic)
    }

Here is how networkelements is created, where NetworkElement is a class generated by Avro (I think you can ignore it):

    val networkelements = fixture.messages
    val fixture = {
      val BeginningOfEpoch = 0.seconds
      val AnyTimestamp = 1234.seconds
      val now = System.currentTimeMillis().millis

      new {
        val t1 = new NetworkElement("ANY_USER_1", "ANY_TEXT_1", now.toSeconds)
        val t2 = new NetworkElement("ANY_USER_2", "ANY_TEXT_2", BeginningOfEpoch.toSeconds)
        val t3 = new NetworkElement("ANY_USER_3", "ANY_TEXT_3", AnyTimestamp.toSeconds)
        val messages = Seq(t1, t2, t3)
      }
    }

BTW, I defined the Key and Val types as follows:

    type Key = Array[Byte]
    type Val = Array[Byte]

Thanks,
Haoming

> From: ka...@harsha.io
> To: users@kafka.apache.org
> Subject: Re: Partition Key Cannot be Send Out by Producer
> Date: Thu, 20 Nov 2014 16:59:19 -0800
>
> Also, in (key: Key, value: Val, topic: Option[String]) the "value" should be
> a string converted to a byte array.
> Can you send an example of your key and value data?
>
> On Thu, Nov 20, 2014, at 04:53 PM, Haoming Zhang wrote:
> > Hi Harsha,
> >
> > Thanks for the suggestion!
> >
> > I have checked this link before, and I tried to create the partition key
> > like the following:
> >
> >     val hardKey = "2"
> >     val parkey = hardKey.getBytes("UTF8")
> >
> > But I still get the same exception. I also tried changing "UTF8" to "UTF-8",
> > but no luck...
> >
> > Regards,
> > Haoming
> >
> > > From: ka...@harsha.io
> > > To: users@kafka.apache.org
> > > Subject: Re: Partition Key Cannot be Send Out by Producer
> > > Date: Thu, 20 Nov 2014 16:43:11 -0800
> > >
> > > Hi Haoming,
> > > Take a look at the code here:
> > >
> > > https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaProducer.scala
> > >
> > > Your partKey should be a string, and when converting it into a
> > > byte array you can use partKey.getBytes("UTF8").
> > > -Harsha
> > >
> > > On Thu, Nov 20, 2014, at 03:57 PM, Haoming Zhang wrote:
> > > > Hi all,
> > > >
> > > > I'm a Kafka beginner, and I'm currently stuck on how to send a
> > > > KeyedMessage with a producer. I would like to design a partition
> > > > function to route messages based on the key, but the producer cannot
> > > > send the KeyedMessage and I get this exception:
> > > >
> > > >     java.lang.ClassCastException: [B cannot be cast to java.lang.String
> > > >
> > > > What I tried is to hard-code a partition key (I tried String and Integer;
> > > > currently it is an Integer), then convert the partition key to a byte array:
> > > >
> > > >     val converter = new DataTypeConvertion
> > > >     val hardKey = 2
> > > >     val partkey = converter.intToByteArray(hardKey)
> > > >
> > > > Then I create a KeyedMessage with the following function:
> > > >
> > > >     private def toMessage(value: Val, key: Option[Key] = None,
> > > >         topic: Option[String] = None): KeyedMessage[Key, Val] = {
> > > >       val t = topic.get
> > > >       require(!t.isEmpty, "Topic must not be empty")
> > > >       key match {
> > > >         case Some(key) => new KeyedMessage(t, key, value)
> > > >         case _ => new KeyedMessage(t, value)
> > > >       }
> > > >     }
> > > >
> > > > Then I try to send the KeyedMessage with a Kafka producer:
> > > >
> > > >     def send(key: Key, value: Val, topic: Option[String] = None) {
> > > >       val msg = toMessage(value, Option(key), topic)
> > > >       print(msg + "\n")
> > > >       print("msg.key" + msg.key + "\n")
> > > >       print("msg.message" + msg.message + "\n")
> > > >       print("msg.partKey" + msg.partKey + "\n")
> > > >       print("msg.topic" + msg.topic + "\n")
> > > >       try {
> > > >         p.send(msg) // p is an instance of the producer; the exception happens on this line
> > > >       } catch {
> > > >         case e: Exception =>
> > > >           e.printStackTrace()
> > > >           System.exit(1)
> > > >       }
> > > >     }
> > > >
> > > > As you can see, I added several print statements to the function above, and
> > > > the following is its output:
> > > >
> > > >     KeyedMessage(testingInput,[B@7ad40950,[B@7ad40950,[B@7ce18764)
> > > >     msg.key: [B@7ad40950
> > > >     msg.message: [B@7ce18764
> > > >     msg.partKey: [B@7ad40950
> > > >     msg.topic: testingInput
> > > >
> > > > The key of the KeyedMessage is displayed as [B@7ad40950. I think it is a
> > > > memory address, and the exception (java.lang.ClassCastException: [B cannot
> > > > be cast to java.lang.String) happens when the "send" function tries to
> > > > convert the byte array to a String.
> > > >
> > > > Am I wrong to create the key as a byte array?
> > > > Some examples of how to use KeyedMessage would be great!
> > > >
> > > > Regards,
> > > > Haoming