Hi,

I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the
parameter fetch.message.max.bytes when creating the Kafka DStream. The only
API that seems to allow this is the following:

kafkaStream[T, D <: kafka.serializer.Decoder[_]](
    typeClass: Class[T],
    decoderClass: Class[D],
    kafkaParams: Map[String, String],
    topics: Map[String, Integer],
    storageLevel: StorageLevel)

I tried calling it like this:

context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
StorageLevel.MEMORY_AND_DISK())
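
For reference, a minimal sketch of how the kafkaParams and topics arguments in the call above might be built. The ZooKeeper address, group id, topic name and the 2 MB value are placeholders, and the "zk.connect" and "groupid" key names are an assumption about the Kafka 0.7 consumer configuration, so they should be checked against the Kafka version actually in use. Only fetch.message.max.bytes comes from the question itself.

```java
import java.util.HashMap;
import java.util.Map;

final class KafkaStreamArgs {

    // Consumer properties, passed through to Kafka as plain strings.
    static Map<String, String> kafkaParams() {
        Map<String, String> params = new HashMap<String, String>();
        params.put("zk.connect", "localhost:2181");        // placeholder address; key name is an assumption
        params.put("groupid", "example-group");            // placeholder group; key name is an assumption
        params.put("fetch.message.max.bytes", "2097152");  // illustrative value (2 MB)
        return params;
    }

    // Topic name mapped to the number of partitions/threads to consume with.
    static Map<String, Integer> topics() {
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("example-topic", 1);                    // placeholder topic name
        return topics;
    }

    public static void main(String[] args) {
        System.out.println(kafkaParams());
        System.out.println(topics());
    }
}
```

The two maps would then be passed as the kafkaParams and topics arguments of the kafkaStream call shown above.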

However, this throws the following exception:

java.lang.ClassCastException: java.lang.Object cannot be cast to kafka.serializer.Decoder
    at org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105)
    at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125)
    at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158)
    at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154)

Can anyone help with how to set these parameters?

Thanks

Hemanth
