Hi all, I'm new to Kafka and am currently stuck on how to send a KeyedMessage with a producer. I would like to write a partition function that routes messages based on the key, but the producer fails to send the KeyedMessage and I get this exception:

    java.lang.ClassCastException: [B cannot be cast to java.lang.String
What I tried was to hard-code a partition key (I tried both String and Integer; currently it is an Integer) and then convert the partition key to a byte array:

    val converter = new DataTypeConvertion
    val hardKey = 2
    val partkey = converter.intToByteArray(hardKey)

Then I create a KeyedMessage with the following function:

    private def toMessage(value: Val, key: Option[Key] = None, topic: Option[String] = None): KeyedMessage[Key, Val] = {
      val t = topic.get
      require(!t.isEmpty, "Topic must not be empty")
      key match {
        case Some(key) => new KeyedMessage(t, key, value)
        case _ => new KeyedMessage(t, value)
      }
    }

Then I try to send the KeyedMessage with a Kafka producer:

    def send(key: Key, value: Val, topic: Option[String] = None) {
      val msg = toMessage(value, Option(key), topic)
      print(msg + "\n")
      print("msg.key: " + msg.key + "\n")
      print("msg.message: " + msg.message + "\n")
      print("msg.partKey: " + msg.partKey + "\n")
      print("msg.topic: " + msg.topic + "\n")
      try {
        p.send(msg) // p is an instance of the producer; the exception happens on this line
      } catch {
        case e: Exception =>
          e.printStackTrace()
          System.exit(1)
      }
    }

As you can see, I added several print statements to the function above. This is its output:

    KeyedMessage(testingInput,[B@7ad40950,[B@7ad40950,[B@7ce18764)
    msg.key: [B@7ad40950
    msg.message: [B@7ce18764
    msg.partKey: [B@7ad40950
    msg.topic: testingInput

The key of the KeyedMessage is displayed as [B@7ad40950, which I think is a memory address, and the exception (java.lang.ClassCastException: [B cannot be cast to java.lang.String) happens when the send function tries to convert the byte array to a String. Am I wrong to create the key as a byte array? Some examples of how to use KeyedMessage would be great!

Regards,
Haoming
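The producer configuration is not shown above, so the following is only a sketch of one way this kind of exception can arise, not a diagnosis. It assumes the producer's key encoder is a String encoder while the key is a byte array. In the 0.8 Scala producer the encoders are named by the `serializer.class` and `key.serializer.class` properties, and `kafka.serializer.StringEncoder` expects String values while `kafka.serializer.DefaultEncoder` passes byte arrays through. The types `Encoder`, `StringEncoder`, `BytesEncoder` and the helper `encodeUntyped` below are hypothetical stand-ins, not Kafka classes. They only mimic how an encoder that is invoked without static type information fails at call time rather than at compile time.

```scala
import java.nio.ByteBuffer

// Hypothetical stand-ins for a serializer interface; these are NOT Kafka classes.
trait Encoder[T] { def toBytes(t: T): Array[Byte] }

class StringEncoder extends Encoder[String] {
  def toBytes(s: String): Array[Byte] = s.getBytes("UTF-8")
}

class BytesEncoder extends Encoder[Array[Byte]] {
  def toBytes(b: Array[Byte]): Array[Byte] = b
}

object EncoderMismatch {
  // Encode an Int as 4 big-endian bytes (assumed to be similar to intToByteArray).
  def intToBytes(i: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(i).array()

  // Mimics a producer that only knows its encoder as an untyped object:
  // because of type erasure, a key/encoder mismatch surfaces when toBytes runs.
  def encodeUntyped(enc: Encoder[_], key: Any): Array[Byte] =
    enc.asInstanceOf[Encoder[Any]].toBytes(key)

  def main(args: Array[String]): Unit = {
    val key = intToBytes(2)

    // Byte-array key with a byte-array encoder: works.
    println(encodeUntyped(new BytesEncoder, key).length) // prints 4

    // Byte-array key with a String encoder: fails with a ClassCastException.
    try {
      encodeUntyped(new StringEncoder, key)
    } catch {
      case e: ClassCastException => println("mismatch: " + e.getMessage)
    }
  }
}
```

If this assumption matches the actual setup, the key type and the key encoder need to agree: either keep the byte-array key and use a byte-array encoder for keys, or use a String key with a String encoder. A custom partitioner that casts the key to String would fail in the same way, so it is worth checking that as well.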