Hello,

Specified below is my code base where I am attempting to marshall a complex
type and receiving the follow error.
Am I missing anything here?

 Sending Encoded Messages ..
[error] (run-main) java.lang.ClassCastException: java.lang.String cannot be
cast to com.test.groups.MemberRecord
java.lang.ClassCastException: java.lang.String cannot be cast to
com.test.groups.MemberRecord

~~~~~~~~~~~~~~~~~~~~~~~~~

package com.test.groups

import java.util._
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import kafka.serializer.Encoder
import kafka.message.Message
import java.io.ByteArrayOutputStream
import java.io.DataOutputStream
import kafka.serializer.Decoder
import java.io.ByteArrayInputStream
import java.io.DataInputStream
import kafka.utils.VerifiableProperties



class MemberRecord(val memberId: Int, val name: String, val location:
String) {
  override def toString = {
    "(" + memberId + "," + name + "," + location + ")"
  }
}

class MemberRecordEncoder(props: VerifiableProperties = null) extends
Encoder[MemberRecord] {
  def toBytes(member: MemberRecord): Array[Byte] = {
    val outputStream = new ByteArrayOutputStream()
    val dos = new DataOutputStream(outputStream)
    dos.writeInt(member.memberId)
    dos.writeUTF(member.name)
    dos.writeUTF(member.location)
    outputStream.flush
    outputStream.toByteArray
  }
}

class MemberRecordDecoder(props: VerifiableProperties = null) extends
Decoder[MemberRecord] {
  def fromBytes(messageByte: Array[Byte]):MemberRecord = {
    val message = new Message(messageByte)
    val inputStream = new ByteArrayInputStream(message.payload.array,
message.payload.arrayOffset, message.payload.limit)
    val dataInputStream = new DataInputStream(inputStream)
    new MemberRecord(dataInputStream.readInt, dataInputStream.readUTF,
dataInputStream.readUTF)
  }
}


object SimpleDataProducer {
  def main(args: Array[String]) {
    val events = 100

    val eprops = new Properties
    eprops.put("metadata.broker.list", "localhost:9092")
    eprops.put("serializer.class", "com.test.groups.MemberRecordEncoder");
    eprops.put("request.required.acks", "1")


    val econfg = new ProducerConfig(eprops)
    val eproducer = new Producer[String, MemberRecord](econfg)

    val dataP = new SimpleDataProducer
    println(" Sending Encoded Messages .. ")
    dataP.sendEncodedMessage(10,eproducer)

    println(" Shutting down Producer ")
    eproducer.close
    println(" Successfully shut down Producer ")
  }
}

class SimpleDataProducer {

  val rnd = new Random

  def sendEncodedMessage(nEvents: Int, producer: Producer[String,
MemberRecord]) {
    for (nEvents <- 0 to nEvents) {
    val message = new MemberRecord(rnd.nextInt(255), "John", "US")
    val producerData = new KeyedMessage[String,
MemberRecord]("topic-Encoded", "encode", message)
    producer.send(producerData)
    }
  }

}

Reply via email to