Can anybody please help with the issue indicated below? I have not yet heard from anyone with a solution.
Thanks in advance, Amar On Tue, Jul 16, 2013 at 11:35 PM, prashant amar <amasin...@gmail.com> wrote: > Hello, > > Specified below is my code base where I am attempting to marshall a > complex type and receiving the following error. > Am I missing anything here? > > Sending Encoded Messages .. > [error] (run-main) java.lang.ClassCastException: java.lang.String cannot > be cast to com.test.groups.MemberRecord > java.lang.ClassCastException: java.lang.String cannot be cast to > com.test.groups.MemberRecord > > ~~~~~~~~~~~~~~~~~~~~~~~~~ > > package com.test.groups > > import java.util._ > import kafka.javaapi.producer.Producer > import kafka.producer.KeyedMessage > import kafka.producer.ProducerConfig > import kafka.serializer.Encoder > import kafka.message.Message > import java.io.ByteArrayOutputStream > import java.io.DataOutputStream > import kafka.serializer.Decoder > import java.io.ByteArrayInputStream > import java.io.DataInputStream > import kafka.utils.VerifiableProperties > > > > class MemberRecord(val memberId: Int, val name: String, val location: > String) { > override def toString = { > "(" + memberId + "," + name + "," + location + ")" > } > } > > class MemberRecordEncoder(props: VerifiableProperties = null) extends > Encoder[MemberRecord] { > def toBytes(member: MemberRecord): Array[Byte] = { > val outputStream = new ByteArrayOutputStream() > val dos = new DataOutputStream(outputStream) > dos.writeInt(member.memberId) > dos.writeUTF(member.name) > dos.writeUTF(member.location) > outputStream.flush > outputStream.toByteArray > } > } > > class MemberRecordDecoder(props: VerifiableProperties = null) extends > Decoder[MemberRecord] { > def fromBytes(messageByte: Array[Byte]):MemberRecord = { > val message = new Message(messageByte) > val inputStream = new ByteArrayInputStream(message.payload.array, > message.payload.arrayOffset, message.payload.limit) > val dataInputStream = new DataInputStream(inputStream) > new MemberRecord(dataInputStream.readInt, 
dataInputStream.readUTF, > dataInputStream.readUTF) > } > } > > > object SimpleDataProducer { > def main(args: Array[String]) { > val events = 100 > > val eprops = new Properties > eprops.put("metadata.broker.list", "localhost:9092") > eprops.put("serializer.class", "com.test.groups.MemberRecordEncoder"); > eprops.put("request.required.acks", "1") > > > val econfg = new ProducerConfig(eprops) > val eproducer = new Producer[String, MemberRecord](econfg) > > val dataP = new SimpleDataProducer > println(" Sending Encoded Messages .. ") > dataP.sendEncodedMessage(10,eproducer) > > println(" Shutting down Producer ") > eproducer.close > println(" Successfully shut down Producer ") > } > } > > class SimpleDataProducer { > > val rnd = new Random > > def sendEncodedMessage(nEvents: Int, producer: Producer[String, > MemberRecord]) { > for (nEvents <- 0 to nEvents) { > val message = new MemberRecord(rnd.nextInt(255), "John", "US") > val producerData = new KeyedMessage[String, > MemberRecord]("topic-Encoded", "encode", message) > producer.send(producerData) > } > } > > } > > >