The following code indicates that you want to use "encode" as the key
associated with the message. Is that your intend? If so, you need to
specify StringEncoder as the "key.serializer.class". Otherwise, use the api
that doesn't specify the key.

    val producerData = new KeyedMessage[String,
MemberRecord]("topic-Encoded", "encode", message)

Thanks,

Jun


On Thu, Jul 18, 2013 at 9:19 PM, prashant amar <amasin...@gmail.com> wrote:

> Can anybody please help with this issue indicated below. I have not heard
> from anyone with a solution.
>
> Thanks in advance
> Amar
>
>
> On Tue, Jul 16, 2013 at 11:35 PM, prashant amar <amasin...@gmail.com>
> wrote:
>
> > Hello,
> >
> > Specified below is my code base where I am attempting to marshall a
> > complex type and receiving the follow error.
> > Am I missing anything here?
> >
> >  Sending Encoded Messages ..
> > [error] (run-main) java.lang.ClassCastException: java.lang.String cannot
> > be cast to com.test.groups.MemberRecord
> > java.lang.ClassCastException: java.lang.String cannot be cast to
> > com.test.groups.MemberRecord
> >
> > ~~~~~~~~~~~~~~~~~~~~~~~~~
> >
> > package com.test.groups
> >
> > import java.util._
> > import kafka.javaapi.producer.Producer
> > import kafka.producer.KeyedMessage
> > import kafka.producer.ProducerConfig
> > import kafka.serializer.Encoder
> > import kafka.message.Message
> > import java.io.ByteArrayOutputStream
> > import java.io.DataOutputStream
> > import kafka.serializer.Decoder
> > import java.io.ByteArrayInputStream
> > import java.io.DataInputStream
> > import kafka.utils.VerifiableProperties
> >
> >
> >
> > class MemberRecord(val memberId: Int, val name: String, val location:
> > String) {
> >   override def toString = {
> >     "(" + memberId + "," + name + "," + location + ")"
> >   }
> > }
> >
> > class MemberRecordEncoder(props: VerifiableProperties = null) extends
> > Encoder[MemberRecord] {
> >   def toBytes(member: MemberRecord): Array[Byte] = {
> >     val outputStream = new ByteArrayOutputStream()
> >     val dos = new DataOutputStream(outputStream)
> >     dos.writeInt(member.memberId)
> >     dos.writeUTF(member.name)
> >     dos.writeUTF(member.location)
> >     outputStream.flush
> >     outputStream.toByteArray
> >   }
> > }
> >
> > class MemberRecordDecoder(props: VerifiableProperties = null) extends
> > Decoder[MemberRecord] {
> >   def fromBytes(messageByte: Array[Byte]):MemberRecord = {
> >     val message = new Message(messageByte)
> >     val inputStream = new ByteArrayInputStream(message.payload.array,
> > message.payload.arrayOffset, message.payload.limit)
> >     val dataInputStream = new DataInputStream(inputStream)
> >     new MemberRecord(dataInputStream.readInt, dataInputStream.readUTF,
> > dataInputStream.readUTF)
> >   }
> > }
> >
> >
> > object SimpleDataProducer {
> >   def main(args: Array[String]) {
> >     val events = 100
> >
> >     val eprops = new Properties
> >     eprops.put("metadata.broker.list", "localhost:9092")
> >     eprops.put("serializer.class",
> "com.test.groups.MemberRecordEncoder");
> >     eprops.put("request.required.acks", "1")
> >
> >
> >     val econfg = new ProducerConfig(eprops)
> >     val eproducer = new Producer[String, MemberRecord](econfg)
> >
> >     val dataP = new SimpleDataProducer
> >     println(" Sending Encoded Messages .. ")
> >     dataP.sendEncodedMessage(10,eproducer)
> >
> >     println(" Shutting down Producer ")
> >     eproducer.close
> >     println(" Successfully shut down Producer ")
> >   }
> > }
> >
> > class SimpleDataProducer {
> >
> >   val rnd = new Random
> >
> >   def sendEncodedMessage(nEvents: Int, producer: Producer[String,
> > MemberRecord]) {
> >     for (nEvents <- 0 to nEvents) {
> >     val message = new MemberRecord(rnd.nextInt(255), "John", "US")
> >     val producerData = new KeyedMessage[String,
> > MemberRecord]("topic-Encoded", "encode", message)
> >     producer.send(producerData)
> >     }
> >   }
> >
> > }
> >
> >
> >
>

Reply via email to