Can anybody please help with this issue indicated below. I have not heard
from anyone with a solution.

Thanks in advance
Amar


On Tue, Jul 16, 2013 at 11:35 PM, prashant amar <amasin...@gmail.com> wrote:

> Hello,
>
> Specified below is my code base where I am attempting to marshall a
> complex type and receiving the follow error.
> Am I missing anything here?
>
>  Sending Encoded Messages ..
> [error] (run-main) java.lang.ClassCastException: java.lang.String cannot
> be cast to com.test.groups.MemberRecord
> java.lang.ClassCastException: java.lang.String cannot be cast to
> com.test.groups.MemberRecord
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~
>
> package com.test.groups
>
> import java.util._
> import kafka.javaapi.producer.Producer
> import kafka.producer.KeyedMessage
> import kafka.producer.ProducerConfig
> import kafka.serializer.Encoder
> import kafka.message.Message
> import java.io.ByteArrayOutputStream
> import java.io.DataOutputStream
> import kafka.serializer.Decoder
> import java.io.ByteArrayInputStream
> import java.io.DataInputStream
> import kafka.utils.VerifiableProperties
>
>
>
> class MemberRecord(val memberId: Int, val name: String, val location:
> String) {
>   override def toString = {
>     "(" + memberId + "," + name + "," + location + ")"
>   }
> }
>
> class MemberRecordEncoder(props: VerifiableProperties = null) extends
> Encoder[MemberRecord] {
>   def toBytes(member: MemberRecord): Array[Byte] = {
>     val outputStream = new ByteArrayOutputStream()
>     val dos = new DataOutputStream(outputStream)
>     dos.writeInt(member.memberId)
>     dos.writeUTF(member.name)
>     dos.writeUTF(member.location)
>     outputStream.flush
>     outputStream.toByteArray
>   }
> }
>
> class MemberRecordDecoder(props: VerifiableProperties = null) extends
> Decoder[MemberRecord] {
>   def fromBytes(messageByte: Array[Byte]):MemberRecord = {
>     val message = new Message(messageByte)
>     val inputStream = new ByteArrayInputStream(message.payload.array,
> message.payload.arrayOffset, message.payload.limit)
>     val dataInputStream = new DataInputStream(inputStream)
>     new MemberRecord(dataInputStream.readInt, dataInputStream.readUTF,
> dataInputStream.readUTF)
>   }
> }
>
>
> object SimpleDataProducer {
>   def main(args: Array[String]) {
>     val events = 100
>
>     val eprops = new Properties
>     eprops.put("metadata.broker.list", "localhost:9092")
>     eprops.put("serializer.class", "com.test.groups.MemberRecordEncoder");
>     eprops.put("request.required.acks", "1")
>
>
>     val econfg = new ProducerConfig(eprops)
>     val eproducer = new Producer[String, MemberRecord](econfg)
>
>     val dataP = new SimpleDataProducer
>     println(" Sending Encoded Messages .. ")
>     dataP.sendEncodedMessage(10,eproducer)
>
>     println(" Shutting down Producer ")
>     eproducer.close
>     println(" Successfully shut down Producer ")
>   }
> }
>
> class SimpleDataProducer {
>
>   val rnd = new Random
>
>   def sendEncodedMessage(nEvents: Int, producer: Producer[String,
> MemberRecord]) {
>     for (nEvents <- 0 to nEvents) {
>     val message = new MemberRecord(rnd.nextInt(255), "John", "US")
>     val producerData = new KeyedMessage[String,
> MemberRecord]("topic-Encoded", "encode", message)
>     producer.send(producerData)
>     }
>   }
>
> }
>
>
>

Reply via email to