Hello, below is my code, in which I am attempting to marshal a complex type, and I am receiving the following error. Am I missing anything here?
// Program output and error reported with the original version of this code:
//
//   Sending Encoded Messages ..
//   [error] (run-main) java.lang.ClassCastException: java.lang.String cannot be cast to com.test.groups.MemberRecord
//   java.lang.ClassCastException: java.lang.String cannot be cast to com.test.groups.MemberRecord
//
// ~~~~~~~~~~~~~~~~~~~~~~~~~
package com.test.groups

import java.util._
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import kafka.serializer.Encoder
import kafka.message.Message
import java.io.ByteArrayOutputStream
import java.io.DataOutputStream
import kafka.serializer.Decoder
import java.io.ByteArrayInputStream
import java.io.DataInputStream
import kafka.utils.VerifiableProperties

/** The value type sent through the producer.
  *
  * @param memberId numeric member identifier
  * @param name     member name
  * @param location member location
  */
class MemberRecord(val memberId: Int, val name: String, val location: String) {
  /** Renders the record as `(memberId,name,location)`. */
  override def toString: String = s"($memberId,$name,$location)"
}

/** Serializes a [[MemberRecord]] as an `Int` followed by two modified-UTF-8 strings
  * (the `DataOutputStream.writeInt` / `writeUTF` wire format).
  *
  * @param props accepted so the class can be instantiated with a `VerifiableProperties`
  *              argument; not read here
  */
class MemberRecordEncoder(props: VerifiableProperties = null) extends Encoder[MemberRecord] {
  /** @return the encoded bytes: memberId, then name, then location */
  def toBytes(member: MemberRecord): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeInt(member.memberId)
    out.writeUTF(member.name)
    out.writeUTF(member.location)
    // Flush the wrapper (not just the underlying buffer) before taking the snapshot.
    out.flush()
    buffer.toByteArray
  }
}

/** Inverse of [[MemberRecordEncoder]]: reads an `Int` and two UTF strings, in that order.
  *
  * @param props accepted so the class can be instantiated with a `VerifiableProperties`
  *              argument; not read here
  */
class MemberRecordDecoder(props: VerifiableProperties = null) extends Decoder[MemberRecord] {
  /** @param messageByte bytes in the layout produced by `MemberRecordEncoder.toBytes`
    * @return the decoded record
    */
  def fromBytes(messageByte: Array[Byte]): MemberRecord = {
    // Read the supplied bytes directly; wrapping them in a kafka.message.Message
    // first only to extract its payload again adds a copy and nothing else.
    val in = new DataInputStream(new ByteArrayInputStream(messageByte))
    // Fields are read into named vals so the read order is explicit.
    val memberId = in.readInt()
    val name = in.readUTF()
    val location = in.readUTF()
    new MemberRecord(memberId, name, location)
  }
}

/** Entry point: configures a producer for `String` keys and [[MemberRecord]] values
  * and sends a batch of records.
  */
object SimpleDataProducer {
  def main(args: Array[String]): Unit = {
    val eprops = new Properties
    eprops.put("metadata.broker.list", "localhost:9092")
    eprops.put("serializer.class", "com.test.groups.MemberRecordEncoder")
    // FIX for the ClassCastException: when no key serializer is configured the
    // producer uses `serializer.class` for keys too, so the String key was being
    // handed to MemberRecordEncoder. Keys are Strings, so encode them as such.
    eprops.put("key.serializer.class", "kafka.serializer.StringEncoder")
    eprops.put("request.required.acks", "1")

    val econfg = new ProducerConfig(eprops)
    val eproducer = new Producer[String, MemberRecord](econfg)
    val dataP = new SimpleDataProducer
    try {
      println(" Sending Encoded Messages .. ")
      dataP.sendEncodedMessage(10, eproducer)
    } finally {
      // Always release the producer, even if sending failed.
      println(" Shutting down Producer ")
      eproducer.close
      println(" Successfully shut down Producer ")
    }
  }
}

/** Sends randomly-numbered [[MemberRecord]] values to the `topic-Encoded` topic. */
class SimpleDataProducer {
  val rnd: Random = new Random

  /** Sends exactly `nEvents` records, each keyed with `"encode"`.
    *
    * @param nEvents  number of messages to send; nothing is sent when it is <= 0
    * @param producer producer configured with a String key serializer and a
    *                 MemberRecord value serializer
    */
  def sendEncodedMessage(nEvents: Int, producer: Producer[String, MemberRecord]): Unit = {
    // `until` (exclusive) so the count matches the parameter; the original
    // `0 to nEvents` sent one message too many and shadowed the parameter name.
    (0 until nEvents).foreach { _ =>
      val message = new MemberRecord(rnd.nextInt(255), "John", "US")
      val producerData = new KeyedMessage[String, MemberRecord]("topic-Encoded", "encode", message)
      producer.send(producerData)
    }
  }
}